Index: core/src/main/java/org/apache/hama/pipes/Submitter.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/Submitter.java (revision 1558559) +++ core/src/main/java/org/apache/hama/pipes/Submitter.java (working copy) @@ -44,6 +44,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; +import org.apache.hama.Constants; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSPJob; import org.apache.hama.bsp.BSPJobClient; @@ -191,7 +192,7 @@ setIfUnset(job.getConfiguration(), "bsp.input.value.class", textClassname); setIfUnset(job.getConfiguration(), "bsp.output.key.class", textClassname); setIfUnset(job.getConfiguration(), "bsp.output.value.class", textClassname); - setIfUnset(job.getConfiguration(), "bsp.message.class", + setIfUnset(job.getConfiguration(), Constants.MESSAGE_CLASS, BytesWritable.class.getName()); setIfUnset(job.getConfiguration(), "bsp.job.name", "Hama Pipes Job"); @@ -205,7 +206,7 @@ LOG.debug("InputFormat: " + job.getOutputFormat()); LOG.debug("OutputKeyClass: " + job.getOutputKeyClass().getName()); LOG.debug("OutputValueClass: " + job.getOutputValueClass().getName()); - LOG.debug("MessageClass: " + job.get("bsp.message.class")); + LOG.debug("MessageClass: " + job.get(Constants.MESSAGE_CLASS)); LOG.debug("bsp.master.address: " + job.getConfiguration().get("bsp.master.address")); Index: core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (revision 1558559) +++ core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (working copy) @@ -354,7 +354,7 @@ String peerName = Text.readString(this.inStream); M message = (M) ReflectionUtils.newInstance((Class) conf - .getClass("bsp.message.class", BytesWritable.class), conf); + .getClass(Constants.MESSAGE_CLASS, BytesWritable.class), conf); LOG.debug("Got MessageType.SEND_MSG peerName: " + peerName + " messageClass: " + message.getClass().getName()); Index: core/src/main/java/org/apache/hama/bsp/BSPJob.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPJob.java (revision 1558559) +++ core/src/main/java/org/apache/hama/bsp/BSPJob.java (working copy) @@ -356,6 +356,24 @@ } /** + * Get the message class. + * + * @return the message class. + */ + public Class getMessageClass() { + return conf.getClass(Constants.MESSAGE_CLASS, Text.class, Object.class); + } + + /** + * Set the message class. + * + * @param theClass the message class. + */ + public void setMessageClass(Class theClass) { + conf.setClass(Constants.MESSAGE_CLASS, theClass, Object.class); + } + + /** * Sets the output path for the job. * * @param path where the output gets written. Index: core/src/main/java/org/apache/hama/Constants.java =================================================================== --- core/src/main/java/org/apache/hama/Constants.java (revision 1558559) +++ core/src/main/java/org/apache/hama/Constants.java (working copy) @@ -101,7 +101,7 @@ public static final String JOB_PEERS_COUNT = "bsp.peers.num"; public static final String INPUT_FORMAT_CLASS = "bsp.input.format.class"; public static final String OUTPUT_FORMAT_CLASS = "bsp.output.format.class"; - public static final String MESSAGE_CLASS = "bsp.message.type.class"; + public static final String MESSAGE_CLASS = "bsp.message.class"; // ///////////////////////////////////////////// // Messaging related parameters. @@ -120,7 +120,7 @@ public static final String RUNTIME_PARTITIONING_CLASS = "bsp.input.partitioner.class"; public static final String RUNTIME_DESIRED_PEERS_COUNT = "desired.num.of.tasks"; public static final String RUNTIME_PARTITION_RECORDCONVERTER = "bsp.runtime.partition.recordconverter"; - + public static final String PARTITION_SORT_BY_KEY = "bsp.partition.sort.by.converted.record"; // ///////////////////////////////////// Index: core/src/test/java/org/apache/hama/pipes/TestPipes.java =================================================================== --- core/src/test/java/org/apache/hama/pipes/TestPipes.java (revision 1558559) +++ core/src/test/java/org/apache/hama/pipes/TestPipes.java (working copy) @@ -223,7 +223,7 @@ bsp.setOutputFormat(SequenceFileOutputFormat.class); bsp.setOutputKeyClass(NullWritable.class); bsp.setOutputValueClass(DoubleWritable.class); - bsp.set("bsp.message.class", DoubleWritable.class.getName()); + bsp.setMessageClass(DoubleWritable.class); return bsp; } @@ -233,7 +233,7 @@ bsp.setOutputFormat(SequenceFileOutputFormat.class); bsp.setOutputKeyClass(NullWritable.class); bsp.setOutputValueClass(DoubleWritable.class); - bsp.set("bsp.message.class", IntWritable.class.getName()); + bsp.setMessageClass(IntWritable.class); return bsp; } @@ -246,10 +246,9 @@ bsp.setOutputFormat(SequenceFileOutputFormat.class); bsp.setOutputKeyClass(IntWritable.class); bsp.setOutputValueClass(PipesVectorWritable.class); + bsp.setMessageClass(PipesKeyValueWritable.class); bsp.set(Constants.RUNTIME_PARTITIONING_DIR, HAMA_TMP_OUTPUT + "/parts"); - bsp.set("bsp.message.class", PipesKeyValueWritable.class.getName()); - bsp.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true); bsp.setPartitioner(PipesPartitioner.class);