Index: core/src/main/java/org/apache/hama/pipes/PipesApplication.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/PipesApplication.java (revision 1550486) +++ core/src/main/java/org/apache/hama/pipes/PipesApplication.java (working copy) @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hama.pipes; import java.io.BufferedReader; @@ -361,12 +360,12 @@ * * @throws IOException */ - public void cleanup() throws IOException { + public void cleanup(boolean sendClose) throws IOException { if (serverSocket != null) { serverSocket.close(); } try { - if (downlink != null) { + if ((downlink != null) && (sendClose)) { downlink.close(); } } catch (InterruptedException ie) { Index: core/src/main/java/org/apache/hama/pipes/PipesBSP.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/PipesBSP.java (revision 1550486) +++ core/src/main/java/org/apache/hama/pipes/PipesBSP.java (working copy) @@ -93,7 +93,7 @@ } catch (InterruptedException e) { e.printStackTrace(); } finally { - this.application.cleanup(); + this.application.cleanup(true); } } Index: core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java (revision 1550486) +++ core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java (working copy) @@ -31,12 +31,11 @@ /** * Dummy input format used when non-Java a {@link RecordReader} is used by the - * Pipes' application. + * Pipes application. * - * The only useful thing this does is set up the Map-Reduce job to get the - * {@link PipesDummyRecordReader}, everything else left for the 'actual' - * InputFormat specified by the user which is given by - * mapred.pipes.user.inputformat. + * The only useful thing this does is set up the BSP job to get the + * PipesDummyRecordReader, everything else left for the 'actual' InputFormat + * specified by the user which is given by hama.pipes.user.inputformat. * * Adapted from Hadoop Pipes. * Index: core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java (revision 1550486) +++ core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java (working copy) @@ -43,6 +43,7 @@ * symbolic links for URIs specified with a fragment if * DistributedCache.getSymlinks() is true. * + * @param conf The job's configuration * @throws IOException If a DistributedCache file cannot be found. */ public static final void moveLocalFiles(Configuration conf) @@ -81,8 +82,8 @@ } if (files.length() > 0) { // I've replaced the use of the missing setLocalFiles and - // addLocalFiles methods (hadoop 0.23.x) with our own DistCacheUtils methods - // which set the cache configurations directly. + // addLocalFiles methods (hadoop 0.23.x) with our own DistCacheUtils + // methods which set the cache configurations directly. DistCacheUtils.addLocalFiles(conf, files.toString()); } } @@ -90,8 +91,8 @@ /** * Add the Files to HDFS * - * @param conf - * @param paths + * @param conf The job's configuration + * @param files Paths that should be transfered to HDFS */ public static String addFilesToHDFS(Configuration conf, String files) { if (files == null) @@ -139,8 +140,7 @@ /** * Add the JARs from the given HDFS paths to the Classpath * - * @param conf - * @param urls + * @param conf The job's configuration */ public static URL[] addJarsToJobClasspath(Configuration conf) { URL[] classLoaderURLs = ((URLClassLoader) conf.getClassLoader()).getURLs(); Index: core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java (revision 1550486) +++ core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java (working copy) @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hama.pipes.util; import java.io.FileWriter; @@ -36,7 +35,6 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hama.HamaConfiguration; -//import org.apache.hama.util.GenericOptionsParser; public class SequenceFileDumper { @@ -63,28 +61,23 @@ Option option = OptionBuilder.withArgName(name).hasArgs(1) .withDescription(description).isRequired(required).create(); options.addOption(option); - } Parser createParser() { - Parser result = new BasicParser(); - return result; + return new BasicParser(); } void printUsage() { // The CLI package should do this for us, but I can't figure out how // to make it print something reasonable. System.out.println("hama seqdumper"); + System.out.println(" [-file ] // The sequence file to read"); System.out - .println(" [-seqFile ] // The Sequence File containing the Clusters"); + .println(" [-output ] // The output file. If not specified, dumps to the console"); System.out - .println(" [-output ] // The output file. If not specified, dumps to the console"); - System.out - .println(" [-substring // The number of chars of the FormatString() to print"); + .println(" [-substring // The number of chars of value to print"); System.out.println(" [-count ] // Report the count only"); - System.out.println(" [-help] // Print out help"); System.out.println(); - //GenericOptionsParser.printGenericCommandUsage(System.out); } } @@ -95,37 +88,23 @@ return; } - LOG.info("DEBUG: Hama SequenceFileDumper started!"); - - cli.addOption("seqFile", false, - "The Sequence File containing the Clusters", "path"); + // Add arguments + cli.addOption("file", false, "The Sequence File containing the Clusters", + "path"); cli.addOption("output", false, "The output file. If not specified, dumps to the console", "path"); - cli.addOption("substring", false, "The number of chars of the FormatString() to print", "number"); cli.addOption("count", false, "Report the count only", "number"); - cli.addOption("help", false, "Print out help", "class"); Parser parser = cli.createParser(); - try { HamaConfiguration conf = new HamaConfiguration(); - - //GenericOptionsParser genericParser = new GenericOptionsParser(conf, args); - CommandLine cmdLine = parser.parse(cli.options, args); - // genericParser.getRemainingArgs()); - LOG.debug("DEBUG: Arguments: " + args); //genericParser.getRemainingArgs()); - if (cmdLine.hasOption("help")) { - cli.printUsage(); - return; - } + if (cmdLine.hasOption("file")) { + Path path = new Path(cmdLine.getOptionValue("file")); - if (cmdLine.hasOption("seqFile")) { - Path path = new Path(cmdLine.getOptionValue("seqFile")); - FileSystem fs = FileSystem.get(path.toUri(), conf); SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); @@ -135,6 +114,7 @@ } else { writer = new OutputStreamWriter(System.out); } + writer.append("Input Path: ").append(String.valueOf(path)) .append(LINE_SEP); @@ -143,10 +123,9 @@ sub = Integer.parseInt(cmdLine.getOptionValue("substring")); } - boolean countOnly = cmdLine.hasOption("count"); - Writable key = (Writable) reader.getKeyClass().newInstance(); Writable value = (Writable) reader.getValueClass().newInstance(); + writer.append("Key class: ") .append(String.valueOf(reader.getKeyClass())) .append(" Value Class: ").append(String.valueOf(value.getClass())) @@ -154,6 +133,7 @@ writer.flush(); long count = 0; + boolean countOnly = cmdLine.hasOption("count"); if (countOnly == false) { while (reader.next(key, value)) { writer.append("Key: ").append(String.valueOf(key)); @@ -166,7 +146,8 @@ } writer.append("Count: ").append(String.valueOf(count)) .append(LINE_SEP); - } else { + + } else { // count only while (reader.next(key, value)) { count++; } @@ -179,10 +160,13 @@ writer.close(); } reader.close(); + + } else { + cli.printUsage(); } } catch (ParseException e) { - LOG.info("Error : " + e); + LOG.error(e.getMessage()); cli.printUsage(); return; } Index: core/src/main/java/org/apache/hama/pipes/Submitter.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/Submitter.java (revision 1550486) +++ core/src/main/java/org/apache/hama/pipes/Submitter.java (working copy) @@ -214,7 +214,7 @@ * Submit a job to the cluster. All of the necessary modifications to the job * to run under pipes are made to the configuration. * - * @param conf the job to submit to the cluster (MODIFIED) + * @param job to submit to the cluster * @throws IOException */ public static void runJob(BSPJob job) throws IOException { Index: core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java (revision 1550486) +++ core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java (working copy) @@ -49,7 +49,7 @@ public void cleanup() { try { - application.cleanup(); + application.cleanup(true); } catch (IOException e) { LOG.error(e); } Index: core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (revision 1550486) +++ core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (working copy) @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hama.pipes.protocol; import java.io.BufferedInputStream; @@ -100,8 +99,9 @@ } int cmd = readCommand(); - if (cmd == -1) + if (cmd == -1) { continue; + } LOG.debug("Handling uplink command: " + MessageType.values()[cmd]); if (cmd == MessageType.WRITE_KEYVALUE.code && isPeerAvailable()) { // INCOMING @@ -114,8 +114,8 @@ } else if (cmd == MessageType.REGISTER_COUNTER.code && isPeerAvailable()) { // INCOMING /* - * Is not used in HAMA -> Hadoop Pipes - maybe for performance, skip - * transferring group and name each INCREMENT + * Is not used in Hama. Hadoop Pipes uses it - maybe for performance + * issue, skip transferring group and name each INCREMENT */ } else if (cmd == MessageType.TASK_DONE.code) { // INCOMING synchronized (binProtocol.hasTaskLock) { @@ -149,8 +149,8 @@ } else if (cmd == MessageType.REOPEN_INPUT.code && isPeerAvailable()) { // INCOMING reopenInput(); } else if (cmd == MessageType.CLEAR.code && isPeerAvailable()) { // INCOMING - LOG.debug("Got MessageType.CLEAR"); - peer.clear(); + clear(); + /* SequenceFileConnector Implementation */ } else if (cmd == MessageType.SEQFILE_OPEN.code) { // OUTGOING seqFileOpen(); @@ -161,6 +161,7 @@ } else if (cmd == MessageType.SEQFILE_CLOSE.code) { // OUTGOING seqFileClose(); /* SequenceFileConnector Implementation */ + } else if (cmd == MessageType.PARTITION_RESPONSE.code) { // INCOMING partitionResponse(); } else { @@ -192,9 +193,24 @@ public void reopenInput() throws IOException { LOG.debug("Got MessageType.REOPEN_INPUT"); + peer.reopenInput(); + + WritableUtils.writeVInt(this.outStream, MessageType.REOPEN_INPUT.code); + binProtocol.flush(); + LOG.debug("Responded MessageType.REOPEN_INPUT"); } + public void clear() throws IOException { + LOG.debug("Got MessageType.CLEAR"); + + peer.clear(); + + WritableUtils.writeVInt(this.outStream, MessageType.CLEAR.code); + binProtocol.flush(); + LOG.debug("Responded MessageType.CLEAR"); + } + public void getSuperstepCount() throws IOException { WritableUtils.writeVInt(this.outStream, MessageType.GET_SUPERSTEP_COUNT.code); @@ -257,7 +273,12 @@ public void sync() throws IOException, SyncException, InterruptedException { LOG.debug("Got MessageType.SYNC"); + peer.sync(); // this call blocks + + WritableUtils.writeVInt(this.outStream, MessageType.SYNC.code); + binProtocol.flush(); + LOG.debug("Responded MessageType.SYNC"); } public void getMessage() throws IOException { @@ -282,10 +303,20 @@ } public void incrementCounter() throws IOException { + LOG.debug("Got MessageType.INCREMENT_COUNTER"); + String group = Text.readString(this.inStream); String name = Text.readString(this.inStream); long amount = WritableUtils.readVLong(this.inStream); + + LOG.debug("Got MessageType.INCREMENT_COUNTER group: " + group + " name: " + + name + " amount: " + amount); + peer.incrementCounter(group, name, amount); + + WritableUtils.writeVInt(this.outStream, MessageType.INCREMENT_COUNTER.code); + binProtocol.flush(); + LOG.debug("Responded MessageType.INCREMENT_COUNTER"); } @SuppressWarnings("unchecked") @@ -303,7 +334,11 @@ peer.send(peerName, message); - LOG.debug("Done MessageType.SEND_MSG to peerName: " + WritableUtils.writeVInt(this.outStream, MessageType.SEND_MSG.code); + binProtocol.flush(); + LOG.debug("Responded MessageType.SEND_MSG"); + + LOG.debug("Sent message to peerName: " + peerName + " messageClass: " + message.getClass().getName() @@ -362,13 +397,18 @@ Object.class), conf); LOG.debug("Got MessageType.WRITE_KEYVALUE keyOutClass: " - + keyOut.getClass().getName() + " valueOutClass: " + valueOut.getClass().getName()); + + keyOut.getClass().getName() + " valueOutClass: " + + valueOut.getClass().getName()); readObject((Writable) keyOut); readObject((Writable) valueOut); peer.write(keyOut, valueOut); + WritableUtils.writeVInt(this.outStream, MessageType.WRITE_KEYVALUE.code); + binProtocol.flush(); + LOG.debug("Responded MessageType.WRITE_KEYVALUE"); + LOG.debug("Done MessageType.WRITE_KEYVALUE -" + " Key: " + ((keyOut.toString().length() < 10) ? keyOut.toString() : keyOut @@ -385,6 +425,7 @@ // key and value class stored in the SequenceFile String keyClass = Text.readString(this.inStream); String valueClass = Text.readString(this.inStream); + LOG.debug("GOT MessageType.SEQFILE_OPEN - Path: " + path); LOG.debug("GOT MessageType.SEQFILE_OPEN - Option: " + option); LOG.debug("GOT MessageType.SEQFILE_OPEN - KeyClass: " + keyClass); LOG.debug("GOT MessageType.SEQFILE_OPEN - ValueClass: " + valueClass); @@ -418,8 +459,10 @@ sequenceKeyWritable, sequenceValueWritable))); } catch (IOException e) { + LOG.error("SEQFILE_OPEN - " + e.getMessage()); fileID = -1; } catch (ClassNotFoundException e) { + LOG.error("SEQFILE_OPEN - " + e.getMessage()); fileID = -1; } @@ -451,10 +494,14 @@ sequenceKeyWritable, sequenceValueWritable))); } catch (IOException e) { + LOG.error("SEQFILE_OPEN - " + e.getMessage()); fileID = -1; } catch (ClassNotFoundException e) { + LOG.error("SEQFILE_OPEN - " + e.getMessage()); fileID = -1; } + } else { // wrong option + LOG.error("SEQFILE_OPEN - Wrong option: '" + option + "'"); } WritableUtils.writeVInt(this.outStream, MessageType.SEQFILE_OPEN.code); @@ -518,9 +565,9 @@ // check if fileID is available in sequenceFileWriter if (sequenceFileWriters.containsKey(fileID)) { - Writable sequenceKeyWritable = sequenceFileReaders.get(fileID).getValue() + Writable sequenceKeyWritable = sequenceFileWriters.get(fileID).getValue() .getKey(); - Writable sequenceValueWritable = sequenceFileReaders.get(fileID) + Writable sequenceValueWritable = sequenceFileWriters.get(fileID) .getValue().getValue(); // try to read key and value @@ -555,6 +602,7 @@ public void seqFileClose() throws IOException { int fileID = WritableUtils.readVInt(this.inStream); + LOG.debug("GOT MessageType.SEQFILE_CLOSE - FileID: " + fileID); boolean result = false; @@ -594,11 +642,9 @@ */ protected void readObject(Writable obj) throws IOException { byte[] buffer; - // For BytesWritable and Text, use the specified length to set the length // this causes the "obvious" translations to work. So that if you emit // a string "abc" from C++, it shows up as "abc". - if (obj instanceof Text) { int numBytes = WritableUtils.readVInt(this.inStream); buffer = new byte[numBytes]; @@ -612,29 +658,22 @@ ((BytesWritable) obj).set(buffer, 0, numBytes); } else if (obj instanceof IntWritable) { - LOG.debug("read IntWritable"); ((IntWritable) obj).set(WritableUtils.readVInt(this.inStream)); } else if (obj instanceof LongWritable) { ((LongWritable) obj).set(WritableUtils.readVLong(this.inStream)); - // else if ((obj instanceof FloatWritable) || (obj instanceof - // DoubleWritable)) - } else if (obj instanceof NullWritable) { throw new IOException("Cannot read data into NullWritable!"); } else { - // Note: other types are transfered as String which should be implemented - // in Writable itself try { - LOG.debug("reading other type"); + LOG.debug("reading type: " + obj.getClass().getName()); + // try reading object obj.readFields(this.inStream); - // String s = Text.readString(inStream); } catch (IOException e) { - throw new IOException("Hama Pipes is not able to read " + obj.getClass().getName(), e); } Index: core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java (revision 1550486) +++ core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java (working copy) @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hama.pipes.protocol; import java.io.BufferedOutputStream; @@ -50,8 +49,7 @@ public class BinaryProtocol implements DownwardProtocol { - protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class - .getName()); + protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class); public static final int CURRENT_PROTOCOL_VERSION = 0; /** * The buffer size for the command socket @@ -74,7 +72,7 @@ * Upward messages are passed on the specified handler and downward downward * messages are public methods on this object. * - * @param jobConfig The job's configuration + * @param conf The job's configuration * @param out The output stream to communicate on. * @param in The input stream to communicate on. * @throws IOException @@ -362,8 +360,8 @@ * @throws IOException */ protected void writeObject(Writable obj) throws IOException { - // For basic types IntWritable, LongWritable, FloatWritable, DoubleWritable, - // Text and BytesWritable, encode them directly, so that they end up + // For basic types IntWritable, LongWritable, Text and BytesWritable, + // encode them directly, so that they end up // in C++ as the natural translations. if (obj instanceof Text) { Text t = (Text) obj; @@ -383,20 +381,8 @@ } else if (obj instanceof LongWritable) { WritableUtils.writeVLong(this.outStream, ((LongWritable) obj).get()); - // else if ((obj instanceof FloatWritable) || (obj instanceof - // DoubleWritable)) - } else { - // Note: other types are transfered as String which should be implemented - // in Writable itself - - // DataOutputBuffer buffer = new DataOutputBuffer(); - // buffer.reset(); - // obj.write(buffer); - // int length = buffer.getLength(); - // WritableUtils.writeVInt(stream, length); - // stream.write(buffer.getData(), 0, length); - + // Note: FloatWritable and DoubleWritable are written here obj.write(this.outStream); } } Index: core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java (revision 1550486) +++ core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java (working copy) @@ -15,13 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hama.pipes.protocol; /** * The integer codes to represent the different messages. These must match the * C++ codes or massive confusion will result. - * */ public enum MessageType { START(0), SET_BSPJOB_CONF(1), SET_INPUT_TYPES(2), RUN_SETUP(3), RUN_BSP(4), Index: core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java (revision 1550486) +++ core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java (working copy) @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hama.pipes.protocol; import java.io.IOException; @@ -42,6 +41,7 @@ /** * Set the BSP Job Configuration * + * @param conf The job's configuration * @throws IOException */ void setBSPJobConf(Configuration conf) throws IOException; @@ -58,6 +58,8 @@ /** * runSetup * + * @param pipedInput use pipedInput + * @param pipedOutput use pipedOutput * @throws IOException */ void runSetup(boolean pipedInput, boolean pipedOutput) throws IOException; @@ -65,6 +67,8 @@ /** * runBsp * + * @param pipedInput use pipedInput + * @param pipedOutput use pipedOutput * @throws IOException */ void runBsp(boolean pipedInput, boolean pipedOutput) throws IOException; @@ -72,6 +76,8 @@ /** * runCleanup * + * @param pipedInput use pipedInput + * @param pipedOutput use pipedOutput * @throws IOException */ void runCleanup(boolean pipedInput, boolean pipedOutput) throws IOException; @@ -79,6 +85,9 @@ /** * getPartition * + * @param key + * @param value + * @param numTasks number of available tasks * @throws IOException */ int getPartition(K1 key, V1 value, int numTasks) throws IOException; Index: core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (revision 1550486) +++ core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (working copy) @@ -45,8 +45,6 @@ * * @param input key. * @param input value. - * @param output key. - * @param output value. */ public class StreamingProtocol extends BinaryProtocol { Index: c++/src/main/native/pipes/impl/Pipes.cc =================================================================== --- c++/src/main/native/pipes/impl/Pipes.cc (revision 1550486) +++ c++/src/main/native/pipes/impl/Pipes.cc (working copy) @@ -28,9 +28,9 @@ HADOOP_ASSERT(out_stream_->open(out_stream), "problem opening stream"); } - /* local function */ + /* local sendCommand function */ void sendCommand(int32_t cmd, bool flush) { - serializeInt(cmd, *out_stream_); + serialize(cmd, *out_stream_); if (flush) { out_stream_->flush(); } @@ -111,25 +111,25 @@ /* virtual void registerCounter(int id, const string& group, const string& name) { - serializeInt(REGISTER_COUNTER, *stream); - serializeInt(id, *stream); - serializeString(group, *stream); - serializeString(name, *stream); + serialize(REGISTER_COUNTER, *stream); + serialize(id, *stream); + serialize(group, *stream); + serialize(name, *stream); } virtual void incrementCounter(const TaskContext::Counter* counter, uint64_t amount) { - serializeInt(INCREMENT_COUNTER, *stream); - serializeInt(counter->getId(), *stream); - serializeLong(amount, *stream); + serialize(INCREMENT_COUNTER, *stream); + serialize(counter->getId(), *stream); + serialize(amount, *stream); } */ virtual void incrementCounter(const string& group, const string& name, uint64_t amount) { - serializeInt(INCREMENT_COUNTER, *out_stream_); - serializeString(group, *out_stream_); - serializeString(name, *out_stream_); - serializeLong(amount, *out_stream_); + serialize(INCREMENT_COUNTER, *out_stream_); + serialize(group, *out_stream_); + serialize(name, *out_stream_); + serialize(amount, *out_stream_); out_stream_->flush(); if(logging) { fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent incrementCounter\n"); @@ -173,7 +173,7 @@ cmd = deserializeInt(*in_stream_); switch (cmd) { - + case START_MESSAGE: { int32_t protocol_version; protocol_version = deserialize(*in_stream_); @@ -293,6 +293,17 @@ } /** + * Check for valid response command + */ + bool verifyResult(int32_t expected_response_cmd) { + int32_t response = deserialize(*in_stream_); + if (response != expected_response_cmd) { + return false; + } + return true; + } + + /** * Wait for next event, which should be a response for * a previously sent command (expected_response_cmd) * and return the generic result @@ -303,14 +314,13 @@ T result = T(); // read response command - int32_t cmd; - cmd = deserializeInt(*in_stream_); + int32_t cmd = deserialize(*in_stream_); // check if response is expected if (expected_response_cmd == cmd) { switch (cmd) { - + case GET_MSG_COUNT: { T msg_count; msg_count = deserialize(*in_stream_); @@ -362,7 +372,7 @@ } return superstep_count; } - + case SEQFILE_OPEN: { T file_id = deserialize(*in_stream_); if(logging) { @@ -421,8 +431,7 @@ vector results; // read response command - int32_t cmd; - cmd = deserializeInt(*in_stream_); + int32_t cmd = deserialize(*in_stream_); // check if response is expected if (expected_response_cmd == cmd) { @@ -467,14 +476,13 @@ KeyValuePair key_value_pair; // read response command - int32_t cmd; - cmd = deserializeInt(*in_stream_); + int32_t cmd = deserialize(*in_stream_); // check if response is expected or END_OF_DATA if ((expected_response_cmd == cmd) || (cmd == END_OF_DATA) ) { switch (cmd) { - + case READ_KEYVALUE: { K key = deserialize(*in_stream_); V value = deserialize(*in_stream_); @@ -727,20 +735,27 @@ /** * Register a counter with the given group and name. */ - /* - virtual Counter* getCounter(const std::string& group, - const std::string& name) { - int id = registeredCounterIds.size(); - registeredCounterIds.push_back(id); - uplink->registerCounter(id, group, name); - return new Counter(id); - }*/ + virtual long getCounter(const string& group, const string& name) { + // TODO + // int id = registeredCounterIds.size(); + // registeredCounterIds.push_back(id); + // uplink->registerCounter(id, group, name); + // return new Counter(id); + return 0; + } /** - * Increment the value of the counter with the given amount. + * Increments the counter identified by the group and counter name by the + * specified amount. */ virtual void incrementCounter(const string& group, const string& name, uint64_t amount) { uplink_->incrementCounter(group, name, amount); + + // Verify response command + bool response = protocol_->verifyResult(INCREMENT_COUNTER); + if (response == false) { + throw Error("incrementCounter received wrong response!"); + } } /********************************************/ @@ -775,6 +790,12 @@ */ virtual void sendMessage(const string& peer_name, const M& msg) { uplink_->sendCommand(SEND_MSG, peer_name, msg); + + // Verify response command + bool response = protocol_->verifyResult(SEND_MSG); + if (response == false) { + throw Error("sendMessage received wrong response!"); + } } /** @@ -815,6 +836,12 @@ */ virtual void sync() { uplink_->sendCommand(SYNC); + + // Verify response command + bool response = protocol_->verifyResult(SYNC); + if (response == false) { + throw Error("sync received wrong response!"); + } } /** @@ -911,6 +938,12 @@ */ virtual void clear() { uplink_->sendCommand(CLEAR); + + // Verify response command + bool response = protocol_->verifyResult(CLEAR); + if (response == false) { + throw Error("clear received wrong response!"); + } } /** @@ -918,10 +951,16 @@ */ virtual void write(const K2& key, const V2& value) { if (writer_ != NULL) { - writer_->emit(key, value); + writer_->emit(key, value); // TODO writer not implemented } else { uplink_->sendCommand(WRITE_KEYVALUE, key, value); } + + // Verify response command + bool response = protocol_->verifyResult(WRITE_KEYVALUE); + if (response == false) { + throw Error("write received wrong response!"); + } } /** @@ -952,6 +991,12 @@ */ virtual void reopenInput() { uplink_->sendCommand(REOPEN_INPUT); + + // Verify response command + bool response = protocol_->verifyResult(REOPEN_INPUT); + if (response == false) { + throw Error("reopenInput received wrong response!"); + } } Index: c++/src/main/native/pipes/api/hama/Pipes.hh =================================================================== --- c++/src/main/native/pipes/api/hama/Pipes.hh (revision 1550486) +++ c++/src/main/native/pipes/api/hama/Pipes.hh (working copy) @@ -238,6 +238,7 @@ * Register a counter with the given group and name. */ //virtual Counter* getCounter(const string& group, const string& name) = 0; + virtual long getCounter(const string& group, const string& name) = 0; /** * Increment the value of the counter with the given amount. @@ -329,6 +330,7 @@ } virtual void nextEvent() = 0; + virtual bool verifyResult(int32_t expected_response_cmd) = 0; virtual UpwardProtocol* getUplink() = 0; virtual ~Protocol(){} }; Index: c++/src/main/native/examples/README.txt =================================================================== --- c++/src/main/native/examples/README.txt (revision 1550486) +++ c++/src/main/native/examples/README.txt (working copy) @@ -37,7 +37,7 @@ View input and output data % hadoop fs -cat /examples/input/summation/input.txt -% hama seqdumper -seqFile /examples/output/summation/part-00000 +% hama seqdumper -file /examples/output/summation/part-00000 You should see # Input Path: /examples/output/summation/part-00000 @@ -68,7 +68,7 @@ View output data -% hama seqdumper -seqFile /examples/output/piestimator/part-00001 +% hama seqdumper -file /examples/output/piestimator/part-00001 You should see # Input Path: /examples/output/piestimator/part-00001 @@ -111,14 +111,14 @@ View input and output data % hama seqdumper \ - -seqFile /examples/input/matrixmultiplication/MatrixA.seq + -file /examples/input/matrixmultiplication/MatrixA.seq % hama seqdumper \ - -seqFile \ + -file \ /examples/input/matrixmultiplication/MatrixB_transposed.seq -% hama seqdumper -seqFile \ - /examples/output/matrixmultiplication/part-00001 +% hama seqdumper \ + -file /examples/output/matrixmultiplication/part-00001 Delete output folder Index: c++/pom.xml =================================================================== --- c++/pom.xml (revision 1550486) +++ c++/pom.xml (working copy) @@ -79,7 +79,7 @@ - true + true