### Eclipse Workspace Patch 1.0 #P hama-graph Index: src/main/java/org/apache/hama/graph/DiskVerticesInfo.java =================================================================== --- src/main/java/org/apache/hama/graph/DiskVerticesInfo.java (revision 1449875) +++ src/main/java/org/apache/hama/graph/DiskVerticesInfo.java (working copy) @@ -20,12 +20,13 @@ import static com.google.common.base.Preconditions.checkArgument; import java.io.IOException; -import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; @@ -44,10 +45,14 @@ private static final byte NULL = 0; private static final byte NOT_NULL = 1; - private RandomAccessFile staticGraphParts; - private RandomAccessFile softGraphParts; - private RandomAccessFile softGraphPartsNextIteration; + private FSDataOutputStream staticGraphPartsDos; + private FSDataInputStream staticGraphPartsDis; + private FSDataOutputStream softGraphPartsDos; + private FSDataInputStream softGraphPartsDis; + + private FSDataOutputStream softGraphPartsNextIterationDos; + private BitSet activeVertices; private long[] softValueOffsets; private long[] softValueOffsetsNextIteration; @@ -64,6 +69,7 @@ private int index = 0; private Configuration conf; private GraphJobRunner runner; + private String staticFile; @Override public void init(GraphJobRunner runner, Configuration conf, @@ -77,19 +83,19 @@ + "/"; LocalFileSystem local = FileSystem.getLocal(conf); local.mkdirs(new Path(rootPath)); - // make sure that those files do not exist - String staticFile = rootPath + "static.graph"; + staticFile = rootPath + "static.graph"; local.delete(new Path(staticFile), false); - staticGraphParts = new RandomAccessFile(staticFile, "rw"); + staticGraphPartsDos = local.create(new Path(staticFile)); String softGraphFileName = getSoftGraphFileName(rootPath, currentStep); local.delete(new Path(softGraphFileName), false); - softGraphParts = new RandomAccessFile(softGraphFileName, "rw"); + softGraphPartsDos = local.create(new Path(softGraphFileName)); } @Override public void cleanup(Configuration conf, TaskAttemptID attempt) throws IOException { - IOUtils.cleanup(null, softGraphParts, softGraphPartsNextIteration); + IOUtils.cleanup(null, softGraphPartsDos, softGraphPartsNextIterationDos, + staticGraphPartsDis, softGraphPartsDis); // delete the contents FileSystem.getLocal(conf).delete(new Path(rootPath), true); } @@ -101,15 +107,15 @@ "Additions are locked now, nobody is allowed to change the structure anymore."); // write the static parts - tmpStaticOffsets.add(staticGraphParts.length()); - vertex.getVertexID().write(staticGraphParts); - staticGraphParts.writeInt(vertex.getEdges() == null ? 0 : vertex.getEdges() - .size()); + tmpStaticOffsets.add(staticGraphPartsDos.getPos()); + vertex.getVertexID().write(staticGraphPartsDos); + staticGraphPartsDos.writeInt(vertex.getEdges() == null ? 0 : vertex + .getEdges().size()); for (Edge e : vertex.getEdges()) { - e.getDestinationVertexID().write(staticGraphParts); + e.getDestinationVertexID().write(staticGraphPartsDos); } - serializeSoft(vertex, -1, null, softGraphParts); + serializeSoft(vertex, -1, null, softGraphPartsDos); size++; } @@ -120,15 +126,15 @@ * the temporary storage. */ private void serializeSoft(Vertex vertex, int index, - long[] softValueOffsets, RandomAccessFile softGraphParts) + long[] softValueOffsets, FSDataOutputStream softGraphParts) throws IOException { // safe offset write the soft parts if (index >= 0) { - softValueOffsets[index] = softGraphParts.length(); + softValueOffsets[index] = softGraphParts.getPos(); // only set the bitset if we've finished the setup activeVertices.set(index, vertex.isHalted()); } else { - tmpSoftOffsets.add(softGraphParts.length()); + tmpSoftOffsets.add(softGraphParts.getPos()); } if (vertex.getValue() == null) { softGraphParts.write(NULL); @@ -158,7 +164,11 @@ tmpStaticOffsets = null; tmpSoftOffsets = null; - + try { + staticGraphPartsDos.close(); + } catch (IOException e) { + e.printStackTrace(); + } // prevent additional vertices from beeing added lockedAdditions = true; } @@ -180,8 +190,9 @@ public void startSuperstep() throws IOException { index = 0; String softGraphFileName = getSoftGraphFileName(rootPath, currentStep); - FileSystem.getLocal(conf).delete(new Path(softGraphFileName), true); - softGraphPartsNextIteration = new RandomAccessFile(softGraphFileName, "rw"); + LocalFileSystem local = FileSystem.getLocal(conf); + local.delete(new Path(softGraphFileName), true); + softGraphPartsNextIterationDos = local.create(new Path(softGraphFileName)); softValueOffsets = softValueOffsetsNextIteration; softValueOffsetsNextIteration = new long[softValueOffsetsNextIteration.length]; } @@ -191,17 +202,20 @@ throws IOException { // write to the soft parts serializeSoft(vertex, index++, softValueOffsetsNextIteration, - softGraphPartsNextIteration); + softGraphPartsNextIterationDos); } @Override public void finishSuperstep() throws IOException { // do not delete files in the first step + IOUtils.cleanup(null, softGraphPartsDos, softGraphPartsNextIterationDos, + softGraphPartsDis); if (currentStep > 0) { - softGraphParts.close(); - FileSystem.getLocal(conf).delete( - new Path(getSoftGraphFileName(rootPath, currentStep - 1)), true); - softGraphParts = softGraphPartsNextIteration; + LocalFileSystem local = FileSystem.getLocal(conf); + local.delete(new Path(getSoftGraphFileName(rootPath, currentStep - 1)), + true); + String softGraphFileName = getSoftGraphFileName(rootPath, currentStep); + softGraphPartsDis = local.open(new Path(softGraphFileName)); } currentStep++; } @@ -238,8 +252,15 @@ public IDSkippingIterator skippingIterator() { try { // reset - staticGraphParts.seek(0); - softGraphParts.seek(0); + String softGraphFileName = getSoftGraphFileName(rootPath, + Math.max(0, currentStep - 1)); + LocalFileSystem local = FileSystem.getLocal(conf); + // close the files + IOUtils.cleanup(null, softGraphPartsDos, softGraphPartsDis, + staticGraphPartsDis, staticGraphPartsDos); + softGraphPartsDis = local.open(new Path(softGraphFileName)); + staticGraphPartsDis = local.open(new Path(staticFile)); + // ensure the vertex is not null if (cachedVertexInstance == null) { cachedVertexInstance = GraphJobRunner @@ -297,10 +318,10 @@ try { while (true) { // seek until we found something that satisfied our strategy - staticGraphParts.seek(staticOffsets[index]); + staticGraphPartsDis.seek(staticOffsets[index]); boolean halted = activeVertices.get(index); cachedVertexInstance.setVotedToHalt(halted); - cachedVertexInstance.getVertexID().readFields(staticGraphParts); + cachedVertexInstance.getVertexID().readFields(staticGraphPartsDis); if (strat.accept(cachedVertexInstance, messageVertexId)) { break; } @@ -308,20 +329,20 @@ return size; } } - softGraphParts.seek(softValueOffsets[index]); + softGraphPartsDis.seek(softValueOffsets[index]); // setting vertex value null here, because it may be overridden. Messaging // is not materializing the message directly- so it is possible for the // read fields method to change this object (thus a new object). cachedVertexInstance.setValue(null); - if (softGraphParts.readByte() == NOT_NULL) { + if (softGraphPartsDis.readByte() == NOT_NULL) { ensureVertexValueNotNull(); - cachedVertexInstance.getValue().readFields(softGraphParts); + cachedVertexInstance.getValue().readFields(softGraphPartsDis); } - cachedVertexInstance.readState(softGraphParts); - int numEdges = staticGraphParts.readInt(); - int softEdges = softGraphParts.readInt(); + cachedVertexInstance.readState(softGraphPartsDis); + int numEdges = staticGraphPartsDis.readInt(); + int softEdges = softGraphPartsDis.readInt(); if (softEdges != numEdges) { throw new IllegalArgumentException( "Number of edges seemed to change. This is not possible (yet)."); @@ -335,10 +356,10 @@ Edge edge = new Edge(); ensureEdgeValueNotNull(edge); ensureEdgeIDNotNull(edge); - edge.getDestinationVertexID().readFields(staticGraphParts); - if (softGraphParts.readByte() == NOT_NULL) { + edge.getDestinationVertexID().readFields(staticGraphPartsDis); + if (softGraphPartsDis.readByte() == NOT_NULL) { ensureEdgeValueNotNull(edge); - edge.getCost().readFields(softGraphParts); + edge.getCost().readFields(softGraphPartsDis); } else { edge.setCost(null); }