Index: src/test/org/apache/hama/bsp/TestBSPPeer.java =================================================================== --- src/test/org/apache/hama/bsp/TestBSPPeer.java (리비전 1028159) +++ src/test/org/apache/hama/bsp/TestBSPPeer.java (작업 사본) @@ -26,6 +26,8 @@ import java.util.Random; import java.util.Set; +import junit.framework.AssertionFailedError; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -54,7 +56,7 @@ this.conf = getConf(); } - public void setUp() throws Exception { + public void setUp() throws Exception { super.setUp(); ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this); @@ -81,9 +83,12 @@ public class BSPPeerThread extends Thread { private BSPPeer peer; - private int MAXIMUM_DURATION = 5; + private int MAXIMUM_DURATION = 5; + private int lastTwoDigitsOfPort; + private int errorCount = 0; public BSPPeerThread(Configuration conf) throws IOException { + lastTwoDigitsOfPort = conf.getInt(Constants.PEER_PORT, 0) - 30000; this.peer = new BSPPeer(conf); Set peerNames = new HashSet(NUM_PEER); for (int i = 0; i < NUM_PEER; i++) { @@ -92,8 +97,7 @@ peer.setAllPeerNames(peerNames); } - @Override - public void run() { + public void run() throws AssertionFailedError { int randomTime; byte[] dummyData = new byte[PAYLOAD]; BSPMessage msg = null; @@ -128,22 +132,37 @@ e.printStackTrace(); } - assertEquals(peer.getNumCurrentMessages(), 1); verifyPayload(); } - - assertEquals(peer.getNumCurrentMessages(), NUM_PEER * ROUND); } private void verifyPayload() { - System.out.println("[" + getName() + "] verifying " - + peer.localQueue.size() + " messages"); + int numMessages = peer.getNumCurrentMessages(); + LOG.info("[" + peer.getPeerName() + "] verifying " + numMessages + + " messages"); + + try { + if (lastTwoDigitsOfPort < 10) { + assertEquals(20, numMessages); + } else { + assertEquals(0, numMessages); + } + } catch (AssertionFailedError afe) { + LOG.error(afe); + errorCount++; + } + BSPMessage msg = null; try { while ((msg = peer.getCurrentMessage()) != null) { - assertEquals(Bytes.compareTo(msg.tag, 0, 128, msg.data, - msg.data.length - 128, 128), 0); + try { + assertEquals(Bytes.compareTo(msg.tag, 0, 128, msg.data, + msg.data.length - 128, 128), 0); + } catch (AssertionFailedError afe) { + LOG.error(afe); + errorCount++; + } } } catch (IOException e) { LOG.error(e); @@ -151,10 +170,14 @@ peer.localQueue.clear(); } - + public BSPPeer getBSPPeer() { return this.peer; } + + public int getErrorCount() { + return this.errorCount; + } } public void testSync() throws InterruptedException, IOException { @@ -164,7 +187,7 @@ conf.set(Constants.ZOOKEEPER_QUORUM, "localhost"); conf.set(Constants.PEER_HOST, "localhost"); conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810"); - + for (int i = 0; i < NUM_PEER; i++) { conf.set(Constants.PEER_PORT, String.valueOf(30000 + i)); thread = new BSPPeerThread(conf); @@ -178,8 +201,12 @@ for (int i = 0; i < NUM_PEER; i++) { list.get(i).join(); } + + for (int i = 0; i < NUM_PEER; i++) { + assertEquals(list.get(i).getErrorCount(), 0); + } } - + @Override public void process(WatchedEvent event) { }