Index: ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterMessage.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterMessage.java (revision 0) +++ ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterMessage.java (revision 0) @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.ml.semiclustering; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hama.graph.Vertex; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.*; + +/** + * The SemiClusterMessage class defines the structure of the value stored by + * each vertex in the graph Job which is same as the Message sent my each + * vertex. + * + */ +public class SemiClusterMessage implements + WritableComparable { + + private String semiClusterId; + private double semiClusterScore; + private List> semiClusterVertexList = new ArrayList>(); + private Set semiClusterContainThis = new TreeSet(); + + public SemiClusterMessage(String scId, + List> verticesEdges, + double score) { + this.semiClusterId = scId; + this.semiClusterVertexList = verticesEdges; + this.semiClusterScore = score; + } + + public SemiClusterMessage(SemiClusterMessage msg) { + this.semiClusterId = msg.getScId(); + for (Vertex v : msg + .getVertexList()) + this.semiClusterVertexList.add(v); + this.semiClusterScore = msg.getScore(); + } + + public SemiClusterMessage(Set semiClusterContainThis) { + this.semiClusterId = ""; + this.semiClusterScore = 0.0; + this.semiClusterVertexList = null; + this.semiClusterContainThis = semiClusterContainThis; + } + + public SemiClusterMessage() { + } + + public double getScore() { + return semiClusterScore; + } + + public void setScore(double score) { + this.semiClusterScore = score; + } + + public List> getVertexList() { + return semiClusterVertexList; + } + + public void addVertex(Vertex v) { + this.semiClusterVertexList.add(v); + } + + public String getScId() { + return semiClusterId; + } + + public void setScId(String scId) { + this.semiClusterId = scId; + } + + public void readFields(DataInput in) throws IOException { + clear(); + String semiClusterId = in.readUTF(); + setScId(semiClusterId); + double score = in.readDouble(); + setScore(score); + if (in.readBoolean()) { + int len = in.readInt(); + if (len > 0) { + for (int i = 0; i < len; i++) { + SemiClusteringVertex v = new SemiClusteringVertex(); + v.readFields(in); + semiClusterVertexList.add(v); + } + } + } + int len = in.readInt(); + if (len > 0) { + for (int i = 0; i < len; i++) { + SemiClusterDetails sd = new SemiClusterDetails(); + sd.readFields(in); + semiClusterContainThis.add(sd); + } + } + + } + + private void clear() { + semiClusterVertexList = new ArrayList>(); + semiClusterContainThis = new TreeSet(); + } + + public void write(DataOutput out) throws IOException { + out.writeUTF(semiClusterId); + out.writeDouble(semiClusterScore); + + if (this.semiClusterVertexList == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeInt(semiClusterVertexList.size()); + for (Vertex v : semiClusterVertexList) { + v.write(out); + } + } + out.writeInt(semiClusterContainThis.size()); + Iterator itr = semiClusterContainThis.iterator(); + while (itr.hasNext()) + itr.next().write(out); + } + + public Set getSemiClusterContainThis() { + return semiClusterContainThis; + } + + public void setSemiClusterContainThis( + List semiClusterContainThis, + int graphJobVertexMaxClusterCount) { + int clusterCountToBeRemoved = 0; + NavigableSet setSort = new TreeSet( + new Comparator() { + + @Override + public int compare(SemiClusterDetails o1, SemiClusterDetails o2) { + return (o1.getSemiClusterScore() == o2.getSemiClusterScore() ? 0 + : o1.getSemiClusterScore() < o2.getSemiClusterScore() ? -1 : 1); + } + }); + setSort.addAll(this.semiClusterContainThis); + setSort.addAll(semiClusterContainThis); + clusterCountToBeRemoved = setSort.size() - graphJobVertexMaxClusterCount; + Iterator itr = setSort.descendingIterator(); + while (clusterCountToBeRemoved > 0) { + itr.next(); + itr.remove(); + clusterCountToBeRemoved--; + } + this.semiClusterContainThis = setSort; + + } + + public int compareTo(SemiClusterMessage m) { + return (this.getScId().compareTo(m.getScId())); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((semiClusterId == null) ? 0 : semiClusterId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + SemiClusterMessage other = (SemiClusterMessage) obj; + if (semiClusterId == null) { + if (other.semiClusterId != null) + return false; + } else if (!semiClusterId.equals(other.semiClusterId)) + return false; + return true; + } + + @Override + public String toString() { + return "SCMessage [semiClusterId=" + semiClusterId + ", semiClusterScore=" + + semiClusterScore + ", semiClusterVertexList=" + semiClusterVertexList + + ", semiClusterContainThis=" + semiClusterContainThis + "]"; + } +} Index: ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterDetails.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterDetails.java (revision 0) +++ ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterDetails.java (revision 0) @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.ml.semiclustering; + +import org.apache.hadoop.io.WritableComparable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * The SemiClusterDetails class is used to store a Semi-Cluster ID and its + * score.This class implements Comparable interface which compares the score of + * the objects. + * + */ + +public class SemiClusterDetails implements + WritableComparable { + + private String semiClusterId; + private double semiClusterScore; + + public SemiClusterDetails() { + this.semiClusterId = ""; + this.semiClusterScore = 1.0; + } + + public SemiClusterDetails(String semiClusterId, double semiClusterScore) { + this.semiClusterId = semiClusterId; + this.semiClusterScore = semiClusterScore; + } + + public String getSemiClusterId() { + return semiClusterId; + } + + public void setSemiClusterId(String semiClusterId) { + this.semiClusterId = semiClusterId; + } + + public double getSemiClusterScore() { + return semiClusterScore; + } + + public void setSemiClusterScore(double semiClusterScore) { + this.semiClusterScore = semiClusterScore; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((semiClusterId == null) ? 0 : semiClusterId.hashCode()); + long temp; + temp = Double.doubleToLongBits(semiClusterScore); + result = prime * result + (int) (temp ^ (temp >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + SemiClusterDetails other = (SemiClusterDetails) obj; + if (semiClusterId == null) { + if (other.semiClusterId != null) + return false; + } else if (!semiClusterId.equals(other.semiClusterId)) + return false; + if (Double.doubleToLongBits(semiClusterScore) != Double + .doubleToLongBits(other.semiClusterScore)) + return false; + return true; + } + + @Override + public String toString() { + return semiClusterId; + } + + @Override + public void readFields(DataInput in) throws IOException { + String semiClusterId = in.readUTF(); + setSemiClusterId(semiClusterId); + double score = in.readDouble(); + setSemiClusterScore(score); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(semiClusterId); + out.writeDouble(semiClusterScore); + } + + @Override + public int compareTo(SemiClusterDetails sc) { + return (this.getSemiClusterId().compareTo(sc.getSemiClusterId())); + } +} Index: ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterTextReader.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterTextReader.java (revision 0) +++ ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterTextReader.java (revision 0) @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.ml.semiclustering; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.graph.Edge; +import org.apache.hama.graph.Vertex; +import org.apache.hama.graph.VertexInputReader; + +import java.util.ArrayList; +import java.util.List; + +/** + * SemiClusterTextReader defines the way in which data is to be read from the + * input file and store as a vertex with VertexId and Edges + * + */ +public class SemiClusterTextReader extends + VertexInputReader { + + String lastVertexId = null; + List adjacents = new ArrayList(); + + @Override + public boolean parseVertex(LongWritable key, Text value, + Vertex vertex) { + + String line = value.toString(); + String[] lineSplit = line.split("\t"); + if (!line.startsWith("#")) { + if (lastVertexId == null) { + lastVertexId = lineSplit[0]; + } + if (lastVertexId.equals(lineSplit[0])) { + adjacents.add(lineSplit[1]); + } else { + vertex.setVertexID(new Text(lastVertexId)); + for (String adjacent : adjacents) { + String[] ValueSplit = adjacent.split("-"); + vertex.addEdge(new Edge( + new Text(ValueSplit[0]), new DoubleWritable(Double + .parseDouble(ValueSplit[1])))); + } + adjacents.clear(); + lastVertexId = lineSplit[0]; + adjacents.add(lineSplit[1]); + return true; + } + } + return false; + } + +} Index: ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterVertexOutputWriter.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterVertexOutputWriter.java (revision 0) +++ ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterVertexOutputWriter.java (revision 0) @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.ml.semiclustering; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.graph.GraphJobMessage; +import org.apache.hama.graph.Vertex; +import org.apache.hama.graph.VertexOutputWriter; + +import java.io.IOException; + +/** + * The VertexOutputWriter defines what parts of the vertex shall be written to + * the output format. + * + * @param the vertexID type. + * @param the edge value type. + * @param the vertex value type. + */ +@SuppressWarnings("rawtypes") +public class SemiClusterVertexOutputWriter + implements VertexOutputWriter { + + @Override + public void setup(Configuration conf) { + // do nothing + } + + @SuppressWarnings("unchecked") + @Override + public void write(Vertex vertex, + BSPPeer peer) + throws IOException { + SemiClusterMessage vertexValue = (SemiClusterMessage) vertex.getValue(); + peer.write((KEYOUT) vertex.getVertexID(), (VALUEOUT) new Text(vertexValue + .getSemiClusterContainThis().toString())); + } + +} Index: ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java (revision 0) +++ ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java (revision 0) @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.ml.semiclustering; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.graph.Edge; +import org.apache.hama.graph.Vertex; + +import java.io.IOException; +import java.util.*; + +/** + * SemiClusteringVertex Class defines each vertex in a Graph job and the + * compute() method is the function which is applied on each Vertex in the graph + * on each Super step of the job execution. + * + */ +public class SemiClusteringVertex extends + Vertex { + private int semiClusterMaximumVertexCount; + private int graphJobMessageSentCount; + private int graphJobVertexMaxClusterCount; + + @Override + public void setup(Configuration conf) { + semiClusterMaximumVertexCount = conf.getInt("semicluster.max.vertex.count", + 10); + graphJobMessageSentCount = conf.getInt( + "semicluster.max.message.sent.count", 10); + graphJobVertexMaxClusterCount = conf.getInt("vertex.max.cluster.count", 10); + } + + /** + * The user overrides the Compute() method, which will be executed at each + * active vertex in every superstep + */ + @Override + public void compute(Iterable messages) throws IOException { + if (this.getSuperstepCount() == 0) { + firstSuperStep(); + } + if (this.getSuperstepCount() >= 1) { + Set scListContainThis = new TreeSet(); + Set scListNotContainThis = new TreeSet(); + List scList = new ArrayList(); + for (SemiClusterMessage msg : messages) { + if (!isVertexInSc(msg)) { + scListNotContainThis.add(msg); + SemiClusterMessage msgNew = new SemiClusterMessage(msg); + msgNew.addVertex(this); + msgNew + .setScId("C" + createNewSemiClusterName(msgNew.getVertexList())); + msgNew.setScore(semiClusterScoreCalcuation(msgNew)); + scListContainThis.add(msgNew); + } else { + scListContainThis.add(msg); + } + } + scList.addAll(scListContainThis); + scList.addAll(scListNotContainThis); + sendBestSCMsg(scList); + updatesVertexSemiClustersList(scListContainThis); + } + } + + public List addSCList(List scList, + SemiClusterMessage msg) { + return scList; + } + + /** + * This function create a new Semi-cluster ID for a semi-cluster from the list + * of vertices in the cluster.It first take all the vertexIds as a list sort + * the list and then find the HashCode of the Sorted List. + */ + public int createNewSemiClusterName( + List> semiClusterVertexList) { + List vertexIDList = getSemiClusterVerticesIdList(semiClusterVertexList); + Collections.sort(vertexIDList); + return (vertexIDList.hashCode()); + } + + /** + * Function which is executed in the first SuperStep + * + * @throws java.io.IOException + */ + public void firstSuperStep() throws IOException { + Vertex v = this; + List> lV = new ArrayList>(); + lV.add(v); + String newClusterName = "C" + createNewSemiClusterName(lV); + SemiClusterMessage initialClusters = new SemiClusterMessage(newClusterName, + lV, 1); + sendMessageToNeighbors(initialClusters); + Set scList = new TreeSet(); + scList.add(new SemiClusterDetails(newClusterName, 1.0)); + SemiClusterMessage vertexValue = new SemiClusterMessage(scList); + this.setValue(vertexValue); + } + + /** + * Vertex V updates its list of semi-clusters with the semi- clusters from c1 + * , ..., ck , c'1 , ..., c'k that contain V + */ + public void updatesVertexSemiClustersList( + Set scListContainThis) throws IOException { + List scList = new ArrayList(); + Set sortedSet = new TreeSet( + new Comparator() { + + @Override + public int compare(SemiClusterMessage o1, SemiClusterMessage o2) { + return (o1.getScore() == o2.getScore() ? 0 + : o1.getScore() < o2.getScore() ? -1 : 1); + } + }); + sortedSet.addAll(scListContainThis); + int count = 0; + for (SemiClusterMessage msg : sortedSet) { + scList.add(new SemiClusterDetails(msg.getScId(), msg.getScore())); + if (count > graphJobMessageSentCount) + break; + } + + SemiClusterMessage vertexValue = this.getValue(); + vertexValue + .setSemiClusterContainThis(scList, graphJobVertexMaxClusterCount); + this.setValue(vertexValue); + } + + /** + * Function to calcualte the Score of a semi-cluster + * + * @param message + * @return + */ + public double semiClusterScoreCalcuation(SemiClusterMessage message) { + double iC = 0.0, bC = 0.0, fB = 0.0, sC = 0.0; + int vC = 0, eC = 0; + List vertexId = getSemiClusterVerticesIdList(message + .getVertexList()); + vC = vertexId.size(); + for (Vertex v : message + .getVertexList()) { + List> eL = v.getEdges(); + for (Edge e : eL) { + eC++; + if (vertexId.contains(e.getDestinationVertexID().toString()) + && e.getCost() != null) { + iC = iC + e.getCost().get(); + } else if (e.getCost() != null) { + bC = bC + e.getCost().get(); + } + } + } + if (vC > 1) + sC = ((iC - fB * bC) / ((vC * (vC - 1)) / 2)) / eC; + return sC; + } + + /** + * Returns a Array List of vertexIds from a List of Vertex Objects + * + * @param lV + * @return + */ + public List getSemiClusterVerticesIdList( + List> lV) { + Iterator> vertexItrator = lV + .iterator(); + List vertexId = new ArrayList(); + while (vertexItrator.hasNext()) { + vertexId.add(vertexItrator.next().getVertexID().toString()); + } + return vertexId; + } + + /** + * If a semi-cluster c does not already contain V , and Vc < Mmax , then V is + * added to c to form c' . + */ + public boolean isVertexInSc(SemiClusterMessage msg) { + List vertexId = getSemiClusterVerticesIdList(msg.getVertexList()); + if (vertexId.contains(this.getVertexID().toString()) + && vertexId.size() < semiClusterMaximumVertexCount) + return true; + else + return false; + } + + /** + * The semi-clusters c1 , ..., ck , c'1 , ..., c'k are sorted by their scores, + * and the best ones are sent to V ?? neighbors. + */ + public void sendBestSCMsg(List scList) throws IOException { + Collections.sort(scList, new Comparator() { + + @Override + public int compare(SemiClusterMessage o1, SemiClusterMessage o2) { + return (o1.getScore() == o2.getScore() ? 0 : o1.getScore() < o2 + .getScore() ? -1 : 1); + } + }); + Iterator scItr = scList.iterator(); + int count = 0; + while (scItr.hasNext()) { + sendMessageToNeighbors(scItr.next()); + count++; + if (count > graphJobMessageSentCount) + break; + } + } +} Index: ml/pom.xml =================================================================== --- ml/pom.xml (revision 1521295) +++ ml/pom.xml (working copy) @@ -36,6 +36,11 @@ hama-core ${project.version} + + org.apache.hama + hama-graph + ${project.version} + Index: examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java (revision 1521295) +++ examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java (working copy) @@ -1,218 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hama.examples; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.NavigableSet; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hama.graph.Vertex; - -/** - * The SemiClusterMessage class defines the structure of the value stored by - * each vertex in the graph Job which is same as the Message sent my each - * vertex. - * - */ -public class SemiClusterMessage implements - WritableComparable { - - private String semiClusterId; - private double semiClusterScore; - private List> semiClusterVertexList = new ArrayList>(); - private Set semiClusterContainThis = new TreeSet(); - - public SemiClusterMessage(String scId, - List> verticesEdges, - double score) { - this.semiClusterId = scId; - this.semiClusterVertexList = verticesEdges; - this.semiClusterScore = score; - } - - public SemiClusterMessage(SemiClusterMessage msg) { - this.semiClusterId = msg.getScId(); - for (Vertex v : msg - .getVertexList()) - this.semiClusterVertexList.add(v); - this.semiClusterScore = msg.getScore(); - } - - public SemiClusterMessage(Set semiClusterContainThis) { - this.semiClusterId = ""; - this.semiClusterScore = 0.0; - this.semiClusterVertexList = null; - this.semiClusterContainThis = semiClusterContainThis; - } - - public SemiClusterMessage() { - } - - public double getScore() { - return semiClusterScore; - } - - public void setScore(double score) { - this.semiClusterScore = score; - } - - public List> getVertexList() { - return semiClusterVertexList; - } - - public void addVertex(Vertex v) { - this.semiClusterVertexList.add(v); - } - - public String getScId() { - return semiClusterId; - } - - public void setScId(String scId) { - this.semiClusterId = scId; - } - - public void readFields(DataInput in) throws IOException { - clear(); - String semiClusterId = in.readUTF(); - setScId(semiClusterId); - double score = in.readDouble(); - setScore(score); - if (in.readBoolean()) { - int len = in.readInt(); - if (len > 0) { - for (int i = 0; i < len; i++) { - SemiClusteringVertex v = new SemiClusteringVertex(); - v.readFields(in); - semiClusterVertexList.add(v); - } - } - } - int len = in.readInt(); - if (len > 0) { - for (int i = 0; i < len; i++) { - SemiClusterDetails sd = new SemiClusterDetails(); - sd.readFields(in); - semiClusterContainThis.add(sd); - } - } - - } - - private void clear() { - semiClusterVertexList = new ArrayList>(); - semiClusterContainThis = new TreeSet(); - } - - public void write(DataOutput out) throws IOException { - out.writeUTF(semiClusterId); - out.writeDouble(semiClusterScore); - - if (this.semiClusterVertexList == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeInt(semiClusterVertexList.size()); - for (Vertex v : semiClusterVertexList) { - v.write(out); - } - } - out.writeInt(semiClusterContainThis.size()); - Iterator itr = semiClusterContainThis.iterator(); - while (itr.hasNext()) - itr.next().write(out); - } - - public Set getSemiClusterContainThis() { - return semiClusterContainThis; - } - - public void setSemiClusterContainThis( - List semiClusterContainThis, - int graphJobVertexMaxClusterCount) { - int clusterCountToBeRemoved = 0; - NavigableSet setSort = new TreeSet( - new Comparator() { - - @Override - public int compare(SemiClusterDetails o1, SemiClusterDetails o2) { - return (o1.getSemiClusterScore() == o2.getSemiClusterScore() ? 0 - : o1.getSemiClusterScore() < o2.getSemiClusterScore() ? -1 : 1); - } - }); - setSort.addAll(this.semiClusterContainThis); - setSort.addAll(semiClusterContainThis); - clusterCountToBeRemoved = setSort.size() - graphJobVertexMaxClusterCount; - Iterator itr = setSort.descendingIterator(); - while (clusterCountToBeRemoved > 0) { - itr.next(); - itr.remove(); - clusterCountToBeRemoved--; - } - this.semiClusterContainThis = setSort; - - } - - public int compareTo(SemiClusterMessage m) { - return (this.getScId().compareTo(m.getScId())); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result - + ((semiClusterId == null) ? 0 : semiClusterId.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - SemiClusterMessage other = (SemiClusterMessage) obj; - if (semiClusterId == null) { - if (other.semiClusterId != null) - return false; - } else if (!semiClusterId.equals(other.semiClusterId)) - return false; - return true; - } - - @Override - public String toString() { - return "SCMessage [semiClusterId=" + semiClusterId + ", semiClusterScore=" - + semiClusterScore + ", semiClusterVertexList=" + semiClusterVertexList - + ", semiClusterContainThis=" + semiClusterContainThis + "]"; - } -} Index: examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java (revision 1521295) +++ examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java (working copy) @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hama.examples; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.WritableComparable; - -/** - * The SemiClusterDetails class is used to store a Semi-Cluster ID and its - * score.This class implements Comparable interface which compares the score of - * the objects. - * - */ - -public class SemiClusterDetails implements - WritableComparable { - - private String semiClusterId; - private double semiClusterScore; - - public SemiClusterDetails() { - this.semiClusterId = ""; - this.semiClusterScore = 1.0; - } - - public SemiClusterDetails(String semiClusterId, double semiClusterScore) { - this.semiClusterId = semiClusterId; - this.semiClusterScore = semiClusterScore; - } - - public String getSemiClusterId() { - return semiClusterId; - } - - public void setSemiClusterId(String semiClusterId) { - this.semiClusterId = semiClusterId; - } - - public double getSemiClusterScore() { - return semiClusterScore; - } - - public void setSemiClusterScore(double semiClusterScore) { - this.semiClusterScore = semiClusterScore; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result - + ((semiClusterId == null) ? 0 : semiClusterId.hashCode()); - long temp; - temp = Double.doubleToLongBits(semiClusterScore); - result = prime * result + (int) (temp ^ (temp >>> 32)); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - SemiClusterDetails other = (SemiClusterDetails) obj; - if (semiClusterId == null) { - if (other.semiClusterId != null) - return false; - } else if (!semiClusterId.equals(other.semiClusterId)) - return false; - if (Double.doubleToLongBits(semiClusterScore) != Double - .doubleToLongBits(other.semiClusterScore)) - return false; - return true; - } - - @Override - public String toString() { - return semiClusterId; - } - - @Override - public void readFields(DataInput in) throws IOException { - String semiClusterId = in.readUTF(); - setSemiClusterId(semiClusterId); - double score = in.readDouble(); - setSemiClusterScore(score); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeUTF(semiClusterId); - out.writeDouble(semiClusterScore); - } - - @Override - public int compareTo(SemiClusterDetails sc) { - return (this.getSemiClusterId().compareTo(sc.getSemiClusterId())); - } -} Index: examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java (revision 1521295) +++ examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java (working copy) @@ -18,8 +18,6 @@ package org.apache.hama.examples; -import java.io.IOException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -31,7 +29,13 @@ import org.apache.hama.bsp.TextInputFormat; import org.apache.hama.bsp.TextOutputFormat; import org.apache.hama.graph.GraphJob; +import org.apache.hama.ml.semiclustering.SemiClusterMessage; +import org.apache.hama.ml.semiclustering.SemiClusterTextReader; +import org.apache.hama.ml.semiclustering.SemiClusterVertexOutputWriter; +import org.apache.hama.ml.semiclustering.SemiClusteringVertex; +import java.io.IOException; + public class SemiClusterJobDriver { protected static final Log LOG = LogFactory Index: examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java (revision 1521295) +++ examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java (working copy) @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hama.examples; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hama.bsp.BSPPeer; -import org.apache.hama.graph.GraphJobMessage; -import org.apache.hama.graph.Vertex; -import org.apache.hama.graph.VertexOutputWriter; - -/** - * The VertexOutputWriter defines what parts of the vertex shall be written to - * the output format. - * - * @param the vertexID type. - * @param the edge value type. - * @param the vertex value type. - */ -@SuppressWarnings("rawtypes") -public class SemiClusterVertexOutputWriter - implements VertexOutputWriter { - - @Override - public void setup(Configuration conf) { - // do nothing - } - - @SuppressWarnings("unchecked") - @Override - public void write(Vertex vertex, - BSPPeer peer) - throws IOException { - SemiClusterMessage vertexValue = (SemiClusterMessage) vertex.getValue(); - peer.write((KEYOUT) vertex.getVertexID(), (VALUEOUT) new Text(vertexValue - .getSemiClusterContainThis().toString())); - } - -} Index: examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java (revision 1521295) +++ examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java (working copy) @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hama.examples; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hama.graph.Edge; -import org.apache.hama.graph.Vertex; -import org.apache.hama.graph.VertexInputReader; - -/** - * SemiClusterTextReader defines the way in which data is to be read from the - * input file and store as a vertex with VertexId and Edges - * - */ -public class SemiClusterTextReader extends - VertexInputReader { - - String lastVertexId = null; - List adjacents = new ArrayList(); - - @Override - public boolean parseVertex(LongWritable key, Text value, - Vertex vertex) { - - String line = value.toString(); - String[] lineSplit = line.split("\t"); - if (!line.startsWith("#")) { - if (lastVertexId == null) { - lastVertexId = lineSplit[0]; - } - if (lastVertexId.equals(lineSplit[0])) { - adjacents.add(lineSplit[1]); - } else { - vertex.setVertexID(new Text(lastVertexId)); - for (String adjacent : adjacents) { - String[] ValueSplit = adjacent.split("-"); - vertex.addEdge(new Edge( - new Text(ValueSplit[0]), new DoubleWritable(Double - .parseDouble(ValueSplit[1])))); - } - adjacents.clear(); - lastVertexId = lineSplit[0]; - adjacents.add(lineSplit[1]); - return true; - } - } - return false; - } - -} Index: examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java (revision 1521295) +++ examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java (working copy) @@ -1,239 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hama.examples; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.Text; -import org.apache.hama.graph.Edge; -import org.apache.hama.graph.Vertex; - -/** - * SemiClusteringVertex Class defines each vertex in a Graph job and the - * compute() method is the function which is applied on each Vertex in the graph - * on each Super step of the job execution. - * - */ -public class SemiClusteringVertex extends - Vertex { - private int semiClusterMaximumVertexCount; - private int graphJobMessageSentCount; - private int graphJobVertexMaxClusterCount; - - @Override - public void setup(Configuration conf) { - semiClusterMaximumVertexCount = conf.getInt("semicluster.max.vertex.count", - 10); - graphJobMessageSentCount = conf.getInt( - "semicluster.max.message.sent.count", 10); - graphJobVertexMaxClusterCount = conf.getInt("vertex.max.cluster.count", 10); - } - - /** - * The user overrides the Compute() method, which will be executed at each - * active vertex in every superstep - */ - @Override - public void compute(Iterable messages) throws IOException { - if (this.getSuperstepCount() == 0) { - firstSuperStep(); - } - if (this.getSuperstepCount() >= 1) { - Set scListContainThis = new TreeSet(); - Set scListNotContainThis = new TreeSet(); - List scList = new ArrayList(); - for (SemiClusterMessage msg : messages) { - if (!isVertexInSc(msg)) { - scListNotContainThis.add(msg); - SemiClusterMessage msgNew = new SemiClusterMessage(msg); - msgNew.addVertex(this); - msgNew - .setScId("C" + createNewSemiClusterName(msgNew.getVertexList())); - msgNew.setScore(semiClusterScoreCalcuation(msgNew)); - scListContainThis.add(msgNew); - } else { - scListContainThis.add(msg); - } - } - scList.addAll(scListContainThis); - scList.addAll(scListNotContainThis); - sendBestSCMsg(scList); - updatesVertexSemiClustersList(scListContainThis); - } - } - - public List addSCList(List scList, - SemiClusterMessage msg) { - return scList; - } - - /** - * This function create a new Semi-cluster ID for a semi-cluster from the list - * of vertices in the cluster.It first take all the vertexIds as a list sort - * the list and then find the HashCode of the Sorted List. - */ - public int createNewSemiClusterName( - List> semiClusterVertexList) { - List vertexIDList = getSemiClusterVerticesIdList(semiClusterVertexList); - Collections.sort(vertexIDList); - return (vertexIDList.hashCode()); - } - - /** - * Function which is executed in the first SuperStep - * - * @throws IOException - */ - public void firstSuperStep() throws IOException { - Vertex v = this; - List> lV = new ArrayList>(); - lV.add(v); - String newClusterName = "C" + createNewSemiClusterName(lV); - SemiClusterMessage initialClusters = new SemiClusterMessage(newClusterName, - lV, 1); - sendMessageToNeighbors(initialClusters); - Set scList = new TreeSet(); - scList.add(new SemiClusterDetails(newClusterName, 1.0)); - SemiClusterMessage vertexValue = new SemiClusterMessage(scList); - this.setValue(vertexValue); - } - - /** - * Vertex V updates its list of semi-clusters with the semi- clusters from c1 - * , ..., ck , c'1 , ..., c'k that contain V - */ - public void updatesVertexSemiClustersList( - Set scListContainThis) throws IOException { - List scList = new ArrayList(); - Set sortedSet = new TreeSet( - new Comparator() { - - @Override - public int compare(SemiClusterMessage o1, SemiClusterMessage o2) { - return (o1.getScore() == o2.getScore() ? 0 - : o1.getScore() < o2.getScore() ? -1 : 1); - } - }); - sortedSet.addAll(scListContainThis); - int count = 0; - for (SemiClusterMessage msg : sortedSet) { - scList.add(new SemiClusterDetails(msg.getScId(), msg.getScore())); - if (count > graphJobMessageSentCount) - break; - } - - SemiClusterMessage vertexValue = this.getValue(); - vertexValue - .setSemiClusterContainThis(scList, graphJobVertexMaxClusterCount); - this.setValue(vertexValue); - } - - /** - * Function to calcualte the Score of a semi-cluster - * - * @param message - * @return - */ - public double semiClusterScoreCalcuation(SemiClusterMessage message) { - double iC = 0.0, bC = 0.0, fB = 0.0, sC = 0.0; - int vC = 0, eC = 0; - List vertexId = getSemiClusterVerticesIdList(message - .getVertexList()); - vC = vertexId.size(); - for (Vertex v : message - .getVertexList()) { - List> eL = v.getEdges(); - for (Edge e : eL) { - eC++; - if (vertexId.contains(e.getDestinationVertexID().toString()) - && e.getCost() != null) { - iC = iC + e.getCost().get(); - } else if (e.getCost() != null) { - bC = bC + e.getCost().get(); - } - } - } - if (vC > 1) - sC = ((iC - fB * bC) / ((vC * (vC - 1)) / 2)) / eC; - return sC; - } - - /** - * Returns a Array List of vertexIds from a List of Vertex Objects - * - * @param lV - * @return - */ - public List getSemiClusterVerticesIdList( - List> lV) { - Iterator> vertexItrator = lV - .iterator(); - List vertexId = new ArrayList(); - while (vertexItrator.hasNext()) { - vertexId.add(vertexItrator.next().getVertexID().toString()); - } - return vertexId; - } - - /** - * If a semi-cluster c does not already contain V , and Vc < Mmax , then V is - * added to c to form c' . - */ - public boolean isVertexInSc(SemiClusterMessage msg) { - List vertexId = getSemiClusterVerticesIdList(msg.getVertexList()); - if (vertexId.contains(this.getVertexID().toString()) - && vertexId.size() < semiClusterMaximumVertexCount) - return true; - else - return false; - } - - /** - * The semi-clusters c1 , ..., ck , c'1 , ..., c'k are sorted by their scores, - * and the best ones are sent to V ?? neighbors. - */ - public void sendBestSCMsg(List scList) throws IOException { - Collections.sort(scList, new Comparator() { - - @Override - public int compare(SemiClusterMessage o1, SemiClusterMessage o2) { - return (o1.getScore() == o2.getScore() ? 0 : o1.getScore() < o2 - .getScore() ? -1 : 1); - } - }); - Iterator scItr = scList.iterator(); - int count = 0; - while (scItr.hasNext()) { - sendMessageToNeighbors(scItr.next()); - count++; - if (count > graphJobMessageSentCount) - break; - } - } -}