Index: examples/src/main/java/org/apache/hama/examples/ExampleDriver.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (revision 1511156) +++ examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (working copy) @@ -36,6 +36,7 @@ pgd.addClass("pagerank", PageRank.class, "PageRank"); pgd.addClass("inlnkcount", InlinkCount.class, "InlinkCount"); pgd.addClass("bipartite", BipartiteMatching.class, "Bipartite Matching"); + pgd.addClass("semi", SemiClusterJobDriver.class, "Semi Clustering"); pgd.addClass("kmeans", Kmeans.class, "K-Means Clustering"); pgd.addClass("gd", GradientDescentExample.class, "Gradient Descent"); Index: examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java (revision 0) +++ examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java (working copy) @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.examples; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.WritableComparable; + +/** + * The SemiClusterDetails class is used to store a Semi-Cluster ID and its + * score.This class implements Comparable interface which compares the score of + * the objects. + * + */ + +public class SemiClusterDetails implements + WritableComparable { + + private String semiClusterId; + private double semiClusterScore; + + public SemiClusterDetails() { + this.semiClusterId = ""; + this.semiClusterScore = 1.0; + } + + public SemiClusterDetails(String semiClusterId, double semiClusterScore) { + this.semiClusterId = semiClusterId; + this.semiClusterScore = semiClusterScore; + } + + public String getSemiClusterId() { + return semiClusterId; + } + + public void setSemiClusterId(String semiClusterId) { + this.semiClusterId = semiClusterId; + } + + public double getSemiClusterScore() { + return semiClusterScore; + } + + public void setSemiClusterScore(double semiClusterScore) { + this.semiClusterScore = semiClusterScore; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((semiClusterId == null) ? 0 : semiClusterId.hashCode()); + long temp; + temp = Double.doubleToLongBits(semiClusterScore); + result = prime * result + (int) (temp ^ (temp >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + SemiClusterDetails other = (SemiClusterDetails) obj; + if (semiClusterId == null) { + if (other.semiClusterId != null) + return false; + } else if (!semiClusterId.equals(other.semiClusterId)) + return false; + if (Double.doubleToLongBits(semiClusterScore) != Double + .doubleToLongBits(other.semiClusterScore)) + return false; + return true; + } + + @Override + public String toString() { + return semiClusterId; + } + + @Override + public void readFields(DataInput in) throws IOException { + String semiClusterId = in.readUTF(); + setSemiClusterId(semiClusterId); + double score = in.readDouble(); + setSemiClusterScore(score); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(semiClusterId); + out.writeDouble(semiClusterScore); + } + + @Override + public int compareTo(SemiClusterDetails sc) { + return (this.getSemiClusterId().compareTo(sc.getSemiClusterId())); + } +} Index: examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java (revision 0) +++ examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java (working copy) @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.examples; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.HashPartitioner; +import org.apache.hama.bsp.TextInputFormat; +import org.apache.hama.bsp.TextOutputFormat; +import org.apache.hama.graph.GraphJob; + +public class SemiClusterJobDriver { + + protected static final Log LOG = LogFactory + .getLog(SemiClusterJobDriver.class); + private static final String outputPathString = "semicluster.outputpath"; + private static final String inputPathString = "semicluster.inputmatrixpath"; + private static final String requestedGraphJobMaxIterationString = "hama.graph.max.iteration"; + private static final String semiClusterMaximumVertexCount = "semicluster.max.vertex.count"; + private static final String graphJobMessageSentCount = "semicluster.max.message.sent.count"; + private static final String graphJobVertexMaxClusterCount = "vertex.max.cluster.count"; + + public static void startTask(HamaConfiguration conf) throws IOException, + InterruptedException, ClassNotFoundException { + GraphJob semiClusterJob = new GraphJob(conf, SemiClusterJobDriver.class); + semiClusterJob + .setVertexOutputWriterClass(SemiClusterVertexOutputWriter.class); + semiClusterJob.setJobName("SemiClusterJob"); + semiClusterJob.setVertexClass(SemiClusteringVertex.class); + semiClusterJob.setInputPath(new Path(conf.get(inputPathString))); + semiClusterJob.setOutputPath(new Path(conf.get(outputPathString))); + + semiClusterJob.set("hama.graph.self.ref", "true"); + semiClusterJob.set("hama.graph.repair", "true"); + + semiClusterJob.setVertexIDClass(Text.class); + semiClusterJob.setVertexValueClass(SemiClusterMessage.class); + semiClusterJob.setEdgeValueClass(DoubleWritable.class); + + semiClusterJob.setInputKeyClass(LongWritable.class); + semiClusterJob.setInputValueClass(Text.class); + semiClusterJob.setInputFormat(TextInputFormat.class); + semiClusterJob.setVertexInputReaderClass(SemiClusterTextReader.class); + + semiClusterJob.setPartitioner(HashPartitioner.class); + + semiClusterJob.setOutputFormat(TextOutputFormat.class); + semiClusterJob.setOutputKeyClass(Text.class); + semiClusterJob.setOutputValueClass(Text.class); + + long startTime = System.currentTimeMillis(); + if (semiClusterJob.waitForCompletion(true)) { + System.out.println("Job Finished in " + + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); + } + } + + private static void printUsage() { + LOG.info("Usage: SemiClusterO [number of tasks (default max)] [Maximum number of vertices in a Semi Cluster (default 10)] [Number of messages sent from a Vertex(default 10)][Maximum number of clusters in which a vertex can be containted(default 10)]"); + } + + /** + * Function parses command line in standart form. + */ + private static void parseArgs(HamaConfiguration conf, String[] args) { + if (args.length < 2) { + printUsage(); + System.exit(-1); + } + + conf.set(inputPathString, args[0]); + + Path path = new Path(args[1]); + conf.set(outputPathString, path.toString()); + + if (args.length >= 3) { + try { + int taskCount = Integer.parseInt(args[2]); + if (taskCount < 0) { + printUsage(); + throw new IllegalArgumentException( + "The number of requested job maximum iteration count can't be negative. Actual value: " + + String.valueOf(taskCount)); + } + conf.setInt(requestedGraphJobMaxIterationString, taskCount); + if (args.length >= 4) { + int maximumVertexCount = Integer.parseInt(args[3]); + if (maximumVertexCount < 0) { + printUsage(); + throw new IllegalArgumentException( + "The number of maximum vertex count can't be negative. Actual value: " + + String.valueOf(maximumVertexCount)); + } + conf.setInt(semiClusterMaximumVertexCount, maximumVertexCount); + if (args.length >= 5) { + int messageSentCount = Integer.parseInt(args[4]); + if (messageSentCount < 0) { + printUsage(); + throw new IllegalArgumentException( + "The number of maximum message sent count can't be negative. Actual value: " + + String.valueOf(messageSentCount)); + } + conf.setInt(graphJobMessageSentCount, messageSentCount); + if (args.length == 6) { + int vertexClusterCount = Integer.parseInt(args[5]); + if (vertexClusterCount < 0) { + printUsage(); + throw new IllegalArgumentException( + "The maximum number of clusters in which a vertex can be containted can't be negative. Actual value: " + + String.valueOf(vertexClusterCount)); + } + conf.setInt(graphJobVertexMaxClusterCount, vertexClusterCount); + + } + } + } + } catch (NumberFormatException e) { + printUsage(); + throw new IllegalArgumentException( + "The format of job maximum iteration count is int. Can not parse value: " + + args[2]); + } + } + } + + public static void main(String[] args) throws Exception { + HamaConfiguration conf = new HamaConfiguration(); + parseArgs(conf, args); + startTask(conf); + } +} Index: examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java (revision 0) +++ examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java (working copy) @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.examples; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hama.graph.Vertex; + +/** + * The SemiClusterMessage class defines the structure of the value stored by + * each vertex in the graph Job which is same as the Message sent my each + * vertex. + * + */ +public class SemiClusterMessage implements + WritableComparable { + + private String semiClusterId; + private double semiClusterScore; + private List> semiClusterVertexList = new ArrayList>(); + private Set semiClusterContainThis = new TreeSet(); + + public SemiClusterMessage(String scId, + List> verticesEdges, + double score) { + this.semiClusterId = scId; + this.semiClusterVertexList = verticesEdges; + this.semiClusterScore = score; + } + + public SemiClusterMessage(SemiClusterMessage msg) { + this.semiClusterId = msg.getScId(); + for (Vertex v : msg + .getVertexList()) + this.semiClusterVertexList.add(v); + this.semiClusterScore = msg.getScore(); + } + + public SemiClusterMessage(Set semiClusterContainThis) { + this.semiClusterId = ""; + this.semiClusterScore = 0.0; + this.semiClusterVertexList = null; + this.semiClusterContainThis = semiClusterContainThis; + } + + public SemiClusterMessage() { + } + + public double getScore() { + return semiClusterScore; + } + + public void setScore(double score) { + this.semiClusterScore = score; + } + + public List> getVertexList() { + return semiClusterVertexList; + } + + public void addVertex(Vertex v) { + this.semiClusterVertexList.add(v); + } + + public String getScId() { + return semiClusterId; + } + + public void setScId(String scId) { + this.semiClusterId = scId; + } + + public void readFields(DataInput in) throws IOException { + clear(); + String semiClusterId = in.readUTF(); + setScId(semiClusterId); + double score = in.readDouble(); + setScore(score); + if (in.readBoolean()) { + int len = in.readInt(); + if (len > 0) { + for (int i = 0; i < len; i++) { + SemiClusteringVertex v = new SemiClusteringVertex(); + v.readFields(in); + semiClusterVertexList.add(v); + } + } + } + int len = in.readInt(); + if (len > 0) { + for (int i = 0; i < len; i++) { + SemiClusterDetails sd = new SemiClusterDetails(); + sd.readFields(in); + semiClusterContainThis.add(sd); + } + } + + } + + private void clear() { + semiClusterVertexList = new ArrayList>(); + semiClusterContainThis = new TreeSet(); + } + + public void write(DataOutput out) throws IOException { + out.writeUTF(semiClusterId); + out.writeDouble(semiClusterScore); + + if (this.semiClusterVertexList == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeInt(semiClusterVertexList.size()); + for (Vertex v : semiClusterVertexList) { + v.write(out); + } + } + out.writeInt(semiClusterContainThis.size()); + Iterator itr = semiClusterContainThis.iterator(); + while (itr.hasNext()) + itr.next().write(out); + } + + public Set getSemiClusterContainThis() { + return semiClusterContainThis; + } + + public void setSemiClusterContainThis( + List semiClusterContainThis, + int graphJobVertexMaxClusterCount) { + int clusterCountToBeRemoved = 0; + NavigableSet setSort = new TreeSet( + new Comparator() { + + @Override + public int compare(SemiClusterDetails o1, SemiClusterDetails o2) { + return (o1.getSemiClusterScore() == o2.getSemiClusterScore() ? 0 + : o1.getSemiClusterScore() < o2.getSemiClusterScore() ? -1 : 1); + } + }); + setSort.addAll(this.semiClusterContainThis); + setSort.addAll(semiClusterContainThis); + clusterCountToBeRemoved = setSort.size() - graphJobVertexMaxClusterCount; + Iterator itr = setSort.descendingIterator(); + while (clusterCountToBeRemoved > 0) { + itr.next(); + itr.remove(); + clusterCountToBeRemoved--; + } + this.semiClusterContainThis = setSort; + + } + + public int compareTo(SemiClusterMessage m) { + return (this.getScId().compareTo(m.getScId())); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((semiClusterId == null) ? 0 : semiClusterId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + SemiClusterMessage other = (SemiClusterMessage) obj; + if (semiClusterId == null) { + if (other.semiClusterId != null) + return false; + } else if (!semiClusterId.equals(other.semiClusterId)) + return false; + return true; + } + + @Override + public String toString() { + return "SCMessage [semiClusterId=" + semiClusterId + ", semiClusterScore=" + + semiClusterScore + ", semiClusterVertexList=" + semiClusterVertexList + + ", semiClusterContainThis=" + semiClusterContainThis + "]"; + } +} Index: examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java (revision 0) +++ examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java (working copy) @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.examples; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.graph.Edge; +import org.apache.hama.graph.Vertex; +import org.apache.hama.graph.VertexInputReader; + +/** + * SemiClusterTextReader defines the way in which data is to be read from the + * input file and store as a vertex with VertexId and Edges + * + */ +public class SemiClusterTextReader extends + VertexInputReader { + + String lastVertexId = null; + List adjacents = new ArrayList(); + + @Override + public boolean parseVertex(LongWritable key, Text value, + Vertex vertex) { + + String line = value.toString(); + String[] lineSplit = line.split("\t"); + if (!line.startsWith("#")) { + if (lastVertexId == null) { + lastVertexId = lineSplit[0]; + } + if (lastVertexId.equals(lineSplit[0])) { + adjacents.add(lineSplit[1]); + } else { + vertex.setVertexID(new Text(lastVertexId)); + for (String adjacent : adjacents) { + String[] ValueSplit = adjacent.split("-"); + vertex.addEdge(new Edge( + new Text(ValueSplit[0]), new DoubleWritable(Double + .parseDouble(ValueSplit[1])))); + } + adjacents.clear(); + lastVertexId = lineSplit[0]; + adjacents.add(lineSplit[1]); + return true; + } + } + return false; + } + +} Index: examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java (revision 0) +++ examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java (working copy) @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.examples; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.graph.GraphJobMessage; +import org.apache.hama.graph.Vertex; +import org.apache.hama.graph.VertexOutputWriter; + +/** + * The VertexOutputWriter defines what parts of the vertex shall be written to + * the output format. + * + * @param the vertexID type. + * @param the edge value type. + * @param the vertex value type. + */ +@SuppressWarnings("rawtypes") +public class SemiClusterVertexOutputWriter + implements VertexOutputWriter { + + @Override + public void setup(Configuration conf) { + // do nothing + } + + @SuppressWarnings("unchecked") + @Override + public void write(Vertex vertex, + BSPPeer peer) + throws IOException { + SemiClusterMessage vertexValue = (SemiClusterMessage) vertex.getValue(); + peer.write((KEYOUT) vertex.getVertexID(), (VALUEOUT) new Text(vertexValue + .getSemiClusterContainThis().toString())); + } + +} Index: examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java (revision 0) +++ examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java (working copy) @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.examples; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.graph.Edge; +import org.apache.hama.graph.Vertex; + +/** + * SemiClusteringVertex Class defines each vertex in a Graph job and the + * compute() method is the function which is applied on each Vertex in the graph + * on each Super step of the job execution. + * + */ +public class SemiClusteringVertex extends + Vertex { + private int semiClusterMaximumVertexCount; + private int graphJobMessageSentCount; + private int graphJobVertexMaxClusterCount; + + @Override + public void setup(Configuration conf) { + semiClusterMaximumVertexCount = conf.getInt("semicluster.max.vertex.count", + 10); + graphJobMessageSentCount = conf.getInt( + "semicluster.max.message.sent.count", 10); + graphJobVertexMaxClusterCount = conf.getInt("vertex.max.cluster.count", 10); + } + + /** + * The user overrides the Compute() method, which will be executed at each + * active vertex in every superstep + */ + @Override + public void compute(Iterable messages) throws IOException { + if (this.getSuperstepCount() == 0) { + firstSuperStep(); + } + if (this.getSuperstepCount() >= 1) { + Set scListContainThis = new TreeSet(); + Set scListNotContainThis = new TreeSet(); + List scList = new ArrayList(); + for (SemiClusterMessage msg : messages) { + if (!isVertexInSc(msg)) { + scListNotContainThis.add(msg); + SemiClusterMessage msgNew = new SemiClusterMessage(msg); + msgNew.addVertex(this); + msgNew + .setScId("C" + createNewSemiClusterName(msgNew.getVertexList())); + msgNew.setScore(semiClusterScoreCalcuation(msgNew)); + scListContainThis.add(msgNew); + } else { + scListContainThis.add(msg); + } + } + scList.addAll(scListContainThis); + scList.addAll(scListNotContainThis); + sendBestSCMsg(scList); + updatesVertexSemiClustersList(scListContainThis); + } + } + + public List addSCList(List scList, + SemiClusterMessage msg) { + return scList; + } + + /** + * This function create a new Semi-cluster ID for a semi-cluster from the list + * of vertices in the cluster.It first take all the vertexIds as a list sort + * the list and then find the HashCode of the Sorted List. + */ + public int createNewSemiClusterName( + List> semiClusterVertexList) { + List vertexIDList = getSemiClusterVerticesIdList(semiClusterVertexList); + Collections.sort(vertexIDList); + return (vertexIDList.hashCode()); + } + + /** + * Function which is executed in the first SuperStep + * + * @throws IOException + */ + public void firstSuperStep() throws IOException { + Vertex v = this; + List> lV = new ArrayList>(); + lV.add(v); + String newClusterName = "C" + createNewSemiClusterName(lV); + SemiClusterMessage initialClusters = new SemiClusterMessage(newClusterName, + lV, 1); + sendMessageToNeighbors(initialClusters); + Set scList = new TreeSet(); + scList.add(new SemiClusterDetails(newClusterName, 1.0)); + SemiClusterMessage vertexValue = new SemiClusterMessage(scList); + this.setValue(vertexValue); + } + + /** + * Vertex V updates its list of semi-clusters with the semi- clusters from c1 + * , ..., ck , c'1 , ..., c'k that contain V + */ + public void updatesVertexSemiClustersList( + Set scListContainThis) throws IOException { + List scList = new ArrayList(); + Set sortedSet = new TreeSet( + new Comparator() { + + @Override + public int compare(SemiClusterMessage o1, SemiClusterMessage o2) { + return (o1.getScore() == o2.getScore() ? 0 + : o1.getScore() < o2.getScore() ? -1 : 1); + } + }); + sortedSet.addAll(scListContainThis); + int count = 0; + for (SemiClusterMessage msg : sortedSet) { + scList.add(new SemiClusterDetails(msg.getScId(), msg.getScore())); + if (count > graphJobMessageSentCount) + break; + } + + SemiClusterMessage vertexValue = this.getValue(); + vertexValue + .setSemiClusterContainThis(scList, graphJobVertexMaxClusterCount); + this.setValue(vertexValue); + } + + /** + * Function to calcualte the Score of a semi-cluster + * + * @param message + * @return + */ + public double semiClusterScoreCalcuation(SemiClusterMessage message) { + double iC = 0.0, bC = 0.0, fB = 0.0, sC = 0.0; + int vC = 0, eC = 0; + List vertexId = getSemiClusterVerticesIdList(message + .getVertexList()); + vC = vertexId.size(); + for (Vertex v : message + .getVertexList()) { + List> eL = v.getEdges(); + for (Edge e : eL) { + eC++; + if (vertexId.contains(e.getDestinationVertexID().toString()) + && e.getCost() != null) { + iC = iC + e.getCost().get(); + } else if (e.getCost() != null) { + bC = bC + e.getCost().get(); + } + } + } + if (vC > 1) + sC = ((iC - fB * bC) / ((vC * (vC - 1)) / 2)) / eC; + return sC; + } + + /** + * Returns a Array List of vertexIds from a List of Vertex Objects + * + * @param lV + * @return + */ + public List getSemiClusterVerticesIdList( + List> lV) { + Iterator> vertexItrator = lV + .iterator(); + List vertexId = new ArrayList(); + while (vertexItrator.hasNext()) { + vertexId.add(vertexItrator.next().getVertexID().toString()); + } + return vertexId; + } + + /** + * If a semi-cluster c does not already contain V , and Vc < Mmax , then V is + * added to c to form c' . + */ + public boolean isVertexInSc(SemiClusterMessage msg) { + List vertexId = getSemiClusterVerticesIdList(msg.getVertexList()); + if (vertexId.contains(this.getVertexID().toString()) + && vertexId.size() < semiClusterMaximumVertexCount) + return true; + else + return false; + } + + /** + * The semi-clusters c1 , ..., ck , c'1 , ..., c'k are sorted by their scores, + * and the best ones are sent to V ?? neighbors. + */ + public void sendBestSCMsg(List scList) throws IOException { + Collections.sort(scList, new Comparator() { + + @Override + public int compare(SemiClusterMessage o1, SemiClusterMessage o2) { + return (o1.getScore() == o2.getScore() ? 0 : o1.getScore() < o2 + .getScore() ? -1 : 1); + } + }); + Iterator scItr = scList.iterator(); + int count = 0; + while (scItr.hasNext()) { + sendMessageToNeighbors(scItr.next()); + count++; + if (count > graphJobMessageSentCount) + break; + } + } +}