Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	(revision 1416768)
+++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	(working copy)
@@ -430,8 +430,6 @@
     } else {
       fs.mkdirs(partitionedPath);
     }
-    // FIXME this is soo unsafe
-    RecordReader sampleReader = inputFormat.getRecordReader(splits[0], job);
     List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
         splits.length);
@@ -450,12 +448,10 @@
       Path p = new Path(partitionedPath, getPartitionName(i));
       if (codec == null) {
         writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
-            sampleReader.createKey().getClass(), sampleReader.createValue()
-                .getClass(), CompressionType.NONE));
+            job.getInputKeyClass(), job.getInputValueClass(), CompressionType.NONE));
       } else {
         writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
-            sampleReader.createKey().getClass(), sampleReader.createValue()
-                .getClass(), compressionType, codec));
+            job.getInputKeyClass(), job.getInputValueClass(), compressionType, codec));
       }
     }
Index: core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/FileInputFormat.java	(revision 1416768)
+++ core/src/main/java/org/apache/hama/bsp/FileInputFormat.java	(working copy)
@@ -36,7 +36,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapred.InvalidInputException;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
@@ -167,12 +166,13 @@
    */
   @Override
   public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
-    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
-    long maxSize = getMaxSplitSize(job);
-
+    FileStatus[] files = listStatus(job);
+    long totalSize = computeTotalSize(job, files);
+    long goalSize = computeGoalSize(numSplits, totalSize);
+    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
+
     // generate splits
     List<InputSplit> splits = new ArrayList<InputSplit>();
-    FileStatus[] files = listStatus(job);
     for (FileStatus file : files) {
       Path path = file.getPath();
       FileSystem fs = path.getFileSystem(job.getConfiguration());
@@ -180,8 +180,8 @@
       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
       if ((length != 0) && isSplitable(job, path)) {
         long blockSize = file.getBlockSize();
-        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
-
+        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+        LOG.debug("goalSize: " + goalSize + ", minSize: " + minSize + ", blockSize: " + blockSize);
         long bytesRemaining = length;
         while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
           int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
@@ -232,7 +232,7 @@
    * @param job the job to modify
    * @param size the minimum size
    */
-  public static void setMinInputSplitSize(Job job, long size) {
+  public static void setMinInputSplitSize(BSPJob job, long size) {
     job.getConfiguration().setLong("bsp.min.split.size", size);
   }

@@ -252,7 +252,7 @@
    * @param job the job to modify
    * @param size the maximum split size
    */
-  public static void setMaxInputSplitSize(Job job, long size) {
+  public static void setMaxInputSplitSize(BSPJob job, long size) {
     job.getConfiguration().setLong("bsp.max.split.size", size);
   }

@@ -296,7 +296,6 @@
   protected long computeGoalSize(int numSplits, long totalSize) {
-    // The minus 1 is for the remainder.
-    return totalSize / (numSplits <= 1 ? 1 : numSplits - 1);
+    return totalSize / (numSplits <= 1 ? 1 : numSplits);
   }

   protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
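The rewritten getSplits derives a per-split goal size from the total input instead of capping by a configured maximum, and computeGoalSize now divides by numSplits itself rather than numSplits - 1, which used to leave one oversized remainder split. As a worked example of the resulting arithmetic, here is a minimal sketch; it assumes computeSplitSize follows the classic mapred rule max(minSize, min(goalSize, blockSize)), since the method body is not part of this diff:

    // SplitSizeMath.java -- illustrative sketch only, not part of the patch.
    // Assumes computeSplitSize(goalSize, minSize, blockSize) is the classic
    // mapred formula: Math.max(minSize, Math.min(goalSize, blockSize)).
    public class SplitSizeMath {
      public static void main(String[] args) {
        long totalSize = 1280L * 1024 * 1024;  // 1280 MB across all input files
        int numSplits = 10;                    // requested number of splits
        // new computeGoalSize: no more "- 1", so no oversized remainder split
        long goalSize = totalSize / (numSplits <= 1 ? 1 : numSplits); // 128 MB
        long minSize = 1L;                     // lower bound from format/user config
        long blockSize = 64L * 1024 * 1024;    // HDFS block size of the file
        long splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
        System.out.println(splitSize);         // 67108864: splits align to 64 MB blocks
      }
    }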
Index: core/src/main/java/org/apache/hama/bsp/TextArrayWritable.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/TextArrayWritable.java	(revision 0)
+++ core/src/main/java/org/apache/hama/bsp/TextArrayWritable.java	(working copy)
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Text;
+
+public class TextArrayWritable extends ArrayWritable {
+
+  public TextArrayWritable() {
+    super(Text.class);
+  }
+
+}
\ No newline at end of file
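A subclass like this is needed because SequenceFile instantiates its key and value classes reflectively through a no-argument constructor, and ArrayWritable itself has no no-arg constructor binding the element type, so a freshly deserialized instance would not know its elements are Text. A minimal round-trip sketch (the class name TextArrayWritableDemo is made up for illustration):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hama.bsp.TextArrayWritable;

    public class TextArrayWritableDemo {
      public static void main(String[] args) {
        // store an adjacency list: every element must be a Text
        TextArrayWritable outlinks = new TextArrayWritable();
        outlinks.set(new Writable[] { new Text("yahoo.com"), new Text("nasa.gov") });
        for (Writable w : outlinks.get()) {
          System.out.println(w); // prints each adjacent vertex id
        }
      }
    }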
Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java	(revision 1416768)
+++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java	(working copy)
@@ -28,8 +28,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -273,53 +273,25 @@
   private void loadVertices(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
-
-    /*
-     * Several partitioning constants begin
-     */
-
-    final VertexInputReader reader = (VertexInputReader) ReflectionUtils
-        .newInstance(conf.getClass(GraphJob.VERTEX_GRAPH_INPUT_READER,
-            VertexInputReader.class), conf);
     final boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
-    final boolean runtimePartitioning = conf.getBoolean(
-        GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, true);
-
-    final long splitSize = peer.getSplitSize();
-    final int partitioningSteps = partitionMultiSteps(peer, splitSize);
-    final long interval = splitSize / partitioningSteps;
     final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);

-    /*
-     * Several partitioning constants end
-     */
-
     LOG.debug("vertex class: " + vertexClass);
     Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
     vertex.runner = this;

-    long startPos = peer.getPos();
-    if (startPos == 0)
-      startPos = 1L;
-
     KeyValuePair<Writable, Writable> next = null;
-    int steps = 1;
     while ((next = peer.readNext()) != null) {
-      boolean vertexFinished = false;
-      try {
-        vertexFinished = reader.parseVertex(next.getKey(), next.getValue(),
-            vertex);
-      } catch (Exception e) {
-        // LOG.error("exception occured during parsing vertex!" + e.toString());
-        throw new IOException("exception occured during parsing vertex!"
-            + e.toString());
+      V key = (V) next.getKey();
+      Writable[] edges = ((ArrayWritable) next.getValue()).get();
+      vertex.setVertexID(key);
+      List<Edge<V, E>> edgeList = new ArrayList<Edge<V, E>>();
+      for (Writable edge : edges) {
+        edgeList.add(new Edge<V, E>((V) edge, null));
       }
-
-      if (!vertexFinished) {
-        continue;
-      }
+      vertex.setEdges(edgeList);

       if (vertex.getEdges() == null) {
         if (selfReference) {
@@ -334,44 +305,12 @@
         vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
       }

-      if (runtimePartitioning) {
-        int partition = partitioner.getPartition(vertex.getVertexID(),
-            vertex.getValue(), peer.getNumPeers());
-        peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
-      } else {
-        vertex.setup(conf);
-        vertices.add(vertex);
-      }
+      vertex.setup(conf);
+      vertices.add(vertex);
       vertex = newVertexInstance(vertexClass, conf);
       vertex.runner = this;
-
-      if (runtimePartitioning) {
-        if (steps < partitioningSteps && (peer.getPos() - startPos) >= interval) {
-          peer.sync();
-          steps++;
-          GraphJobMessage msg = null;
-          while ((msg = peer.getCurrentMessage()) != null) {
-            Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
-            messagedVertex.runner = this;
-            messagedVertex.setup(conf);
-            vertices.add(messagedVertex);
-          }
-          startPos = peer.getPos();
-        }
-      }
     }

-    if (runtimePartitioning) {
-      peer.sync();
-
-      GraphJobMessage msg = null;
-      while ((msg = peer.getCurrentMessage()) != null) {
-        Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
-        messagedVertex.runner = this;
-        messagedVertex.setup(conf);
-        vertices.add(messagedVertex);
-      }
-    }
     LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");

     /*
@@ -383,7 +322,7 @@
      */
     if (repairNeeded) {
       LOG.debug("Starting repair of this graph!");
-      repair(peer, partitioningSteps, selfReference);
+      repair(peer, selfReference);
     }

     LOG.debug("Starting Vertex processing!");
@@ -392,83 +331,16 @@
   @SuppressWarnings("unchecked")
   private void repair(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
-      int partitioningSteps, boolean selfReference) throws IOException,
+      boolean selfReference) throws IOException,
       SyncException, InterruptedException {

-    int multiSteps = 0;
-    MapWritable ssize = new MapWritable();
-    ssize.put(new IntWritable(peer.getPeerIndex()),
-        new IntWritable(vertices.size()));
-    peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
-    ssize = null;
-    peer.sync();
-
-    if (isMasterTask(peer)) {
-      int minVerticesSize = Integer.MAX_VALUE;
-      GraphJobMessage received = null;
-      while ((received = peer.getCurrentMessage()) != null) {
-        MapWritable x = received.getMap();
-        for (Entry<Writable, Writable> e : x.entrySet()) {
-          int curr = ((IntWritable) e.getValue()).get();
-          if (minVerticesSize > curr) {
-            minVerticesSize = curr;
-          }
-        }
-      }
-
-      if (minVerticesSize < (partitioningSteps * 2)) {
-        multiSteps = minVerticesSize;
-      } else {
-        multiSteps = (partitioningSteps * 2);
-      }
-
-      for (String peerName : peer.getAllPeerNames()) {
-        MapWritable temp = new MapWritable();
-        temp.put(new Text("steps"), new IntWritable(multiSteps));
-        peer.send(peerName, new GraphJobMessage(temp));
-      }
-    }
-    peer.sync();
-
-    GraphJobMessage received = peer.getCurrentMessage();
-    MapWritable x = received.getMap();
-    for (Entry<Writable, Writable> e : x.entrySet()) {
-      multiSteps = ((IntWritable) e.getValue()).get();
-    }
-
     Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();

-    int i = 0;
-    int syncs = 0;
-
     for (Vertex<V, E, M> v : vertices) {
       for (Edge<V, E> e : v.getEdges()) {
         peer.send(v.getDestinationPeerName(e),
             new GraphJobMessage(e.getDestinationVertexID()));
       }
-
-      if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
-        peer.sync();
-        syncs++;
-        GraphJobMessage msg = null;
-        while ((msg = peer.getCurrentMessage()) != null) {
-          V vertexName = (V) msg.getVertexId();
-
-          Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
-          newVertex.setVertexID(vertexName);
-          newVertex.runner = this;
-          if (selfReference) {
-            newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
-                newVertex.getVertexID(), null)));
-          } else {
-            newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
-          }
-          newVertex.setup(conf);
-          tmp.put(vertexName, newVertex);
-
-        }
-      }
-      i++;
     }

     peer.sync();
@@ -488,7 +360,6 @@
       newVertex.setup(conf);
       tmp.put(vertexName, newVertex);
       newVertex = null;
-
     }

     for (Vertex<V, E, M> e : vertices) {
@@ -502,59 +373,6 @@
   }

   /**
-   * Partitions our vertices through multiple supersteps to save memory.
-   */
-  private int partitionMultiSteps(
-      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
-      long splitSize) throws IOException, SyncException, InterruptedException {
-    int multiSteps = 1;
-
-    MapWritable ssize = new MapWritable();
-    ssize
-        .put(new IntWritable(peer.getPeerIndex()), new LongWritable(splitSize));
-    peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
-    ssize = null;
-    peer.sync();
-
-    if (isMasterTask(peer)) {
-      long maxSplitSize = 0L;
-      GraphJobMessage received = null;
-      while ((received = peer.getCurrentMessage()) != null) {
-        MapWritable x = received.getMap();
-        for (Entry<Writable, Writable> e : x.entrySet()) {
-          long curr = ((LongWritable) e.getValue()).get();
-          if (maxSplitSize < curr) {
-            maxSplitSize = curr;
-          }
-        }
-      }
-
-      int steps = (int) (maxSplitSize / conf.getLong( // 20 mb
-          "hama.graph.multi.step.partitioning.interval", 20000000)) + 1;
-
-      for (String peerName : peer.getAllPeerNames()) {
-        MapWritable temp = new MapWritable();
-        temp.put(new Text("max"), new IntWritable(steps));
-        peer.send(peerName, new GraphJobMessage(temp));
-      }
-    }
-    peer.sync();
-
-    GraphJobMessage received = peer.getCurrentMessage();
-    MapWritable x = received.getMap();
-    for (Entry<Writable, Writable> e : x.entrySet()) {
-      multiSteps = ((IntWritable) e.getValue()).get();
-    }
-
-    if (isMasterTask(peer)) {
-      peer.getCounter(GraphJobCounter.MULTISTEP_PARTITIONING).increment(
-          multiSteps);
-    }
-
-    return multiSteps;
-  }
-
-  /**
    * Counts vertices globally by sending the count of vertices in the map to
    * the other peers.
    */
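With runtime partitioning and multi-step loading removed, loadVertices now assumes every input record is already a complete adjacency list: the key is the vertex ID, the value is an ArrayWritable of neighbor IDs, and edge values stay null. A hypothetical helper that shows the same mapping in isolation; the two-argument Edge constructor is taken from its usage in the hunks above, and the Text/NullWritable types mirror the test below, but everything else here is illustrative:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hama.bsp.TextArrayWritable;
    import org.apache.hama.graph.Edge;

    public class AdjacencyRecord {
      // turns one (vertexId, neighbors) record into the edge list the loader builds
      public static List<Edge<Text, NullWritable>> toEdges(TextArrayWritable value) {
        List<Edge<Text, NullWritable>> edges = new ArrayList<Edge<Text, NullWritable>>();
        for (Writable w : value.get()) {
          edges.add(new Edge<Text, NullWritable>((Text) w, null)); // unweighted edge
        }
        return edges;
      }
    }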
Index: graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
===================================================================
--- graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java	(revision 1416768)
+++ graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java	(working copy)
@@ -17,35 +17,33 @@
  */
 package org.apache.hama.graph;

-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;

 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.bsp.TestBSPMasterGroomServer;
-import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.graph.example.PageRank;

 public class TestSubmitGraphJob extends TestBSPMasterGroomServer {

   String[] input = new String[] { "stackoverflow.com\tyahoo.com",
-      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov]",
-      "yahoo.com\tnasa.gov\tstackoverflow.com]",
-      "twitter.com\tgoogle.com\tfacebook.com]",
-      "nasa.gov\tyahoo.com\tstackoverflow.com]",
-      "youtube.com\tgoogle.com\tyahoo.com]" };
+      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov",
+      "yahoo.com\tnasa.gov\tstackoverflow.com",
+      "twitter.com\tgoogle.com\tfacebook.com",
+      "nasa.gov\tyahoo.com\tstackoverflow.com",
+      "youtube.com\tgoogle.com\tyahoo.com" };

   private static String INPUT = "/tmp/pagerank-real-tmp.seq";
   private static String OUTPUT = "/tmp/pagerank-real-out";
@@ -77,14 +75,15 @@
     bsp.setAggregatorClass(AverageAggregator.class,
         PageRank.DanglingNodeAggregator.class);

+    bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
+    bsp.setInputFormat(SequenceFileInputFormat.class);
+    bsp.setInputKeyClass(Text.class);
+    bsp.setInputValueClass(TextArrayWritable.class);
+
     bsp.setVertexIDClass(Text.class);
     bsp.setVertexValueClass(DoubleWritable.class);
     bsp.setEdgeValueClass(NullWritable.class);

-    bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
-    bsp.setInputFormat(TextInputFormat.class);
-    bsp.setInputKeyClass(LongWritable.class);
-    bsp.setInputValueClass(Text.class);
     bsp.setPartitioner(HashPartitioner.class);
     bsp.setOutputFormat(SequenceFileOutputFormat.class);
     bsp.setOutputKeyClass(Text.class);
@@ -123,26 +122,25 @@
   }

   private void generateTestData() {
-    BufferedWriter bw = null;
     try {
-      bw = new BufferedWriter(new FileWriter(INPUT));
-      for (String s : input) {
-        bw.write(s + "\n");
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
-    } finally {
-      if (bw != null) {
-        try {
-          bw.close();
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(),
+          new Path(INPUT), Text.class, TextArrayWritable.class);

-          File file = new File(INPUT);
-          LOG.info("Temp file length: " + file.length());
-
-        } catch (IOException e) {
-          e.printStackTrace();
+      for (int i = 0; i < input.length; i++) {
+        String[] x = input[i].split("\t");
+        Text key = new Text(x[0]);
+        Writable[] values = new Writable[x.length - 1];
+        for (int j = 1; j < x.length; j++) {
+          values[j - 1] = new Text(x[j]);
         }
+        TextArrayWritable value = new TextArrayWritable();
+        value.set(values);
+        writer.append(key, value);
       }
+
+      writer.close();
+    } catch (IOException e) {
+      e.printStackTrace();
     }
   }
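For a quick sanity check of the generated input, the sequence file can be read back with a plain SequenceFile.Reader. An illustrative standalone sketch, not part of the patch; it builds its own Configuration and FileSystem instead of reusing the test's fs and getConf() helpers:

    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hama.bsp.TextArrayWritable;

    public class DumpPageRankInput {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs,
            new Path("/tmp/pagerank-real-tmp.seq"), conf);
        Text key = new Text();
        TextArrayWritable value = new TextArrayWritable();
        while (reader.next(key, value)) {
          // e.g. "stackoverflow.com -> [yahoo.com]"
          System.out.println(key + " -> " + Arrays.toString(value.toStrings()));
        }
        reader.close();
      }
    }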