### Eclipse Workspace Patch 1.0
#P hama-trunk
Index: examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java	(Revision 1200271)
+++ examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java	(Arbeitskopie)
@@ -24,8 +24,6 @@
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,8 +41,6 @@
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.examples.graph.partitioning.PartitionableWritable;
-import org.apache.hama.examples.graph.partitioning.VertexPartitioner;
 
 public abstract class PageRankBase extends BSP {
   public static final Log LOG = LogFactory.getLog(PageRankBase.class);
@@ -56,8 +52,6 @@
   protected static double DAMPING_FACTOR = 0.85;
   protected static double EPSILON = 0.001;
 
-  private static final VertexPartitioner partitioner = new VertexPartitioner();
-
   static void mapAdjacencyList(Configuration conf, BSPPeer peer,
       HashMap<Vertex, List<Vertex>> realAdjacencyList,
       HashMap<Vertex, Double> tentativePagerank,
@@ -84,17 +78,6 @@
     reader.close();
   }
 
-  static HamaConfiguration partitionTextFile(Path in, HamaConfiguration conf,
-      String[] groomNames) throws IOException, InstantiationException,
-      IllegalAccessException, InterruptedException {
-
-    // set the partitioning vertex class
-    conf.setClass("hama.partitioning.vertex.class", Vertex.class,
-        PartitionableWritable.class);
-
-    return (HamaConfiguration) partitioner.partition(conf, in, groomNames);
-  }
-
   static HamaConfiguration partitionExample(Path out, HamaConfiguration conf,
       String[] groomNames) throws IOException, InstantiationException,
       IllegalAccessException, InterruptedException {
@@ -136,25 +119,25 @@
     }
     writer.close();
-
-    return partitionTextFile(input, conf, groomNames);
+
+    return null;
   }
 
-  static void savePageRankMap(BSPPeer peer, Configuration conf,
-      Map<Vertex, Double> tentativePagerank) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    Path outPath = new Path(conf.get("out.path") + Path.SEPARATOR + "temp"
-        + Path.SEPARATOR
-        + peer.getPeerName().split(ShortestPaths.NAME_VALUE_SEPARATOR)[0]);
-    fs.delete(outPath, true);
-    final SequenceFile.Writer out = SequenceFile.createWriter(fs, conf,
-        outPath, Text.class, DoubleWritable.class);
-    for (Entry<Vertex, Double> row : tentativePagerank.entrySet()) {
-      out.append(new Text(row.getKey().getName()),
-          new DoubleWritable(row.getValue()));
-    }
-    out.close();
-  }
+//  static void savePageRankMap(BSPPeer peer, Configuration conf,
+//      Map<Vertex, Double> tentativePagerank) throws IOException {
+//    FileSystem fs = FileSystem.get(conf);
+//    Path outPath = new Path(conf.get("out.path") + Path.SEPARATOR + "temp"
+//        + Path.SEPARATOR
+//        + peer.getPeerName().split(ShortestPaths.NAME_VALUE_SEPARATOR)[0]);
+//    fs.delete(outPath, true);
+//    final SequenceFile.Writer out = SequenceFile.createWriter(fs, conf,
+//        outPath, Text.class, DoubleWritable.class);
+//    for (Entry<Vertex, Double> row : tentativePagerank.entrySet()) {
+//      out.append(new Text(row.getKey().getName()),
+//          new DoubleWritable(row.getValue()));
+//    }
+//    out.close();
+//  }
 
   static void printOutput(FileSystem fs, Configuration conf)
       throws IOException {
     LOG.info("-------------------- RESULTS --------------------");
Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	(Revision 1200271)
+++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	(Arbeitskopie)
@@ -451,7 +451,7 @@
       Object key = recordReader.createKey();
       Object value = recordReader.createValue();
       while (recordReader.next(key, value)) {
-        int index = partitioner.getPartition(key, value, numOfTasks);
+        int index = Math.abs(partitioner.getPartition(key, value, numOfTasks));
         writers.get(index).append(key, value);
       }
       LOG.debug("Done with split " + i);
Index: examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java	(Revision 1200271)
+++ examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java	(Arbeitskopie)
@@ -18,57 +18,72 @@
 package org.apache.hama.examples.graph;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BooleanMessage;
-import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.IntegerMessage;
-import org.apache.hama.bsp.OutputCollector;
-import org.apache.hama.bsp.RecordReader;
-import org.apache.hama.examples.RandBench;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.util.KeyValuePair;
 import org.apache.zookeeper.KeeperException;
 
-public class ShortestPaths extends ShortestPathsBase {
+public class ShortestPaths extends
+    BSP<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable> {
 
   public static final Log LOG = LogFactory.getLog(ShortestPaths.class);
 
-  private final HashMap<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = new HashMap<ShortestPathVertex, List<ShortestPathVertex>>();
+  public static final String SHORTEST_PATHS_START_VERTEX_NAME = "shortest.paths.start.vertex.name";
+
+  private final HashMap<ShortestPathVertex, ShortestPathVertex[]> adjacencyList = new HashMap<ShortestPathVertex, ShortestPathVertex[]>();
   private final HashMap<String, ShortestPathVertex> vertexLookupMap = new HashMap<String, ShortestPathVertex>();
-  private String[] peerNames;
+  private String masterTaskName;
 
   @Override
-  public void bsp(BSPPeer peer)
+  public void setup(
+      BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable> peer)
       throws IOException, KeeperException, InterruptedException {
-    // map our input into ram
-    mapAdjacencyList(peer.getConfiguration(), peer, adjacencyList,
-        vertexLookupMap);
-    // parse the configuration to get the peerNames
-    parsePeerNames(peer.getConfiguration());
-    // get our master groom
-    String master = peer.getConfiguration().get(MASTER_TASK);
+    KeyValuePair<ShortestPathVertex, ShortestPathVertexArrayWritable> next = null;
+    while ((next = peer.readNext()) != null) {
+      adjacencyList.put(next.getKey(), (ShortestPathVertex[]) next.getValue()
+          .toArray());
+      vertexLookupMap.put(next.getKey().getName(), next.getKey());
+    }
+
+    masterTaskName = peer.getPeerName(0);
+
     // initial message bypass
-    ShortestPathVertex v = vertexLookupMap.get(peer.getConfiguration().get(
-        SHORTEST_PATHS_START_VERTEX_ID));
-    if (v != null) {
-      v.setCost(0);
-      sendMessageToNeighbors(peer, v);
+    ShortestPathVertex startVertex = vertexLookupMap.get(peer
+        .getConfiguration().get(SHORTEST_PATHS_START_VERTEX_NAME));
+
+    if (startVertex != null) {
+      startVertex.setCost(0);
+      sendMessageToNeighbors(peer, startVertex);
     }
+  }
 
+  @Override
+  public void bsp(
+      BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable> peer)
+      throws IOException, KeeperException, InterruptedException {
+
     boolean updated = true;
     while (updated) {
       int updatesMade = 0;
@@ -86,23 +101,28 @@
         }
       }
       // synchonize with all grooms if there were updates
-      updated = broadcastUpdatesMade(peer, master, updatesMade);
+      updated = broadcastUpdatesMade(peer, updatesMade);
       // send updates to the adjacents of the updated vertices
       for (ShortestPathVertex vertex : updatedQueue) {
         sendMessageToNeighbors(peer, vertex);
       }
     }
-    // finished, finally save our map to DFS.
-    saveVertexMap(peer.getConfiguration(), peer, adjacencyList);
   }
 
-  /**
-   * Parses the peer names to fix inconsistency in bsp peer names from context.
-   * 
-   * @param conf
-   */
-  private void parsePeerNames(Configuration conf) {
-    peerNames = conf.get(BSP_PEERS).split(";");
+  @Override
+  public void cleanup(
+      BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable> peer) {
+    // write our map into hdfs
+    for (Entry<ShortestPathVertex, ShortestPathVertex[]> entry : adjacencyList
+        .entrySet()) {
+      try {
+        peer.write(new Text(entry.getKey().getName()), new IntWritable(entry
+            .getKey().getCost()));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
   }
 
   /**
@@ -119,11 +139,12 @@
    * @throws KeeperException
    * @throws InterruptedException
    */
-  private boolean broadcastUpdatesMade(BSPPeer peer, String master, int updates)
-      throws IOException, KeeperException, InterruptedException {
-    peer.send(master, new IntegerMessage(peer.getPeerName(), updates));
+  private boolean broadcastUpdatesMade(
+      BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable> peer,
+      int updates) throws IOException, KeeperException, InterruptedException {
+    peer.send(masterTaskName, new IntegerMessage(peer.getPeerName(), updates));
     peer.sync();
-    if (peer.getPeerName().equals(master)) {
+    if (peer.getPeerName().equals(masterTaskName)) {
       int count = 0;
       IntegerMessage message;
       while ((message = (IntegerMessage) peer.getCurrentMessage()) != null) {
@@ -152,17 +173,51 @@
    * @param id The vertex to all adjacent vertices the new cost has to be send.
    * @throws IOException
    */
-  private void sendMessageToNeighbors(BSPPeer peer, ShortestPathVertex id)
-      throws IOException {
-    List<ShortestPathVertex> outgoingEdges = adjacencyList.get(id);
+  private void sendMessageToNeighbors(
+      BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable> peer,
+      ShortestPathVertex id) throws IOException {
+    ShortestPathVertex[] outgoingEdges = adjacencyList.get(id);
     for (ShortestPathVertex adjacent : outgoingEdges) {
-      int mod = Math.abs((adjacent.getId() % peer.getAllPeerNames().length));
-      peer.send(peerNames[mod], new IntegerMessage(adjacent.getName(), id
-          .getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
-          + adjacent.getWeight()));
+      int mod = Math.abs((adjacent.hashCode() % peer.getAllPeerNames().length));
+      peer.send(peer.getPeerName(mod), new IntegerMessage(adjacent.getName(),
+          id.getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
+              + adjacent.getWeight()));
     }
   }
 
+  /**
+   * Just a reader of the vertexMap in DFS. Output going to STDOUT.
+   * 
+   * @param fs
+   * @param conf
+   * @throws IOException
+   */
+  protected final static void printOutput(FileSystem fs, Configuration conf)
+      throws IOException {
+    System.out.println("-------------------- RESULTS --------------------");
+    FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir")));
+    for (FileStatus status : stati) {
+      if (!status.isDir() && !status.getPath().getName().endsWith(".crc")) {
+        Path path = status.getPath();
+        SequenceFile.Reader reader;
+        try {
+          reader = new SequenceFile.Reader(fs, path, conf);
+
+          Text key = new Text();
+          IntWritable value = new IntWritable();
+          while (reader.next(key, value)) {
+            if (value.get() != Integer.MAX_VALUE) {
+              System.out.println(key.toString() + " | " + value.get());
+            }
+          }
+          reader.close();
+        } catch (Exception e) {
+          LOG.debug(e);
+        }
+      }
+    }
+  }
+
   public static void printUsage() {
     System.out.println("Single Source Shortest Path Example:");
     System.out
@@ -173,76 +228,52 @@
       InterruptedException, ClassNotFoundException, InstantiationException,
       IllegalAccessException {
 
+    // TODO rethink
+
     printUsage();
 
-    // BSP job configuration
     HamaConfiguration conf = new HamaConfiguration();
-    conf.set(SHORTEST_PATHS_START_VERTEX_ID, "Frankfurt");
-    System.out.println("Setting default start vertex to \"Frankfurt\"!");
-    conf.set(OUT_PATH, "sssp/output");
-    Path adjacencyListPath = null;
+    BSPJob bsp = new BSPJob(conf);
 
-    if (args.length > 0) {
-      conf.set(SHORTEST_PATHS_START_VERTEX_ID, args[0]);
-      System.out.println("Setting start vertex to " + args[0] + "!");
+    conf.set(SHORTEST_PATHS_START_VERTEX_NAME, "Klewno");
 
-      if (args.length > 1) {
-        conf.set(OUT_PATH, args[1]);
-        System.out.println("Using new output folder: " + args[1]);
-      }
+    boolean outputEnabled = true;
+    // if (args.length > 0) {
+    // conf.set(SHORTEST_PATHS_START_VERTEX_NAME, args[0]);
+    // System.out.println("Setting start vertex to " + args[0] + "!");
+    //
+    // if (args.length > 1) {
+    // System.out.println("Using new output folder: " + args[1]);
+    // }
+    //
+    // if (args.length > 2) {
+    // bsp.setInputPath(new Path(args[2]));
+    // outputEnabled = false;
+    // }
+    // }
 
-      if (args.length > 2) {
-        adjacencyListPath = new Path(args[2]);
-      }
-
-    }
-
-    Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = null;
-    if (adjacencyListPath == null)
-      adjacencyList = ShortestPathsGraphLoader.loadGraph();
-
-    BSPJob bsp = new BSPJob(conf, RandBench.class);
     // Set the job name
     bsp.setJobName("Single Source Shortest Path");
     bsp.setBspClass(ShortestPaths.class);
+    bsp.setPartitioner(HashPartitioner.class);
+    bsp.setNumBspTask(2);
 
-    // Set the task size as a number of GroomServer
-    BSPJobClient jobClient = new BSPJobClient(conf);
-    ClusterStatus cluster = jobClient.getClusterStatus(true);
+    bsp.setInputFormat(SequenceFileInputFormat.class);
+    bsp.setInputPath(new Path("/Users/thomas.jungblut/Downloads/in"));
 
-    Collection<String> activeGrooms = cluster.getActiveGroomNames().keySet();
-    String[] grooms = activeGrooms.toArray(new String[activeGrooms.size()]);
+    bsp.setOutputFormat(SequenceFileOutputFormat.class);
+    bsp.setOutputPath(new Path("/Users/thomas.jungblut/Downloads/out"));
+    bsp.setOutputKeyClass(Text.class);
+    bsp.setOutputValueClass(IntWritable.class);
 
-    LOG.info("Starting data partitioning...");
-    if (adjacencyList == null) {
-      conf = (HamaConfiguration) partition(conf, adjacencyListPath, grooms);
-    } else {
-      conf = (HamaConfiguration) partitionExample(conf, adjacencyList, grooms);
-    }
-    LOG.info("Finished!");
-
-    bsp.setNumBspTask(cluster.getGroomServers());
-
     long startTime = System.currentTimeMillis();
     if (bsp.waitForCompletion(true)) {
       System.out.println("Job Finished in "
           + (double) (System.currentTimeMillis() - startTime) / 1000.0
          + " seconds");
-      printOutput(FileSystem.get(conf), conf);
+      if (outputEnabled) {
+        printOutput(FileSystem.get(conf), conf);
+      }
    }
  }
-
-  @Override
-  public void cleanup(BSPPeer peer) {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void setup(BSPPeer peer) throws IOException, KeeperException,
-      InterruptedException {
-    // TODO Auto-generated method stub
-
-  }
-
 }
Index: examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/graph/PageRank.java	(Revision 1200271)
+++ examples/src/main/java/org/apache/hama/examples/graph/PageRank.java	(Arbeitskopie)
@@ -28,7 +28,11 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.BSPPeer;
@@ -38,7 +42,7 @@
 import org.apache.hama.bsp.RecordReader;
 import org.apache.zookeeper.KeeperException;
 
-public class PageRank extends PageRankBase {
+public class PageRank extends BSP {
   public static final Log LOG = LogFactory.getLog(PageRank.class);
 
   private final HashMap<Vertex, List<Vertex>> adjacencyList = new HashMap<Vertex, List<Vertex>>();
@@ -51,71 +55,71 @@
   @Override
   public void setup(BSPPeer peer) {
     Configuration conf = peer.getConfiguration();
-    numOfVertices = Integer.parseInt(conf.get("num.vertices"));
-    DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor"));
-    ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices;
-    EPSILON = Double.parseDouble(conf.get("epsilon.error"));
-    MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations"));
-    peerNames = conf.get(ShortestPaths.BSP_PEERS).split(";");
+//    numOfVertices = Integer.parseInt(conf.get("num.vertices"));
+//    DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor"));
+//    ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices;
+//    EPSILON = Double.parseDouble(conf.get("epsilon.error"));
+//    MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations"));
+//    peerNames = conf.get(ShortestPaths.BSP_PEERS).split(";");
   }
 
   @Override
   public void bsp(BSPPeer peer) throws IOException, KeeperException,
       InterruptedException {
-    String master = peer.getConfiguration().get(MASTER_TASK);
-    // setup the datasets
-    PageRankBase.mapAdjacencyList(peer.getConfiguration(), peer, adjacencyList,
-        tentativePagerank, lookupMap);
-
-    // while the error not converges against epsilon do the pagerank stuff
-    double error = 1.0;
-    int iteration = 0;
-    // if MAX_ITERATIONS are set to 0, ignore the iterations and just go
-    // with the error
-    while ((MAX_ITERATIONS > 0 && iteration < MAX_ITERATIONS)
-        || error >= EPSILON) {
-      peer.sync();
-
-      if (iteration >= 1) {
-        // copy the old pagerank to the backup
-        copyTentativePageRankToBackup();
-        // sum up all incoming messages for a vertex
-        HashMap<Vertex, Double> sumMap = new HashMap<Vertex, Double>();
-        DoubleMessage msg = null;
-        while ((msg = (DoubleMessage) peer.getCurrentMessage()) != null) {
-          Vertex k = lookupMap.get(msg.getTag());
-          if (!sumMap.containsKey(k)) {
-            sumMap.put(k, msg.getData());
-          } else {
-            sumMap.put(k, msg.getData() + sumMap.get(k));
-          }
-        }
-        // pregel formula:
-        // ALPHA = 0.15 / NumVertices()
-        // P(i) = ALPHA + 0.85 * sum
-        for (Entry<Vertex, Double> entry : sumMap.entrySet()) {
-          tentativePagerank.put(entry.getKey(), ALPHA
-              + (entry.getValue() * DAMPING_FACTOR));
-        }
-
-        // determine the error and send this to the master
-        double err = determineError();
-        error = broadcastError(peer, master, err);
-      }
-      // in every step send the tentative pagerank of a vertex to its
-      // adjacent vertices
-      for (Vertex vertex : adjacencyList.keySet()) {
-        sendMessageToNeighbors(peer, vertex);
-      }
-
-      iteration++;
-    }
-
-    // Clears all queues entries.
-    peer.clear();
-    // finally save the chunk of pageranks
-    PageRankBase.savePageRankMap(peer, peer.getConfiguration(),
-        lastTentativePagerank);
+//    String master = peer.getConfiguration().get(MASTER_TASK);
+//    // setup the datasets
+//    PageRankBase.mapAdjacencyList(peer.getConfiguration(), peer, adjacencyList,
+//        tentativePagerank, lookupMap);
+//
+//    // while the error not converges against epsilon do the pagerank stuff
+//    double error = 1.0;
+//    int iteration = 0;
+//    // if MAX_ITERATIONS are set to 0, ignore the iterations and just go
+//    // with the error
+//    while ((MAX_ITERATIONS > 0 && iteration < MAX_ITERATIONS)
+//        || error >= EPSILON) {
+//      peer.sync();
+//
+//      if (iteration >= 1) {
+//        // copy the old pagerank to the backup
+//        copyTentativePageRankToBackup();
+//        // sum up all incoming messages for a vertex
+//        HashMap<Vertex, Double> sumMap = new HashMap<Vertex, Double>();
+//        DoubleMessage msg = null;
+//        while ((msg = (DoubleMessage) peer.getCurrentMessage()) != null) {
+//          Vertex k = lookupMap.get(msg.getTag());
+//          if (!sumMap.containsKey(k)) {
+//            sumMap.put(k, msg.getData());
+//          } else {
+//            sumMap.put(k, msg.getData() + sumMap.get(k));
+//          }
+//        }
+//        // pregel formula:
+//        // ALPHA = 0.15 / NumVertices()
+//        // P(i) = ALPHA + 0.85 * sum
+//        for (Entry<Vertex, Double> entry : sumMap.entrySet()) {
+//          tentativePagerank.put(entry.getKey(), ALPHA
+//              + (entry.getValue() * DAMPING_FACTOR));
+//        }
+//
+//        // determine the error and send this to the master
+//        double err = determineError();
+//        error = broadcastError(peer, master, err);
+//      }
+//      // in every step send the tentative pagerank of a vertex to its
+//      // adjacent vertices
+//      for (Vertex vertex : adjacencyList.keySet()) {
+//        sendMessageToNeighbors(peer, vertex);
+//      }
+//
+//      iteration++;
+//    }
+//
+//    // Clears all queues entries.
+//    peer.clear();
+//    // finally save the chunk of pageranks
+//    PageRankBase.savePageRankMap(peer, peer.getConfiguration(),
+//        lastTentativePagerank);
   }
 
   private double broadcastError(BSPPeer peer, String master, double error)
@@ -161,11 +165,11 @@
       throws IOException {
     List<Vertex> outgoingEdges = adjacencyList.get(v);
     for (Vertex adjacent : outgoingEdges) {
-      int mod = Math.abs(adjacent.getId() % peerNames.length);
+//      int mod = Math.abs(adjacent.getId() % peerNames.length);
       // send a message of the tentative pagerank divided by the size of
       // the outgoing edges to all adjacents
-      peer.send(peerNames[mod], new DoubleMessage(adjacent.getName(),
-          tentativePagerank.get(v) / outgoingEdges.size()));
+//      peer.send(peerNames[mod], new DoubleMessage(adjacent.getName(),
+//          tentativePagerank.get(v) / outgoingEdges.size()));
     }
   }
 
@@ -222,11 +226,11 @@
     String[] grooms = activeGrooms.toArray(new String[activeGrooms.size()]);
 
     if (conf.get("in.path") == null) {
-      conf = PageRankBase.partitionExample(new Path(conf.get("out.path")),
-          conf, grooms);
+//      conf = PageRankBase.partitionExample(new Path(conf.get("out.path")),
+//          conf, grooms);
     } else {
-      conf = PageRankBase.partitionTextFile(new Path(conf.get("in.path")),
-          conf, grooms);
+//      conf = PageRankBase.partitionTextFile(new Path(conf.get("in.path")),
+//          conf, grooms);
     }
 
     BSPJob job = new BSPJob(conf);
Index: examples/src/main/java/org/apache/hama/examples/graph/partitioning/PartitionableWritable.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/graph/partitioning/PartitionableWritable.java	(Revision 1200271)
+++ examples/src/main/java/org/apache/hama/examples/graph/partitioning/PartitionableWritable.java	(Arbeitskopie)
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.examples.graph.partitioning;
-
-import org.apache.hadoop.io.Writable;
-
-public interface PartitionableWritable extends Writable {
-
-  public int getId();
-
-}
Index: core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/FileInputFormat.java	(Revision 1200271)
+++ core/src/main/java/org/apache/hama/bsp/FileInputFormat.java	(Arbeitskopie)
@@ -43,532 +43,563 @@
 import org.apache.hadoop.util.StringUtils;
 
 public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
-  public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
+    public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
 
-  private static final double SPLIT_SLOP = 1.1; // 10% slop
+    private static final double SPLIT_SLOP = 1.1; // 10% slop
 
-  private long minSplitSize = 1;
-  private static final PathFilter hiddenFileFilter = new PathFilter() {
-    public boolean accept(Path p) {
-      String name = p.getName();
-      return !name.startsWith("_") && !name.startsWith(".");
+    private long minSplitSize = 1;
+    private static final PathFilter hiddenFileFilter = new PathFilter() {
+        public boolean accept(Path p) {
+            String name = p.getName();
+            return !name.startsWith("_") && !name.startsWith(".");
+        }
+    };
+
+    protected void setMinSplitSize(long minSplitSize) {
+        this.minSplitSize = minSplitSize;
     }
-  };
 
-  protected void setMinSplitSize(long minSplitSize) {
-    this.minSplitSize = minSplitSize;
-  }
+    /**
+     * Proxy PathFilter that accepts a path only if all filters given in the
+     * constructor do. Used by the listPaths() to apply the built-in
+     * hiddenFileFilter together with a user provided one (if any).
+     */
+    private static class MultiPathFilter implements PathFilter {
+        private List<PathFilter> filters;
 
-  /**
-   * Proxy PathFilter that accepts a path only if all filters given in the
-   * constructor do. Used by the listPaths() to apply the built-in
-   * hiddenFileFilter together with a user provided one (if any).
-   */
-  private static class MultiPathFilter implements PathFilter {
-    private List<PathFilter> filters;
+        public MultiPathFilter(List<PathFilter> filters) {
+            this.filters = filters;
+        }
 
-    public MultiPathFilter(List<PathFilter> filters) {
-      this.filters = filters;
+        public boolean accept(Path path) {
+            for (PathFilter filter : filters) {
+                if (!filter.accept(path)) {
+                    return false;
+                }
+            }
+            return true;
+        }
     }
 
-    public boolean accept(Path path) {
-      for (PathFilter filter : filters) {
-        if (!filter.accept(path)) {
-          return false;
-        }
-      }
-      return true;
+    /**
+     * @param fs
+     *            the file system that the file is on
+     * @param filename
+     *            the file name to check
+     * @return is this file splitable?
+     */
+    protected boolean isSplitable(FileSystem fs, Path filename) {
+        return true;
     }
-  }
 
-  /**
-   * @param fs the file system that the file is on
-   * @param filename the file name to check
-   * @return is this file splitable?
-   */
-  protected boolean isSplitable(FileSystem fs, Path filename) {
-    return true;
-  }
+    public abstract RecordReader<K, V> getRecordReader(InputSplit split,
+            BSPJob job) throws IOException;
 
-  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
-      BSPJob job) throws IOException;
+    /**
+     * Set a PathFilter to be applied to the input paths for the map-reduce job.
+     * 
+     * @param filter
+     *            the PathFilter class use for filtering the input paths.
+     */
+    public static void setInputPathFilter(BSPJob conf,
+            Class<? extends PathFilter> filter) {
+        conf.getConf().setClass("bsp.input.pathFilter.class", filter,
+                PathFilter.class);
+    }
 
-  /**
-   * Set a PathFilter to be applied to the input paths for the map-reduce job.
-   * 
-   * @param filter the PathFilter class use for filtering the input paths.
-   */
-  public static void setInputPathFilter(BSPJob conf,
-      Class<? extends PathFilter> filter) {
-    conf.getConf().setClass("bsp.input.pathFilter.class", filter,
-        PathFilter.class);
-  }
+    /**
+     * Get a PathFilter instance of the filter set for the input paths.
+     * 
+     * @return the PathFilter instance set for the job, NULL if none has been
+     *         set.
+     */
+    public static PathFilter getInputPathFilter(BSPJob conf) {
+        Class<? extends PathFilter> filterClass = conf.getConf().getClass(
+                "bsp.input.pathFilter.class", null, PathFilter.class);
+        return (filterClass != null) ? ReflectionUtils.newInstance(filterClass,
+                conf.getConf()) : null;
+    }
 
-  /**
-   * Get a PathFilter instance of the filter set for the input paths.
-   * 
-   * @return the PathFilter instance set for the job, NULL if none has been set.
-   */
-  public static PathFilter getInputPathFilter(BSPJob conf) {
-    Class<? extends PathFilter> filterClass = conf.getConf().getClass(
-        "bsp.input.pathFilter.class", null, PathFilter.class);
-    return (filterClass != null) ? ReflectionUtils.newInstance(filterClass,
-        conf.getConf()) : null;
-  }
+    /**
+     * List input directories. Subclasses may override to, e.g., select only
+     * files matching a regular expression.
+     * 
+     * @param job
+     *            the job to list input paths for
+     * @return array of FileStatus objects
+     * @throws IOException
+     *             if zero items.
+     */
+    protected FileStatus[] listStatus(BSPJob job) throws IOException {
+        Path[] dirs = getInputPaths(job);
+        if (dirs.length == 0) {
+            throw new IOException("No input paths specified in job");
+        }
 
-  /**
-   * List input directories. Subclasses may override to, e.g., select only files
-   * matching a regular expression.
-   * 
-   * @param job the job to list input paths for
-   * @return array of FileStatus objects
-   * @throws IOException if zero items.
-   */
-  protected FileStatus[] listStatus(BSPJob job) throws IOException {
-    Path[] dirs = getInputPaths(job);
-    if (dirs.length == 0) {
-      throw new IOException("No input paths specified in job");
-    }
+        List<FileStatus> result = new ArrayList<FileStatus>();
+        List<IOException> errors = new ArrayList<IOException>();
 
-    List<FileStatus> result = new ArrayList<FileStatus>();
-    List<IOException> errors = new ArrayList<IOException>();
+        // creates a MultiPathFilter with the hiddenFileFilter and the
+        // user provided one (if any).
+        List<PathFilter> filters = new ArrayList<PathFilter>();
+        filters.add(hiddenFileFilter);
+        PathFilter jobFilter = getInputPathFilter(job);
+        if (jobFilter != null) {
+            filters.add(jobFilter);
+        }
+        PathFilter inputFilter = new MultiPathFilter(filters);
 
-    // creates a MultiPathFilter with the hiddenFileFilter and the
-    // user provided one (if any).
-    List<PathFilter> filters = new ArrayList<PathFilter>();
-    filters.add(hiddenFileFilter);
-    PathFilter jobFilter = getInputPathFilter(job);
-    if (jobFilter != null) {
-      filters.add(jobFilter);
-    }
-    PathFilter inputFilter = new MultiPathFilter(filters);
+        for (Path p : dirs) {
+            FileSystem fs = p.getFileSystem(job.getConf());
+            FileStatus[] matches = fs.globStatus(p, inputFilter);
+            if (matches == null) {
+                errors.add(new IOException("Input path does not exist: " + p));
+            } else if (matches.length == 0) {
+                errors.add(new IOException("Input Pattern " + p
+                        + " matches 0 files"));
+            } else {
+                for (FileStatus globStat : matches) {
+                    if (globStat.isDir()) {
+                        for (FileStatus stat : fs.listStatus(
+                                globStat.getPath(), inputFilter)) {
+                            result.add(stat);
+                        }
+                    } else {
+                        result.add(globStat);
+                    }
+                }
+            }
+        }
 
-    for (Path p : dirs) {
-      FileSystem fs = p.getFileSystem(job.getConf());
-      FileStatus[] matches = fs.globStatus(p, inputFilter);
-      if (matches == null) {
-        errors.add(new IOException("Input path does not exist: " + p));
-      } else if (matches.length == 0) {
-        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
-      } else {
-        for (FileStatus globStat : matches) {
-          if (globStat.isDir()) {
-            for (FileStatus stat : fs.listStatus(globStat.getPath(),
-                inputFilter)) {
-              result.add(stat);
-            }
-          } else {
-            result.add(globStat);
-          }
-        }
-      }
+        if (!errors.isEmpty()) {
+            throw new InvalidInputException(errors);
+        }
+        LOG.info("Total input paths to process : " + result.size());
+        return result.toArray(new FileStatus[result.size()]);
     }
 
-    if (!errors.isEmpty()) {
-      throw new InvalidInputException(errors);
-    }
-    LOG.info("Total input paths to process : " + result.size());
-    return result.toArray(new FileStatus[result.size()]);
-  }
+    /**
+     * Splits files returned by {@link #listStatus(JobConf)} when they're too
+     * big.
+     */
+    public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
+        FileStatus[] files = listStatus(job);
 
-  /**
-   * Splits files returned by {@link #listStatus(JobConf)} when they're too big.
-   */
-  public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
-    FileStatus[] files = listStatus(job);
+        long totalSize = 0; // compute total size
+        for (FileStatus file : files) { // check we have valid files
+            if (file.isDir()) {
+                throw new IOException("Not a file: " + file.getPath());
+            }
+            totalSize += file.getLen();
+        }
 
-    long totalSize = 0; // compute total size
-    for (FileStatus file : files) { // check we have valid files
-      if (file.isDir()) {
-        throw new IOException("Not a file: " + file.getPath());
-      }
-      totalSize += file.getLen();
-    }
+        // take the short circuit path if we have already partitioned
+        ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+        if (numSplits == files.length) {
+            for (FileStatus file : files) {
+                splits.add(new FileSplit(file.getPath(), 0, file.getLen(),
+                        new String[0]));
+            }
+            return splits.toArray(new FileSplit[splits.size()]);
+        }
 
-    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
-    long minSize = Math.max(job.getConf().getLong("bsp.min.split.size", 1),
-        minSplitSize);
+        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
+        long minSize = Math.max(job.getConf().getLong("bsp.min.split.size", 1),
+                minSplitSize);
 
-    // generate splits
-    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
-    NetworkTopology clusterMap = new NetworkTopology();
-    for (FileStatus file : files) {
-      Path path = file.getPath();
-      FileSystem fs = path.getFileSystem(job.getConf());
-      long length = file.getLen();
-      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-      if ((length != 0) && isSplitable(fs, path)) {
-        long blockSize = file.getBlockSize();
-        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+        // generate splits
+        NetworkTopology clusterMap = new NetworkTopology();
+        for (FileStatus file : files) {
+            Path path = file.getPath();
+            FileSystem fs = path.getFileSystem(job.getConf());
+            long length = file.getLen();
+            BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
+                    length);
+            if ((length != 0) && isSplitable(fs, path)) {
+                long blockSize = file.getBlockSize();
+                long splitSize = computeSplitSize(goalSize, minSize, blockSize);
 
-        long bytesRemaining = length;
-        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
-          String[] splitHosts = getSplitHosts(blkLocations, length
-              - bytesRemaining, splitSize, clusterMap);
-          splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
-              splitHosts));
-          bytesRemaining -= splitSize;
-        }
+                long bytesRemaining = length;
+                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+                    String[] splitHosts = getSplitHosts(blkLocations, length
+                            - bytesRemaining, splitSize, clusterMap);
+                    splits.add(new FileSplit(path, length - bytesRemaining,
+                            splitSize, splitHosts));
+                    bytesRemaining -= splitSize;
+                }
 
-        if (bytesRemaining != 0) {
-          splits
-              .add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
-                  blkLocations[blkLocations.length - 1].getHosts()));
-        }
-      } else if (length != 0) {
-        String[] splitHosts = getSplitHosts(blkLocations, 0, length, clusterMap);
-        splits.add(new FileSplit(path, 0, length, splitHosts));
-      } else {
-        // Create empty hosts array for zero length files
-        splits.add(new FileSplit(path, 0, length, new String[0]));
-      }
+                if (bytesRemaining != 0) {
+                    splits.add(new FileSplit(path, length - bytesRemaining,
+                            bytesRemaining,
+                            blkLocations[blkLocations.length - 1].getHosts()));
+                }
+            } else if (length != 0) {
+                String[] splitHosts = getSplitHosts(blkLocations, 0, length,
+                        clusterMap);
+                splits.add(new FileSplit(path, 0, length, splitHosts));
+            } else {
+                // Create empty hosts array for zero length files
+                splits.add(new FileSplit(path, 0, length, new String[0]));
+            }
+        }
+        LOG.info("Total # of splits: " + splits.size());
+        return splits.toArray(new FileSplit[splits.size()]);
    }
-    LOG.info("Total # of splits: " + splits.size());
-    return splits.toArray(new FileSplit[splits.size()]);
-  }
 
-  protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
-    return Math.max(minSize, Math.min(goalSize, blockSize));
-  }
+    protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
+        return Math.max(minSize, Math.min(goalSize, blockSize));
+    }
 
-  protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
-    for (int i = 0; i < blkLocations.length; i++) {
-      // is the offset inside this block?
-      if ((blkLocations[i].getOffset() <= offset)
-          && (offset < blkLocations[i].getOffset()
-              + blkLocations[i].getLength())) {
-        return i;
-      }
+    protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+        for (int i = 0; i < blkLocations.length; i++) {
+            // is the offset inside this block?
+            if ((blkLocations[i].getOffset() <= offset)
+                    && (offset < blkLocations[i].getOffset()
+                            + blkLocations[i].getLength())) {
+                return i;
+            }
+        }
+        BlockLocation last = blkLocations[blkLocations.length - 1];
+        long fileLength = last.getOffset() + last.getLength() - 1;
+        throw new IllegalArgumentException("Offset " + offset
+                + " is outside of file (0.." + fileLength + ")");
     }
-    BlockLocation last = blkLocations[blkLocations.length - 1];
-    long fileLength = last.getOffset() + last.getLength() - 1;
-    throw new IllegalArgumentException("Offset " + offset
-        + " is outside of file (0.." + fileLength + ")");
-  }
 
-  /**
-   * Sets the given comma separated paths as the list of inputs for the
-   * map-reduce job.
-   * 
-   * @param conf Configuration of the job
-   * @param commaSeparatedPaths Comma separated paths to be set as the list of
-   *          inputs for the map-reduce job.
-   */
-  public static void setInputPaths(BSPJob conf, String commaSeparatedPaths) {
-    setInputPaths(conf, StringUtils
-        .stringToPath(getPathStrings(commaSeparatedPaths)));
-  }
+    /**
+     * Sets the given comma separated paths as the list of inputs for the
+     * map-reduce job.
+     * 
+     * @param conf
+     *            Configuration of the job
+     * @param commaSeparatedPaths
+     *            Comma separated paths to be set as the list of inputs for the
+     *            map-reduce job.
+     */
+    public static void setInputPaths(BSPJob conf, String commaSeparatedPaths) {
+        setInputPaths(conf,
+                StringUtils.stringToPath(getPathStrings(commaSeparatedPaths)));
+    }
 
-  /**
-   * Add the given comma separated paths to the list of inputs for the
-   * map-reduce job.
-   * 
-   * @param conf The configuration of the job
-   * @param commaSeparatedPaths Comma separated paths to be added to the list of
-   *          inputs for the map-reduce job.
-   */
-  public static void addInputPaths(BSPJob conf, String commaSeparatedPaths) {
-    for (String str : getPathStrings(commaSeparatedPaths)) {
-      addInputPath(conf, new Path(str));
+    /**
+     * Add the given comma separated paths to the list of inputs for the
+     * map-reduce job.
+     * 
+     * @param conf
+     *            The configuration of the job
+     * @param commaSeparatedPaths
+     *            Comma separated paths to be added to the list of inputs for
+     *            the map-reduce job.
+     */
+    public static void addInputPaths(BSPJob conf, String commaSeparatedPaths) {
+        for (String str : getPathStrings(commaSeparatedPaths)) {
+            addInputPath(conf, new Path(str));
+        }
     }
-  }
 
-  /**
-   * Set the array of {@link Path}s as the list of inputs for the map-reduce
-   * job.
-   * 
-   * @param conf Configuration of the job.
-   * @param inputPaths the {@link Path}s of the input directories/files for the
-   *          map-reduce job.
-   */
-  public static void setInputPaths(BSPJob conf, Path... inputPaths) {
-    Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
-    StringBuffer str = new StringBuffer(StringUtils.escapeString(path
-        .toString()));
-    for (int i = 1; i < inputPaths.length; i++) {
-      str.append(StringUtils.COMMA_STR);
-      path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
-      str.append(StringUtils.escapeString(path.toString()));
+    /**
+     * Set the array of {@link Path}s as the list of inputs for the map-reduce
+     * job.
+     * 
+     * @param conf
+     *            Configuration of the job.
+     * @param inputPaths
+     *            the {@link Path}s of the input directories/files for the
+     *            map-reduce job.
+     */
+    public static void setInputPaths(BSPJob conf, Path... inputPaths) {
+        Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
+        StringBuffer str = new StringBuffer(StringUtils.escapeString(path
+                .toString()));
+        for (int i = 1; i < inputPaths.length; i++) {
+            str.append(StringUtils.COMMA_STR);
+            path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
+            str.append(StringUtils.escapeString(path.toString()));
+        }
+        conf.set("bsp.input.dir", str.toString());
     }
-    conf.set("bsp.input.dir", str.toString());
-  }
 
-  /**
-   * Add a {@link Path} to the list of inputs for the map-reduce job.
-   * 
-   * @param conf The configuration of the job
-   * @param path {@link Path} to be added to the list of inputs for the
-   *          map-reduce job.
-   */
-  public static void addInputPath(BSPJob conf, Path path) {
-    path = new Path(conf.getWorkingDirectory(), path);
-    String dirStr = StringUtils.escapeString(path.toString());
-    String dirs = conf.get("bsp.input.dir");
-    conf.set("bsp.input.dir", dirs == null ? dirStr : dirs
-        + StringUtils.COMMA_STR + dirStr);
-  }
+    /**
+     * Add a {@link Path} to the list of inputs for the map-reduce job.
+     * 
+     * @param conf
+     *            The configuration of the job
+     * @param path
+     *            {@link Path} to be added to the list of inputs for the
+     *            map-reduce job.
+     */
+    public static void addInputPath(BSPJob conf, Path path) {
+        path = new Path(conf.getWorkingDirectory(), path);
+        String dirStr = StringUtils.escapeString(path.toString());
+        String dirs = conf.get("bsp.input.dir");
+        conf.set("bsp.input.dir", dirs == null ? dirStr : dirs
+                + StringUtils.COMMA_STR + dirStr);
+    }
 
-  // This method escapes commas in the glob pattern of the given paths.
-  private static String[] getPathStrings(String commaSeparatedPaths) {
-    int length = commaSeparatedPaths.length();
-    int curlyOpen = 0;
-    int pathStart = 0;
-    boolean globPattern = false;
-    List<String> pathStrings = new ArrayList<String>();
+    // This method escapes commas in the glob pattern of the given paths.
+    private static String[] getPathStrings(String commaSeparatedPaths) {
+        int length = commaSeparatedPaths.length();
+        int curlyOpen = 0;
+        int pathStart = 0;
+        boolean globPattern = false;
+        List<String> pathStrings = new ArrayList<String>();
 
-    for (int i = 0; i < length; i++) {
-      char ch = commaSeparatedPaths.charAt(i);
-      switch (ch) {
-        case '{': {
-          curlyOpen++;
-          if (!globPattern) {
-            globPattern = true;
-          }
-          break;
-        }
-        case '}': {
-          curlyOpen--;
-          if (curlyOpen == 0 && globPattern) {
-            globPattern = false;
-          }
-          break;
-        }
-        case ',': {
-          if (!globPattern) {
-            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
-            pathStart = i + 1;
-          }
-          break;
-        }
-      }
+        for (int i = 0; i < length; i++) {
+            char ch = commaSeparatedPaths.charAt(i);
+            switch (ch) {
+            case '{': {
+                curlyOpen++;
+                if (!globPattern) {
+                    globPattern = true;
+                }
+                break;
+            }
+            case '}': {
+                curlyOpen--;
+                if (curlyOpen == 0 && globPattern) {
+                    globPattern = false;
+                }
+                break;
+            }
+            case ',': {
+                if (!globPattern) {
+                    pathStrings
+                            .add(commaSeparatedPaths.substring(pathStart, i));
+                    pathStart = i + 1;
+                }
+                break;
+            }
+            }
+        }
+        pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
+
+        return pathStrings.toArray(new String[0]);
     }
-    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
 
-    return pathStrings.toArray(new String[0]);
-  }
-
-  /**
-   * Get the list of input {@link Path}s for the map-reduce job.
-   * 
-   * @param conf The configuration of the job
-   * @return the list of input {@link Path}s for the map-reduce job.
-   */
-  public static Path[] getInputPaths(BSPJob conf) {
-    String dirs = conf.getConf().get("bsp.input.dir", "");
-    String[] list = StringUtils.split(dirs);
-    Path[] result = new Path[list.length];
-    for (int i = 0; i < list.length; i++) {
-      result[i] = new Path(StringUtils.unEscapeString(list[i]));
+    /**
+     * Get the list of input {@link Path}s for the map-reduce job.
+     * 
+     * @param conf
+     *            The configuration of the job
+     * @return the list of input {@link Path}s for the map-reduce job.
+     */
+    public static Path[] getInputPaths(BSPJob conf) {
+        String dirs = conf.getConf().get("bsp.input.dir", "");
+        String[] list = StringUtils.split(dirs);
+        Path[] result = new Path[list.length];
+        for (int i = 0; i < list.length; i++) {
+            result[i] = new Path(StringUtils.unEscapeString(list[i]));
+        }
+        return result;
     }
-    return result;
-  }
 
-  private void sortInDescendingOrder(List<NodeInfo> mylist) {
-    Collections.sort(mylist, new Comparator<NodeInfo>() {
-      public int compare(NodeInfo obj1, NodeInfo obj2) {
+    private void sortInDescendingOrder(List<NodeInfo> mylist) {
+        Collections.sort(mylist, new Comparator<NodeInfo>() {
+            public int compare(NodeInfo obj1, NodeInfo obj2) {
 
-        if (obj1 == null || obj2 == null)
-          return -1;
+                if (obj1 == null || obj2 == null)
+                    return -1;
 
-        if (obj1.getValue() == obj2.getValue()) {
-          return 0;
-        } else {
-          return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
-        }
-      }
-    });
-  }
+                if (obj1.getValue() == obj2.getValue()) {
+                    return 0;
+                } else {
+                    return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
+                }
+            }
+        });
+    }
 
-  /**
-   * This function identifies and returns the hosts that contribute most for a
-   * given split. For calculating the contribution, rack locality is treated on
-   * par with host locality, so hosts from racks that contribute the most are
-   * preferred over hosts on racks that contribute less
-   * 
-   * @param blkLocations The list of block locations
-   * @param offset
-   * @param splitSize
-   * @return array of hosts that contribute most to this split
-   * @throws IOException
-   */
-  protected String[] getSplitHosts(BlockLocation[] blkLocations, long offset,
-      long splitSize, NetworkTopology clusterMap) throws IOException {
+    /**
+     * This function identifies and returns the hosts that contribute most for a
+     * given split. For calculating the contribution, rack locality is treated
+     * on par with host locality, so hosts from racks that contribute the most
+     * are preferred over hosts on racks that contribute less
+     * 
+     * @param blkLocations
+     *            The list of block locations
+     * @param offset
+     * @param splitSize
+     * @return array of hosts that contribute most to this split
+     * @throws IOException
+     */
+    protected String[] getSplitHosts(BlockLocation[] blkLocations, long offset,
+            long splitSize, NetworkTopology clusterMap) throws IOException {
 
-    int startIndex = getBlockIndex(blkLocations, offset);
+        int startIndex = getBlockIndex(blkLocations, offset);
 
-    long bytesInThisBlock = blkLocations[startIndex].getOffset()
-        + blkLocations[startIndex].getLength() - offset;
+        long bytesInThisBlock = blkLocations[startIndex].getOffset()
+                + blkLocations[startIndex].getLength() - offset;
 
-    // If this is the only block, just return
-    if (bytesInThisBlock >= splitSize) {
-      return blkLocations[startIndex].getHosts();
-    }
+        // If this is the only block, just return
+        if (bytesInThisBlock >= splitSize) {
+            return blkLocations[startIndex].getHosts();
+        }
 
-    long bytesInFirstBlock = bytesInThisBlock;
-    int index = startIndex + 1;
-    splitSize -= bytesInThisBlock;
+        long bytesInFirstBlock = bytesInThisBlock;
+        int index = startIndex + 1;
+        splitSize -= bytesInThisBlock;
 
-    while (splitSize > 0) {
-      bytesInThisBlock = Math.min(splitSize, blkLocations[index++].getLength());
-      splitSize -= bytesInThisBlock;
-    }
+        while (splitSize > 0) {
+            bytesInThisBlock = Math.min(splitSize,
+                    blkLocations[index++].getLength());
+            splitSize -= bytesInThisBlock;
+        }
 
-    long bytesInLastBlock = bytesInThisBlock;
-    int endIndex = index - 1;
+        long bytesInLastBlock = bytesInThisBlock;
+        int endIndex = index - 1;
 
-    Map<Node, NodeInfo> hostsMap = new IdentityHashMap<Node, NodeInfo>();
-    Map<Node, NodeInfo> racksMap = new IdentityHashMap<Node, NodeInfo>();
-    String[] allTopos = new String[0];
+        Map<Node, NodeInfo> hostsMap = new IdentityHashMap<Node, NodeInfo>();
+        Map<Node, NodeInfo> racksMap = new IdentityHashMap<Node, NodeInfo>();
+        String[] allTopos = new String[0];
 
-    // Build the hierarchy and aggregate the contribution of
-    // bytes at each level. See TestGetSplitHosts.java
+        // Build the hierarchy and aggregate the contribution of
+        // bytes at each level. See TestGetSplitHosts.java
 
-    for (index = startIndex; index <= endIndex; index++) {
+        for (index = startIndex; index <= endIndex; index++) {
 
-      // Establish the bytes in this block
-      if (index == startIndex) {
-        bytesInThisBlock = bytesInFirstBlock;
-      } else if (index == endIndex) {
-        bytesInThisBlock = bytesInLastBlock;
-      } else {
-        bytesInThisBlock = blkLocations[index].getLength();
-      }
+            // Establish the bytes in this block
+            if (index == startIndex) {
+                bytesInThisBlock = bytesInFirstBlock;
+            } else if (index == endIndex) {
+                bytesInThisBlock = bytesInLastBlock;
+            } else {
+                bytesInThisBlock = blkLocations[index].getLength();
+            }
 
-      allTopos = blkLocations[index].getTopologyPaths();
+            allTopos = blkLocations[index].getTopologyPaths();
 
-      // If no topology information is available, just
-      // prefix a fakeRack
-      if (allTopos.length == 0) {
-        allTopos = fakeRacks(blkLocations, index);
-      }
+            // If no topology information is available, just
+            // prefix a fakeRack
+            if (allTopos.length == 0) {
+                allTopos = fakeRacks(blkLocations, index);
+            }
 
-      // NOTE: This code currently works only for one level of
-      // hierarchy (rack/host). However, it is relatively easy
-      // to extend this to support aggregation at different
-      // levels
+            // NOTE: This code currently works only for one level of
+            // hierarchy (rack/host). However, it is relatively easy
+            // to extend this to support aggregation at different
+            // levels
 
-      for (String topo : allTopos) {
+            for (String topo : allTopos) {
 
-        Node node, parentNode;
-        NodeInfo nodeInfo, parentNodeInfo;
+                Node node, parentNode;
+                NodeInfo nodeInfo, parentNodeInfo;
 
-        node = clusterMap.getNode(topo);
+                node = clusterMap.getNode(topo);
 
-        if (node == null) {
-          node = new NodeBase(topo);
-          clusterMap.add(node);
-        }
+                if (node == null) {
+                    node = new NodeBase(topo);
+                    clusterMap.add(node);
+                }
 
-        nodeInfo = hostsMap.get(node);
+                nodeInfo = hostsMap.get(node);
 
-        if (nodeInfo == null) {
-          nodeInfo = new NodeInfo(node);
-          hostsMap.put(node, nodeInfo);
-          parentNode = node.getParent();
-          parentNodeInfo = racksMap.get(parentNode);
-          if (parentNodeInfo == null) {
-            parentNodeInfo = new NodeInfo(parentNode);
-            racksMap.put(parentNode, parentNodeInfo);
-          }
-          parentNodeInfo.addLeaf(nodeInfo);
-        } else {
-          nodeInfo = hostsMap.get(node);
-          parentNode = node.getParent();
-          parentNodeInfo = racksMap.get(parentNode);
-        }
+                if (nodeInfo == null) {
+                    nodeInfo = new NodeInfo(node);
+                    hostsMap.put(node, nodeInfo);
+                    parentNode = node.getParent();
+                    parentNodeInfo = racksMap.get(parentNode);
+                    if (parentNodeInfo == null) {
+                        parentNodeInfo = new NodeInfo(parentNode);
+                        racksMap.put(parentNode, parentNodeInfo);
+                    }
+                    parentNodeInfo.addLeaf(nodeInfo);
+                } else {
+                    nodeInfo = hostsMap.get(node);
+                    parentNode = node.getParent();
+                    parentNodeInfo = racksMap.get(parentNode);
+                }
 
-        nodeInfo.addValue(index, bytesInThisBlock);
-        parentNodeInfo.addValue(index, bytesInThisBlock);
+                nodeInfo.addValue(index, bytesInThisBlock);
+                parentNodeInfo.addValue(index, bytesInThisBlock);
 
-      } // for all topos
+            } // for all topos
 
-    } // for all indices
+        } // for all indices
 
-    return identifyHosts(allTopos.length, racksMap);
-  }
+        return identifyHosts(allTopos.length, racksMap);
+    }
 
-  private String[] identifyHosts(int replicationFactor,
-      Map<Node, NodeInfo> racksMap) {
+    private String[] identifyHosts(int replicationFactor,
+            Map<Node, NodeInfo> racksMap) {
 
-    String[] retVal = new String[replicationFactor];
+        String[] retVal = new String[replicationFactor];
 
-    List<NodeInfo> rackList = new LinkedList<NodeInfo>();
+        List<NodeInfo> rackList = new LinkedList<NodeInfo>();
 
-    rackList.addAll(racksMap.values());
+        rackList.addAll(racksMap.values());
 
-    // Sort the racks based on their contribution to this split
-    sortInDescendingOrder(rackList);
+        // Sort the racks based on their contribution to this split
+        sortInDescendingOrder(rackList);
 
-    boolean done = false;
-    int index = 0;
+        boolean done = false;
+        int index = 0;
 
-    // Get the host list for all our aggregated items, sort
-    // them and return the top entries
-    for (NodeInfo ni : rackList) {
+        // Get the host list for all our aggregated items, sort
+        // them and return the top entries
+        for (NodeInfo ni : rackList) {
 
-      Set<NodeInfo> hostSet = ni.getLeaves();
+            Set<NodeInfo> hostSet = ni.getLeaves();
 
-      List<NodeInfo> hostList = new LinkedList<NodeInfo>();
-      hostList.addAll(hostSet);
+            List<NodeInfo> hostList = new LinkedList<NodeInfo>();
+            hostList.addAll(hostSet);
 
-      // Sort the hosts in this rack based on their contribution
-      sortInDescendingOrder(hostList);
+            // Sort the hosts in this rack based on their contribution
+            sortInDescendingOrder(hostList);
 
if (index == replicationFactor) { + done = true; + break; + } + } - if (done == true) { - break; - } + if (done == true) { + break; + } + } + return retVal; } - return retVal; - } - private String[] fakeRacks(BlockLocation[] blkLocations, int index) - throws IOException { - String[] allHosts = blkLocations[index].getHosts(); - String[] allTopos = new String[allHosts.length]; - for (int i = 0; i < allHosts.length; i++) { - allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i]; + private String[] fakeRacks(BlockLocation[] blkLocations, int index) + throws IOException { + String[] allHosts = blkLocations[index].getHosts(); + String[] allTopos = new String[allHosts.length]; + for (int i = 0; i < allHosts.length; i++) { + allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i]; + } + return allTopos; } - return allTopos; - } - private static class NodeInfo { - final Node node; - final Set blockIds; - final Set leaves; + private static class NodeInfo { + final Node node; + final Set blockIds; + final Set leaves; - private long value; + private long value; - NodeInfo(Node node) { - this.node = node; - blockIds = new HashSet(); - leaves = new HashSet(); - } + NodeInfo(Node node) { + this.node = node; + blockIds = new HashSet(); + leaves = new HashSet(); + } - long getValue() { - return value; - } + long getValue() { + return value; + } - void addValue(int blockIndex, long value) { - if (blockIds.add(blockIndex) == true) { - this.value += value; - } - } + void addValue(int blockIndex, long value) { + if (blockIds.add(blockIndex) == true) { + this.value += value; + } + } - Set getLeaves() { - return leaves; - } + Set getLeaves() { + return leaves; + } - void addLeaf(NodeInfo nodeInfo) { - leaves.add(nodeInfo); + void addLeaf(NodeInfo nodeInfo) { + leaves.add(nodeInfo); + } } - } } Index: examples/src/main/java/org/apache/hama/examples/graph/partitioning/VertexPartitioner.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/partitioning/VertexPartitioner.java (Revision 1200271) +++ examples/src/main/java/org/apache/hama/examples/graph/partitioning/VertexPartitioner.java (Arbeitskopie) @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hama.examples.graph.partitioning; - -import org.apache.hama.examples.graph.Vertex; - -public class VertexPartitioner extends AbstractGraphPartitioner { - - @Override - protected AdjacentPair process(String line) { - - String[] vertices = line.split("\t"); - - Vertex v = new Vertex(vertices[0]); - Vertex[] adjacents = new Vertex[vertices.length - 1]; - - for (int i = 1; i < vertices.length; i++) { - adjacents[i - 1] = new Vertex(vertices[i]); - } - - return new AdjacentPair(v, adjacents); - } - - -} Index: examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java (Revision 0) +++ examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java (Revision 0) @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.examples.graph; + +import org.apache.hadoop.io.ArrayWritable; + +public class ShortestPathVertexArrayWritable extends ArrayWritable { + + public ShortestPathVertexArrayWritable() { + super(ShortestPathVertex.class); + } + +} Index: examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java (Revision 1200271) +++ examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java (Arbeitskopie) @@ -40,8 +40,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hama.bsp.BSP; import org.apache.hama.bsp.BSPPeer; -import org.apache.hama.examples.graph.partitioning.PartitionableWritable; -import org.apache.hama.examples.graph.partitioning.ShortestPathVertexPartitioner; public abstract class ShortestPathsBase extends BSP { @@ -54,8 +52,6 @@ public static final String NAME_VALUE_SEPARATOR = ":"; public static final String MASTER_TASK = "master.groom"; - private static final ShortestPathVertexPartitioner partitioner = new ShortestPathVertexPartitioner(); - /** * When finished we just writing a sequencefile of the vertex name and the * cost. 
Index: examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java (Revision 1200271)
+++ examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java (Arbeitskopie)
@@ -40,8 +40,6 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.examples.graph.partitioning.PartitionableWritable;
-import org.apache.hama.examples.graph.partitioning.ShortestPathVertexPartitioner;
 
 public abstract class ShortestPathsBase extends BSP {
 
@@ -54,8 +52,6 @@
   public static final String NAME_VALUE_SEPARATOR = ":";
   public static final String MASTER_TASK = "master.groom";
 
-  private static final ShortestPathVertexPartitioner partitioner = new ShortestPathVertexPartitioner();
-
   /**
    * When finished we just writing a sequencefile of the vertex name and the
    * cost.
@@ -161,19 +157,7 @@
     writer.close();
 
-    return partition(conf, input, groomNames);
+    return null;
   }
 
-  protected final static Configuration partition(Configuration conf,
-      Path fileToPartition, String[] groomNames) throws IOException,
-      InstantiationException, IllegalAccessException, InterruptedException {
-
-    // set the partitioning vertex class
-    conf.setClass("hama.partitioning.vertex.class", ShortestPathVertex.class,
-        PartitionableWritable.class);
-
-    return partitioner.partition(conf, fileToPartition, groomNames);
-
-  }
-
 }
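With the custom partition() helper gone, the example generator above no longer returns a partitioned configuration (hence return null); splitting and partitioning are now the job client's responsibility. A hedged sketch of the corresponding job setup, based on the formats and partitioner imported into ShortestPaths.java — the exact BSPJob setter names are assumptions, not verified against this revision:

    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.bsp.BSPJob;
    import org.apache.hama.bsp.HashPartitioner;
    import org.apache.hama.bsp.SequenceFileInputFormat;
    import org.apache.hama.bsp.SequenceFileOutputFormat;
    import org.apache.hama.examples.graph.ShortestPaths;

    public class ShortestPathsJobSetup {
      public static BSPJob createJob(HamaConfiguration conf) throws Exception {
        // Assumed wiring: the input is a SequenceFile of (vertex, adjacency
        // list) records, and records are routed with the generic partitioner.
        BSPJob job = new BSPJob(conf, ShortestPaths.class);
        job.setBspClass(ShortestPaths.class);
        job.setInputFormat(SequenceFileInputFormat.class);
        job.setOutputFormat(SequenceFileOutputFormat.class);
        job.setPartitioner(HashPartitioner.class);
        return job;
      }
    }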
Index: examples/src/main/java/org/apache/hama/examples/graph/partitioning/ShortestPathVertexPartitioner.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/graph/partitioning/ShortestPathVertexPartitioner.java (Revision 1200271)
+++ examples/src/main/java/org/apache/hama/examples/graph/partitioning/ShortestPathVertexPartitioner.java (Arbeitskopie)
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.examples.graph.partitioning;
-
-import org.apache.hama.examples.graph.ShortestPathVertex;
-
-public class ShortestPathVertexPartitioner extends
-    AbstractGraphPartitioner<ShortestPathVertex> {
-
-  @Override
-  protected AdjacentPair<ShortestPathVertex> process(String line) {
-
-    String[] vertices = line.split("\t");
-
-    ShortestPathVertex v = new ShortestPathVertex(0, vertices[0]);
-
-    ShortestPathVertex[] adjacents = new ShortestPathVertex[vertices.length - 1];
-
-    for (int i = 1; i < vertices.length; i++) {
-      String[] vertexAndWeight = vertices[i].split(":");
-      adjacents[i - 1] = new ShortestPathVertex(
-          Integer.valueOf(vertexAndWeight[1]), vertexAndWeight[0]);
-    }
-
-    return new AdjacentPair<ShortestPathVertex>(v, adjacents);
-  }
-
-}
Index: examples/src/main/java/org/apache/hama/examples/graph/Vertex.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/graph/Vertex.java (Revision 1200271)
+++ examples/src/main/java/org/apache/hama/examples/graph/Vertex.java (Arbeitskopie)
@@ -21,11 +21,10 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hama.examples.graph.partitioning.PartitionableWritable;
+import org.apache.hadoop.io.Writable;
 
-public class Vertex implements PartitionableWritable {
+public class Vertex implements Writable {
 
-  protected int id;
   protected String name;
 
   public Vertex() {
@@ -35,24 +34,21 @@
   public Vertex(String name) {
     super();
     this.name = name;
-    this.id = name.hashCode();
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    this.id = in.readInt();
     this.name = in.readUTF();
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeInt(id);
     out.writeUTF(name);
   }
 
   @Override
   public int hashCode() {
-    return id;
+    return name.hashCode();
   }
 
   @Override
@@ -69,11 +65,6 @@
     return true;
   }
 
-  @Override
-  public int getId() {
-    return id;
-  }
-
   public String getName() {
     return name;
   }
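Dropping the id field makes name the sole identity of a Vertex: hashCode() now delegates to name.hashCode(), and the Writable form shrinks to a single UTF string. A quick round-trip sketch (the buffer classes are standard Hadoop I/O; the rest mirrors the diff above):

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hama.examples.graph.Vertex;

    public class VertexRoundTrip {
      public static void main(String[] args) throws Exception {
        // Serialize: only writeUTF(name) remains, no leading int id.
        DataOutputBuffer out = new DataOutputBuffer();
        new Vertex("a").write(out);

        // Deserialize into a fresh instance.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        Vertex copy = new Vertex();
        copy.readFields(in);

        // Identity is now purely name-based.
        System.out.println(copy.getName().equals("a"));        // true
        System.out.println(copy.hashCode() == "a".hashCode()); // true
      }
    }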
Index: examples/src/main/java/org/apache/hama/examples/graph/partitioning/AdjacentPair.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/graph/partitioning/AdjacentPair.java (Revision 1200271)
+++ examples/src/main/java/org/apache/hama/examples/graph/partitioning/AdjacentPair.java (Arbeitskopie)
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.examples.graph.partitioning;
-
-
-public final class AdjacentPair<K> {
-
-  final K vertex;
-  final K[] adjacentVertices;
-
-
-  public AdjacentPair(K vertex, K[] adjacentVertices) {
-    super();
-    this.vertex = vertex;
-    this.adjacentVertices = adjacentVertices;
-  }
-
-
-
-}
Index: examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java (Revision 1200271)
+++ examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java (Arbeitskopie)
@@ -24,7 +24,7 @@
 public final class ShortestPathVertex extends Vertex {
 
   private int weight;
-  private Integer cost;
+  private int cost = Integer.MAX_VALUE;
 
   public ShortestPathVertex() {
   }
@@ -34,7 +34,7 @@
     this.weight = weight;
   }
 
-  public ShortestPathVertex(int weight, String name, Integer cost) {
+  public ShortestPathVertex(int weight, String name, int cost) {
     super(name);
     this.weight = weight;
    this.cost = cost;
@@ -44,7 +44,7 @@
     return name;
   }
 
-  public Integer getCost() {
+  public int getCost() {
     return cost;
   }
 
@@ -52,10 +52,6 @@
     this.cost = cost;
   }
 
-  public int getId() {
-    return id;
-  }
-
   public int getWeight() {
     return weight;
  }
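Replacing the nullable Integer cost with int cost = Integer.MAX_VALUE uses MAX_VALUE as the "unreached" (infinity) marker for the shortest-paths relaxation. One consequence worth noting: cost + weight overflows for unreached vertices, so relaxation code should guard before adding. A hedged sketch under that assumption (the method name is illustrative, not from this patch):

    public class Relaxation {
      // Cost of reaching v through u, where Integer.MAX_VALUE means "unreached".
      static int relaxedCost(int costOfU, int edgeWeight) {
        // Guard: Integer.MAX_VALUE + weight would wrap around to a negative cost.
        return costOfU == Integer.MAX_VALUE ? Integer.MAX_VALUE
                                            : costOfU + edgeWeight;
      }
    }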
Index: examples/src/main/java/org/apache/hama/examples/graph/partitioning/AbstractGraphPartitioner.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/graph/partitioning/AbstractGraphPartitioner.java (Revision 1200271)
+++ examples/src/main/java/org/apache/hama/examples/graph/partitioning/AbstractGraphPartitioner.java (Arbeitskopie)
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.examples.graph.partitioning;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hama.examples.graph.ShortestPaths;
-import org.apache.hama.examples.graph.Vertex;
-
-/**
- * This partitioner partitions the file data which should be in text form into a
- * sequencefile.
- *
- * TODO: this should be extended with InputFormat stuff so we can parse every
- * format.
- *
- */
-public abstract class AbstractGraphPartitioner<T extends PartitionableWritable> {
-
-  public static final Log LOG = LogFactory
-      .getLog(AbstractGraphPartitioner.class);
-
-  private FileSystem fs;
-
-  private Class<T> vertexClass;
-
-  @SuppressWarnings("unchecked")
-  public Configuration partition(Configuration conf, Path file,
-      String[] groomNames) throws InstantiationException,
-      IllegalAccessException, IOException, InterruptedException {
-
-    fs = FileSystem.get(conf);
-
-    vertexClass = (Class<T>) conf.getClass("hama.partitioning.vertex.class",
-        Vertex.class);
-
-    int sizeOfCluster = groomNames.length;
-
-    // setup the paths where the grooms can find their input
-    List<Path> partPaths = new ArrayList<Path>(sizeOfCluster);
-    List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
-        sizeOfCluster);
-    StringBuilder groomNameBuilder = new StringBuilder();
-    // this loop adds partition paths for the writers and sets the appropriate
-    // groom names to the files and configuration
-    for (String entry : groomNames) {
-      partPaths.add(new Path(file.getParent().toString() + Path.SEPARATOR
-          + ShortestPaths.PARTED + Path.SEPARATOR
-          + entry.split(ShortestPaths.NAME_VALUE_SEPARATOR)[0]));
-      conf.set(ShortestPaths.MASTER_TASK, entry);
-      groomNameBuilder.append(entry + ";");
-    }
-    // put every peer into the configuration
-    conf.set(ShortestPaths.BSP_PEERS, groomNameBuilder.toString());
-    // create a seq writer for the files
-    for (Path p : partPaths) {
-      fs.delete(p, true);
-      writers.add(SequenceFile.createWriter(fs, conf, p,
-          ObjectWritable.class, ArrayWritable.class, CompressionType.NONE));
-    }
-
-    BufferedReader br = null;
-    try {
-      // read the input
-      br = new BufferedReader(new InputStreamReader(fs.open(file)));
-
-      long numLines = 0L;
-      String line = null;
-      while ((line = br.readLine()) != null) {
-        // let the subclass process
-        AdjacentPair<T> pair = process(line);
-        // check to which partition the vertex belongs
-        int mod = Math.abs(pair.vertex.getId() % sizeOfCluster);
-        writers.get(mod).append(new ObjectWritable(vertexClass, pair.vertex),
-            new ArrayWritable(vertexClass, pair.adjacentVertices));
-        numLines++;
-        if (numLines % 100000 == 0) {
-          LOG.debug("Partitioned " + numLines + " of vertices!");
-        }
-      }
-
-      for (Path p : partPaths) {
-        conf.set("in.path." + p.getName(), p.toString());
-      }
-      conf.set("num.vertices", "" + numLines);
-      LOG.debug("Partitioned a total of " + numLines + " vertices!");
-
-      return conf;
-    } finally {
-      // close our ressources
-      if (br != null)
-        br.close();
-
-      for (SequenceFile.Writer w : writers)
-        w.close();
-
-      fs.close();
-    }
-  }
-
-  protected abstract AdjacentPair<T> process(String line);
-
-}
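Finally, the Math.abs(...) added around partitioner.getPartition(...) in BSPJobClient matters because Java's % operator keeps the sign of the dividend: a negative key hash yields a negative partition index, exactly the failure the deleted AbstractGraphPartitioner already worked around with Math.abs(pair.vertex.getId() % sizeOfCluster). A small standalone demonstration:

    public class PartitionIndexDemo {
      public static void main(String[] args) {
        int numOfTasks = 4;
        int h = -11; // stands in for a negative hashCode()

        System.out.println(h % numOfTasks);           // -3: unusable as a list index
        System.out.println(Math.abs(h % numOfTasks)); //  3: non-negative, < numOfTasks

        // Remaining corner case: Math.abs(Integer.MIN_VALUE) is still negative,
        // so (hash & Integer.MAX_VALUE) % numOfTasks would be the fully safe form.
        System.out.println(Math.abs(Integer.MIN_VALUE)); // -2147483648
      }
    }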