Index: core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (revision 1478258) +++ core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (working copy) @@ -146,8 +146,6 @@ Class keyClass = null; Class valueClass = null; - Class outputKeyClass = null; - Class outputValueClass = null; while ((pair = peer.readNext()) != null) { if (keyClass == null && valueClass == null) { keyClass = pair.getKey().getClass(); @@ -160,11 +158,6 @@ continue; } - if (outputKeyClass == null && outputValueClass == null) { - outputKeyClass = outputPair.getKey().getClass(); - outputValueClass = outputPair.getValue().getClass(); - } - int index = converter.getPartitionId(outputPair, partitioner, conf, peer, desiredNum); @@ -173,7 +166,7 @@ map = converter.newMap(); values.put(index, map); } - map.put(outputPair.getKey(), outputPair.getValue()); + map.put(pair.getKey(), pair.getValue()); } // The reason of use of Memory is to reduce file opens @@ -181,7 +174,7 @@ Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-" + peer.getPeerIndex()); SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, - destFile, outputKeyClass, outputValueClass, CompressionType.NONE); + destFile, keyClass, valueClass, CompressionType.NONE); for (Map.Entry v : e.getValue().entrySet()) { writer.append(v.getKey(), v.getValue()); @@ -217,8 +210,7 @@ FileStatus[] files = fs.listStatus(statu.getPath()); SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, - partitionFile, outputKeyClass, outputValueClass, - CompressionType.NONE); + partitionFile, keyClass, valueClass, CompressionType.NONE); for (int i = 0; i < files.length; i++) { LOG.debug("merge '" + files[i].getPath() + "' into " + partitionDir @@ -227,10 +219,9 @@ SequenceFile.Reader reader = new SequenceFile.Reader(fs, files[i].getPath(), conf); - Writable key = (Writable) ReflectionUtils.newInstance(outputKeyClass, + Writable key = (Writable) ReflectionUtils.newInstance(keyClass, conf); + Writable value = (Writable) ReflectionUtils.newInstance(valueClass, conf); - Writable value = (Writable) ReflectionUtils.newInstance( - outputValueClass, conf); while (reader.next(key, value)) { writer.append(key, value); Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1478258) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -26,17 +26,20 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hama.Constants; import org.apache.hama.bsp.BSP; import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.Combiner; import org.apache.hama.bsp.HashPartitioner; import org.apache.hama.bsp.Partitioner; +import org.apache.hama.bsp.PartitioningRunner.DefaultRecordConverter; +import org.apache.hama.bsp.PartitioningRunner.RecordConverter; import org.apache.hama.bsp.sync.SyncException; import org.apache.hama.graph.IDSkippingIterator.Strategy; +import org.apache.hama.util.KeyValuePair; import org.apache.hama.util.ReflectionUtils; /** @@ -373,19 +376,27 @@ /** * Loads vertices into memory of each peer. */ + @SuppressWarnings("unchecked") private void loadVertices( BSPPeer peer) throws IOException, SyncException, InterruptedException { final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false); - LOG.debug("Vertex class: " + vertexClass); + RecordConverter converter = org.apache.hadoop.util.ReflectionUtils + .newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, + DefaultRecordConverter.class, RecordConverter.class), conf); // our VertexInputReader ensures incoming vertices are sorted by their ID Vertex vertex = GraphJobRunner . newVertexInstance(VERTEX_CLASS); - vertex.runner = this; - while (peer.readNext(vertex, NullWritable.get())) { + KeyValuePair record = null; + KeyValuePair converted = null; + while ((record = peer.readNext()) != null) { + converted = converter.convertRecord(record, conf); + vertex = (Vertex) converted.getKey(); + vertex.runner = this; vertex.setup(conf); + if (selfReference) { vertex.addEdge(new Edge(vertex.getVertexID(), null)); }