Index: core/src/main/java/org/apache/hama/pipes/PipesApplication.java
===================================================================
--- core/src/main/java/org/apache/hama/pipes/PipesApplication.java (revision 1550486)
+++ core/src/main/java/org/apache/hama/pipes/PipesApplication.java (working copy)
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.pipes;
import java.io.BufferedReader;
@@ -361,12 +360,12 @@
*
* @throws IOException
*/
- public void cleanup() throws IOException {
+ public void cleanup(boolean sendClose) throws IOException {
if (serverSocket != null) {
serverSocket.close();
}
try {
- if (downlink != null) {
+ if ((downlink != null) && (sendClose)) {
downlink.close();
}
} catch (InterruptedException ie) {
Index: core/src/main/java/org/apache/hama/pipes/PipesBSP.java
===================================================================
--- core/src/main/java/org/apache/hama/pipes/PipesBSP.java (revision 1550486)
+++ core/src/main/java/org/apache/hama/pipes/PipesBSP.java (working copy)
@@ -93,7 +93,7 @@
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
- this.application.cleanup();
+ this.application.cleanup(true);
}
}
Index: core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java
===================================================================
--- core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java (revision 1550486)
+++ core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java (working copy)
@@ -31,12 +31,11 @@
/**
* Dummy input format used when non-Java a {@link RecordReader} is used by the
- * Pipes' application.
+ * Pipes application.
*
- * The only useful thing this does is set up the Map-Reduce job to get the
- * {@link PipesDummyRecordReader}, everything else left for the 'actual'
- * InputFormat specified by the user which is given by
- * mapred.pipes.user.inputformat.
+ * The only useful thing this does is set up the BSP job to get the
+ * PipesDummyRecordReader, everything else left for the 'actual' InputFormat
+ * specified by the user which is given by hama.pipes.user.inputformat.
*
* Adapted from Hadoop Pipes.
*
Index: core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java
===================================================================
--- core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java (revision 1550486)
+++ core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java (working copy)
@@ -43,6 +43,7 @@
* symbolic links for URIs specified with a fragment if
* DistributedCache.getSymlinks() is true.
*
+ * @param conf The job's configuration
* @throws IOException If a DistributedCache file cannot be found.
*/
public static final void moveLocalFiles(Configuration conf)
@@ -81,8 +82,8 @@
}
if (files.length() > 0) {
// I've replaced the use of the missing setLocalFiles and
- // addLocalFiles methods (hadoop 0.23.x) with our own DistCacheUtils methods
- // which set the cache configurations directly.
+ // addLocalFiles methods (hadoop 0.23.x) with our own DistCacheUtils
+ // methods which set the cache configurations directly.
DistCacheUtils.addLocalFiles(conf, files.toString());
}
}
@@ -90,8 +91,8 @@
/**
* Add the Files to HDFS
*
- * @param conf
- * @param paths
+ * @param conf The job's configuration
+ * @param files Paths that should be transfered to HDFS
*/
public static String addFilesToHDFS(Configuration conf, String files) {
if (files == null)
@@ -139,8 +140,7 @@
/**
* Add the JARs from the given HDFS paths to the Classpath
*
- * @param conf
- * @param urls
+ * @param conf The job's configuration
*/
public static URL[] addJarsToJobClasspath(Configuration conf) {
URL[] classLoaderURLs = ((URLClassLoader) conf.getClassLoader()).getURLs();
Index: core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java
===================================================================
--- core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java (revision 1550486)
+++ core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java (working copy)
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.pipes.util;
import java.io.FileWriter;
@@ -36,7 +35,6 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
-//import org.apache.hama.util.GenericOptionsParser;
public class SequenceFileDumper {
@@ -63,28 +61,23 @@
Option option = OptionBuilder.withArgName(name).hasArgs(1)
.withDescription(description).isRequired(required).create();
options.addOption(option);
-
}
Parser createParser() {
- Parser result = new BasicParser();
- return result;
+ return new BasicParser();
}
void printUsage() {
// The CLI package should do this for us, but I can't figure out how
// to make it print something reasonable.
System.out.println("hama seqdumper");
+ System.out.println(" [-file ] // The sequence file to read");
System.out
- .println(" [-seqFile ] // The Sequence File containing the Clusters");
+ .println(" [-output ] // The output file. If not specified, dumps to the console");
System.out
- .println(" [-output ] // The output file. If not specified, dumps to the console");
- System.out
- .println(" [-substring // The number of chars of the FormatString() to print");
+ .println(" [-substring // The number of chars of value to print");
System.out.println(" [-count ] // Report the count only");
- System.out.println(" [-help] // Print out help");
System.out.println();
- //GenericOptionsParser.printGenericCommandUsage(System.out);
}
}
@@ -95,37 +88,23 @@
return;
}
- LOG.info("DEBUG: Hama SequenceFileDumper started!");
-
- cli.addOption("seqFile", false,
- "The Sequence File containing the Clusters", "path");
+ // Add arguments
+ cli.addOption("file", false, "The Sequence File containing the Clusters",
+ "path");
cli.addOption("output", false,
"The output file. If not specified, dumps to the console", "path");
-
cli.addOption("substring", false,
"The number of chars of the FormatString() to print", "number");
cli.addOption("count", false, "Report the count only", "number");
- cli.addOption("help", false, "Print out help", "class");
Parser parser = cli.createParser();
-
try {
HamaConfiguration conf = new HamaConfiguration();
-
- //GenericOptionsParser genericParser = new GenericOptionsParser(conf, args);
-
CommandLine cmdLine = parser.parse(cli.options, args);
- // genericParser.getRemainingArgs());
- LOG.debug("DEBUG: Arguments: " + args); //genericParser.getRemainingArgs());
- if (cmdLine.hasOption("help")) {
- cli.printUsage();
- return;
- }
+ if (cmdLine.hasOption("file")) {
+ Path path = new Path(cmdLine.getOptionValue("file"));
- if (cmdLine.hasOption("seqFile")) {
- Path path = new Path(cmdLine.getOptionValue("seqFile"));
-
FileSystem fs = FileSystem.get(path.toUri(), conf);
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
@@ -135,6 +114,7 @@
} else {
writer = new OutputStreamWriter(System.out);
}
+
writer.append("Input Path: ").append(String.valueOf(path))
.append(LINE_SEP);
@@ -143,10 +123,9 @@
sub = Integer.parseInt(cmdLine.getOptionValue("substring"));
}
- boolean countOnly = cmdLine.hasOption("count");
-
Writable key = (Writable) reader.getKeyClass().newInstance();
Writable value = (Writable) reader.getValueClass().newInstance();
+
writer.append("Key class: ")
.append(String.valueOf(reader.getKeyClass()))
.append(" Value Class: ").append(String.valueOf(value.getClass()))
@@ -154,6 +133,7 @@
writer.flush();
long count = 0;
+ boolean countOnly = cmdLine.hasOption("count");
if (countOnly == false) {
while (reader.next(key, value)) {
writer.append("Key: ").append(String.valueOf(key));
@@ -166,7 +146,8 @@
}
writer.append("Count: ").append(String.valueOf(count))
.append(LINE_SEP);
- } else {
+
+ } else { // count only
while (reader.next(key, value)) {
count++;
}
@@ -179,10 +160,13 @@
writer.close();
}
reader.close();
+
+ } else {
+ cli.printUsage();
}
} catch (ParseException e) {
- LOG.info("Error : " + e);
+ LOG.error(e.getMessage());
cli.printUsage();
return;
}
Index: core/src/main/java/org/apache/hama/pipes/Submitter.java
===================================================================
--- core/src/main/java/org/apache/hama/pipes/Submitter.java (revision 1550486)
+++ core/src/main/java/org/apache/hama/pipes/Submitter.java (working copy)
@@ -214,7 +214,7 @@
* Submit a job to the cluster. All of the necessary modifications to the job
* to run under pipes are made to the configuration.
*
- * @param conf the job to submit to the cluster (MODIFIED)
+ * @param job to submit to the cluster
* @throws IOException
*/
public static void runJob(BSPJob job) throws IOException {
Index: core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java
===================================================================
--- core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java (revision 1550486)
+++ core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java (working copy)
@@ -49,7 +49,7 @@
public void cleanup() {
try {
- application.cleanup();
+ application.cleanup(true);
} catch (IOException e) {
LOG.error(e);
}
Index: core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
===================================================================
--- core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (revision 1550486)
+++ core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (working copy)
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.pipes.protocol;
import java.io.BufferedInputStream;
@@ -100,8 +99,9 @@
}
int cmd = readCommand();
- if (cmd == -1)
+ if (cmd == -1) {
continue;
+ }
LOG.debug("Handling uplink command: " + MessageType.values()[cmd]);
if (cmd == MessageType.WRITE_KEYVALUE.code && isPeerAvailable()) { // INCOMING
@@ -114,8 +114,8 @@
} else if (cmd == MessageType.REGISTER_COUNTER.code
&& isPeerAvailable()) { // INCOMING
/*
- * Is not used in HAMA -> Hadoop Pipes - maybe for performance, skip
- * transferring group and name each INCREMENT
+ * Is not used in Hama. Hadoop Pipes uses it - maybe for performance
+ * issue, skip transferring group and name each INCREMENT
*/
} else if (cmd == MessageType.TASK_DONE.code) { // INCOMING
synchronized (binProtocol.hasTaskLock) {
@@ -149,8 +149,8 @@
} else if (cmd == MessageType.REOPEN_INPUT.code && isPeerAvailable()) { // INCOMING
reopenInput();
} else if (cmd == MessageType.CLEAR.code && isPeerAvailable()) { // INCOMING
- LOG.debug("Got MessageType.CLEAR");
- peer.clear();
+ clear();
+
/* SequenceFileConnector Implementation */
} else if (cmd == MessageType.SEQFILE_OPEN.code) { // OUTGOING
seqFileOpen();
@@ -161,6 +161,7 @@
} else if (cmd == MessageType.SEQFILE_CLOSE.code) { // OUTGOING
seqFileClose();
/* SequenceFileConnector Implementation */
+
} else if (cmd == MessageType.PARTITION_RESPONSE.code) { // INCOMING
partitionResponse();
} else {
@@ -192,9 +193,24 @@
public void reopenInput() throws IOException {
LOG.debug("Got MessageType.REOPEN_INPUT");
+
peer.reopenInput();
+
+ WritableUtils.writeVInt(this.outStream, MessageType.REOPEN_INPUT.code);
+ binProtocol.flush();
+ LOG.debug("Responded MessageType.REOPEN_INPUT");
}
+ public void clear() throws IOException {
+ LOG.debug("Got MessageType.CLEAR");
+
+ peer.clear();
+
+ WritableUtils.writeVInt(this.outStream, MessageType.CLEAR.code);
+ binProtocol.flush();
+ LOG.debug("Responded MessageType.CLEAR");
+ }
+
public void getSuperstepCount() throws IOException {
WritableUtils.writeVInt(this.outStream,
MessageType.GET_SUPERSTEP_COUNT.code);
@@ -257,7 +273,12 @@
public void sync() throws IOException, SyncException, InterruptedException {
LOG.debug("Got MessageType.SYNC");
+
peer.sync(); // this call blocks
+
+ WritableUtils.writeVInt(this.outStream, MessageType.SYNC.code);
+ binProtocol.flush();
+ LOG.debug("Responded MessageType.SYNC");
}
public void getMessage() throws IOException {
@@ -282,10 +303,20 @@
}
public void incrementCounter() throws IOException {
+ LOG.debug("Got MessageType.INCREMENT_COUNTER");
+
String group = Text.readString(this.inStream);
String name = Text.readString(this.inStream);
long amount = WritableUtils.readVLong(this.inStream);
+
+ LOG.debug("Got MessageType.INCREMENT_COUNTER group: " + group + " name: "
+ + name + " amount: " + amount);
+
peer.incrementCounter(group, name, amount);
+
+ WritableUtils.writeVInt(this.outStream, MessageType.INCREMENT_COUNTER.code);
+ binProtocol.flush();
+ LOG.debug("Responded MessageType.INCREMENT_COUNTER");
}
@SuppressWarnings("unchecked")
@@ -303,7 +334,11 @@
peer.send(peerName, message);
- LOG.debug("Done MessageType.SEND_MSG to peerName: "
+ WritableUtils.writeVInt(this.outStream, MessageType.SEND_MSG.code);
+ binProtocol.flush();
+ LOG.debug("Responded MessageType.SEND_MSG");
+
+ LOG.debug("Sent message to peerName: "
+ peerName
+ " messageClass: "
+ message.getClass().getName()
@@ -362,13 +397,18 @@
Object.class), conf);
LOG.debug("Got MessageType.WRITE_KEYVALUE keyOutClass: "
- + keyOut.getClass().getName() + " valueOutClass: " + valueOut.getClass().getName());
+ + keyOut.getClass().getName() + " valueOutClass: "
+ + valueOut.getClass().getName());
readObject((Writable) keyOut);
readObject((Writable) valueOut);
peer.write(keyOut, valueOut);
+ WritableUtils.writeVInt(this.outStream, MessageType.WRITE_KEYVALUE.code);
+ binProtocol.flush();
+ LOG.debug("Responded MessageType.WRITE_KEYVALUE");
+
LOG.debug("Done MessageType.WRITE_KEYVALUE -"
+ " Key: "
+ ((keyOut.toString().length() < 10) ? keyOut.toString() : keyOut
@@ -385,6 +425,7 @@
// key and value class stored in the SequenceFile
String keyClass = Text.readString(this.inStream);
String valueClass = Text.readString(this.inStream);
+ LOG.debug("GOT MessageType.SEQFILE_OPEN - Path: " + path);
LOG.debug("GOT MessageType.SEQFILE_OPEN - Option: " + option);
LOG.debug("GOT MessageType.SEQFILE_OPEN - KeyClass: " + keyClass);
LOG.debug("GOT MessageType.SEQFILE_OPEN - ValueClass: " + valueClass);
@@ -418,8 +459,10 @@
sequenceKeyWritable, sequenceValueWritable)));
} catch (IOException e) {
+ LOG.error("SEQFILE_OPEN - " + e.getMessage());
fileID = -1;
} catch (ClassNotFoundException e) {
+ LOG.error("SEQFILE_OPEN - " + e.getMessage());
fileID = -1;
}
@@ -451,10 +494,14 @@
sequenceKeyWritable, sequenceValueWritable)));
} catch (IOException e) {
+ LOG.error("SEQFILE_OPEN - " + e.getMessage());
fileID = -1;
} catch (ClassNotFoundException e) {
+ LOG.error("SEQFILE_OPEN - " + e.getMessage());
fileID = -1;
}
+ } else { // wrong option
+ LOG.error("SEQFILE_OPEN - Wrong option: '" + option + "'");
}
WritableUtils.writeVInt(this.outStream, MessageType.SEQFILE_OPEN.code);
@@ -518,9 +565,9 @@
// check if fileID is available in sequenceFileWriter
if (sequenceFileWriters.containsKey(fileID)) {
- Writable sequenceKeyWritable = sequenceFileReaders.get(fileID).getValue()
+ Writable sequenceKeyWritable = sequenceFileWriters.get(fileID).getValue()
.getKey();
- Writable sequenceValueWritable = sequenceFileReaders.get(fileID)
+ Writable sequenceValueWritable = sequenceFileWriters.get(fileID)
.getValue().getValue();
// try to read key and value
@@ -555,6 +602,7 @@
public void seqFileClose() throws IOException {
int fileID = WritableUtils.readVInt(this.inStream);
+ LOG.debug("GOT MessageType.SEQFILE_CLOSE - FileID: " + fileID);
boolean result = false;
@@ -594,11 +642,9 @@
*/
protected void readObject(Writable obj) throws IOException {
byte[] buffer;
-
// For BytesWritable and Text, use the specified length to set the length
// this causes the "obvious" translations to work. So that if you emit
// a string "abc" from C++, it shows up as "abc".
-
if (obj instanceof Text) {
int numBytes = WritableUtils.readVInt(this.inStream);
buffer = new byte[numBytes];
@@ -612,29 +658,22 @@
((BytesWritable) obj).set(buffer, 0, numBytes);
} else if (obj instanceof IntWritable) {
- LOG.debug("read IntWritable");
((IntWritable) obj).set(WritableUtils.readVInt(this.inStream));
} else if (obj instanceof LongWritable) {
((LongWritable) obj).set(WritableUtils.readVLong(this.inStream));
- // else if ((obj instanceof FloatWritable) || (obj instanceof
- // DoubleWritable))
-
} else if (obj instanceof NullWritable) {
throw new IOException("Cannot read data into NullWritable!");
} else {
- // Note: other types are transfered as String which should be implemented
- // in Writable itself
try {
- LOG.debug("reading other type");
+ LOG.debug("reading type: " + obj.getClass().getName());
+
// try reading object
obj.readFields(this.inStream);
- // String s = Text.readString(inStream);
} catch (IOException e) {
-
throw new IOException("Hama Pipes is not able to read "
+ obj.getClass().getName(), e);
}
Index: core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
===================================================================
--- core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java (revision 1550486)
+++ core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java (working copy)
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.pipes.protocol;
import java.io.BufferedOutputStream;
@@ -50,8 +49,7 @@
public class BinaryProtocol implements
DownwardProtocol {
- protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class
- .getName());
+ protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class);
public static final int CURRENT_PROTOCOL_VERSION = 0;
/**
* The buffer size for the command socket
@@ -74,7 +72,7 @@
* Upward messages are passed on the specified handler and downward downward
* messages are public methods on this object.
*
- * @param jobConfig The job's configuration
+ * @param conf The job's configuration
* @param out The output stream to communicate on.
* @param in The input stream to communicate on.
* @throws IOException
@@ -362,8 +360,8 @@
* @throws IOException
*/
protected void writeObject(Writable obj) throws IOException {
- // For basic types IntWritable, LongWritable, FloatWritable, DoubleWritable,
- // Text and BytesWritable, encode them directly, so that they end up
+ // For basic types IntWritable, LongWritable, Text and BytesWritable,
+ // encode them directly, so that they end up
// in C++ as the natural translations.
if (obj instanceof Text) {
Text t = (Text) obj;
@@ -383,20 +381,8 @@
} else if (obj instanceof LongWritable) {
WritableUtils.writeVLong(this.outStream, ((LongWritable) obj).get());
- // else if ((obj instanceof FloatWritable) || (obj instanceof
- // DoubleWritable))
-
} else {
- // Note: other types are transfered as String which should be implemented
- // in Writable itself
-
- // DataOutputBuffer buffer = new DataOutputBuffer();
- // buffer.reset();
- // obj.write(buffer);
- // int length = buffer.getLength();
- // WritableUtils.writeVInt(stream, length);
- // stream.write(buffer.getData(), 0, length);
-
+ // Note: FloatWritable and DoubleWritable are written here
obj.write(this.outStream);
}
}
Index: core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java
===================================================================
--- core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java (revision 1550486)
+++ core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java (working copy)
@@ -15,13 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.pipes.protocol;
/**
* The integer codes to represent the different messages. These must match the
* C++ codes or massive confusion will result.
- *
*/
public enum MessageType {
START(0), SET_BSPJOB_CONF(1), SET_INPUT_TYPES(2), RUN_SETUP(3), RUN_BSP(4),
Index: core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java
===================================================================
--- core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java (revision 1550486)
+++ core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java (working copy)
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.pipes.protocol;
import java.io.IOException;
@@ -42,6 +41,7 @@
/**
* Set the BSP Job Configuration
*
+ * @param conf The job's configuration
* @throws IOException
*/
void setBSPJobConf(Configuration conf) throws IOException;
@@ -58,6 +58,8 @@
/**
* runSetup
*
+ * @param pipedInput use pipedInput
+ * @param pipedOutput use pipedOutput
* @throws IOException
*/
void runSetup(boolean pipedInput, boolean pipedOutput) throws IOException;
@@ -65,6 +67,8 @@
/**
* runBsp
*
+ * @param pipedInput use pipedInput
+ * @param pipedOutput use pipedOutput
* @throws IOException
*/
void runBsp(boolean pipedInput, boolean pipedOutput) throws IOException;
@@ -72,6 +76,8 @@
/**
* runCleanup
*
+ * @param pipedInput use pipedInput
+ * @param pipedOutput use pipedOutput
* @throws IOException
*/
void runCleanup(boolean pipedInput, boolean pipedOutput) throws IOException;
@@ -79,6 +85,9 @@
/**
* getPartition
*
+ * @param key
+ * @param value
+ * @param numTasks number of available tasks
* @throws IOException
*/
int getPartition(K1 key, V1 value, int numTasks) throws IOException;
Index: core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
===================================================================
--- core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (revision 1550486)
+++ core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (working copy)
@@ -45,8 +45,6 @@
*
* @param input key.
* @param input value.
- * @param output key.
- * @param output value.
*/
public class StreamingProtocol
extends BinaryProtocol {
Index: c++/src/main/native/pipes/impl/Pipes.cc
===================================================================
--- c++/src/main/native/pipes/impl/Pipes.cc (revision 1550486)
+++ c++/src/main/native/pipes/impl/Pipes.cc (working copy)
@@ -28,9 +28,9 @@
HADOOP_ASSERT(out_stream_->open(out_stream), "problem opening stream");
}
- /* local function */
+ /* local sendCommand function */
void sendCommand(int32_t cmd, bool flush) {
- serializeInt(cmd, *out_stream_);
+ serialize(cmd, *out_stream_);
if (flush) {
out_stream_->flush();
}
@@ -111,25 +111,25 @@
/*
virtual void registerCounter(int id, const string& group,
const string& name) {
- serializeInt(REGISTER_COUNTER, *stream);
- serializeInt(id, *stream);
- serializeString(group, *stream);
- serializeString(name, *stream);
+ serialize(REGISTER_COUNTER, *stream);
+ serialize(id, *stream);
+ serialize(group, *stream);
+ serialize(name, *stream);
}
virtual void incrementCounter(const TaskContext::Counter* counter,
uint64_t amount) {
- serializeInt(INCREMENT_COUNTER, *stream);
- serializeInt(counter->getId(), *stream);
- serializeLong(amount, *stream);
+ serialize(INCREMENT_COUNTER, *stream);
+ serialize(counter->getId(), *stream);
+ serialize(amount, *stream);
}
*/
virtual void incrementCounter(const string& group, const string& name, uint64_t amount) {
- serializeInt(INCREMENT_COUNTER, *out_stream_);
- serializeString(group, *out_stream_);
- serializeString(name, *out_stream_);
- serializeLong(amount, *out_stream_);
+ serialize(INCREMENT_COUNTER, *out_stream_);
+ serialize(group, *out_stream_);
+ serialize(name, *out_stream_);
+ serialize(amount, *out_stream_);
out_stream_->flush();
if(logging) {
fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent incrementCounter\n");
@@ -173,7 +173,7 @@
cmd = deserializeInt(*in_stream_);
switch (cmd) {
-
+
case START_MESSAGE: {
int32_t protocol_version;
protocol_version = deserialize(*in_stream_);
@@ -293,6 +293,17 @@
}
/**
+ * Check for valid response command
+ */
+ bool verifyResult(int32_t expected_response_cmd) {
+ int32_t response = deserialize(*in_stream_);
+ if (response != expected_response_cmd) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
* Wait for next event, which should be a response for
* a previously sent command (expected_response_cmd)
* and return the generic result
@@ -303,14 +314,13 @@
T result = T();
// read response command
- int32_t cmd;
- cmd = deserializeInt(*in_stream_);
+ int32_t cmd = deserialize(*in_stream_);
// check if response is expected
if (expected_response_cmd == cmd) {
switch (cmd) {
-
+
case GET_MSG_COUNT: {
T msg_count;
msg_count = deserialize(*in_stream_);
@@ -362,7 +372,7 @@
}
return superstep_count;
}
-
+
case SEQFILE_OPEN: {
T file_id = deserialize(*in_stream_);
if(logging) {
@@ -421,8 +431,7 @@
vector results;
// read response command
- int32_t cmd;
- cmd = deserializeInt(*in_stream_);
+ int32_t cmd = deserialize(*in_stream_);
// check if response is expected
if (expected_response_cmd == cmd) {
@@ -467,14 +476,13 @@
KeyValuePair key_value_pair;
// read response command
- int32_t cmd;
- cmd = deserializeInt(*in_stream_);
+ int32_t cmd = deserialize(*in_stream_);
// check if response is expected or END_OF_DATA
if ((expected_response_cmd == cmd) || (cmd == END_OF_DATA) ) {
switch (cmd) {
-
+
case READ_KEYVALUE: {
K key = deserialize(*in_stream_);
V value = deserialize(*in_stream_);
@@ -727,20 +735,27 @@
/**
* Register a counter with the given group and name.
*/
- /*
- virtual Counter* getCounter(const std::string& group,
- const std::string& name) {
- int id = registeredCounterIds.size();
- registeredCounterIds.push_back(id);
- uplink->registerCounter(id, group, name);
- return new Counter(id);
- }*/
+ virtual long getCounter(const string& group, const string& name) {
+ // TODO
+ // int id = registeredCounterIds.size();
+ // registeredCounterIds.push_back(id);
+ // uplink->registerCounter(id, group, name);
+ // return new Counter(id);
+ return 0;
+ }
/**
- * Increment the value of the counter with the given amount.
+ * Increments the counter identified by the group and counter name by the
+ * specified amount.
*/
virtual void incrementCounter(const string& group, const string& name, uint64_t amount) {
uplink_->incrementCounter(group, name, amount);
+
+ // Verify response command
+ bool response = protocol_->verifyResult(INCREMENT_COUNTER);
+ if (response == false) {
+ throw Error("incrementCounter received wrong response!");
+ }
}
/********************************************/
@@ -775,6 +790,12 @@
*/
virtual void sendMessage(const string& peer_name, const M& msg) {
uplink_->sendCommand(SEND_MSG, peer_name, msg);
+
+ // Verify response command
+ bool response = protocol_->verifyResult(SEND_MSG);
+ if (response == false) {
+ throw Error("sendMessage received wrong response!");
+ }
}
/**
@@ -815,6 +836,12 @@
*/
virtual void sync() {
uplink_->sendCommand(SYNC);
+
+ // Verify response command
+ bool response = protocol_->verifyResult(SYNC);
+ if (response == false) {
+ throw Error("sync received wrong response!");
+ }
}
/**
@@ -911,6 +938,12 @@
*/
virtual void clear() {
uplink_->sendCommand(CLEAR);
+
+ // Verify response command
+ bool response = protocol_->verifyResult(CLEAR);
+ if (response == false) {
+ throw Error("clear received wrong response!");
+ }
}
/**
@@ -918,10 +951,16 @@
*/
virtual void write(const K2& key, const V2& value) {
if (writer_ != NULL) {
- writer_->emit(key, value);
+ writer_->emit(key, value); // TODO writer not implemented
} else {
uplink_->sendCommand(WRITE_KEYVALUE, key, value);
}
+
+ // Verify response command
+ bool response = protocol_->verifyResult(WRITE_KEYVALUE);
+ if (response == false) {
+ throw Error("write received wrong response!");
+ }
}
/**
@@ -952,6 +991,12 @@
*/
virtual void reopenInput() {
uplink_->sendCommand(REOPEN_INPUT);
+
+ // Verify response command
+ bool response = protocol_->verifyResult(REOPEN_INPUT);
+ if (response == false) {
+ throw Error("reopenInput received wrong response!");
+ }
}
Index: c++/src/main/native/pipes/api/hama/Pipes.hh
===================================================================
--- c++/src/main/native/pipes/api/hama/Pipes.hh (revision 1550486)
+++ c++/src/main/native/pipes/api/hama/Pipes.hh (working copy)
@@ -238,6 +238,7 @@
* Register a counter with the given group and name.
*/
//virtual Counter* getCounter(const string& group, const string& name) = 0;
+ virtual long getCounter(const string& group, const string& name) = 0;
/**
* Increment the value of the counter with the given amount.
@@ -329,6 +330,7 @@
}
virtual void nextEvent() = 0;
+ virtual bool verifyResult(int32_t expected_response_cmd) = 0;
virtual UpwardProtocol* getUplink() = 0;
virtual ~Protocol(){}
};
Index: c++/src/main/native/examples/README.txt
===================================================================
--- c++/src/main/native/examples/README.txt (revision 1550486)
+++ c++/src/main/native/examples/README.txt (working copy)
@@ -37,7 +37,7 @@
View input and output data
% hadoop fs -cat /examples/input/summation/input.txt
-% hama seqdumper -seqFile /examples/output/summation/part-00000
+% hama seqdumper -file /examples/output/summation/part-00000
You should see
# Input Path: /examples/output/summation/part-00000
@@ -68,7 +68,7 @@
View output data
-% hama seqdumper -seqFile /examples/output/piestimator/part-00001
+% hama seqdumper -file /examples/output/piestimator/part-00001
You should see
# Input Path: /examples/output/piestimator/part-00001
@@ -111,14 +111,14 @@
View input and output data
% hama seqdumper \
- -seqFile /examples/input/matrixmultiplication/MatrixA.seq
+ -file /examples/input/matrixmultiplication/MatrixA.seq
% hama seqdumper \
- -seqFile \
+ -file \
/examples/input/matrixmultiplication/MatrixB_transposed.seq
-% hama seqdumper -seqFile \
- /examples/output/matrixmultiplication/part-00001
+% hama seqdumper \
+ -file /examples/output/matrixmultiplication/part-00001
Delete output folder
Index: c++/pom.xml
===================================================================
--- c++/pom.xml (revision 1550486)
+++ c++/pom.xml (working copy)
@@ -79,7 +79,7 @@
- true
+ true