Index: src/java/org/apache/hama/MiniZooKeeperCluster.java
===================================================================
--- src/java/org/apache/hama/MiniZooKeeperCluster.java (revision 0)
+++ src/java/org/apache/hama/MiniZooKeeperCluster.java (revision 0)
@@ -0,0 +1,196 @@
+package org.apache.hama;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+
+public class MiniZooKeeperCluster {
+  private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
+
+  private static final int TICK_TIME = 2000;
+  private static final int CONNECTION_TIMEOUT = 30000;
+
+  private boolean started;
+  private int clientPort = 21810; // use non-standard port
+
+  private NIOServerCnxn.Factory standaloneServerFactory;
+  private int tickTime = 0;
+
+  /** Create a mini ZooKeeper cluster. */
+  public MiniZooKeeperCluster() {
+    this.started = false;
+  }
+
+  public void setClientPort(int clientPort) {
+    this.clientPort = clientPort;
+  }
+
+  public void setTickTime(int tickTime) {
+    this.tickTime = tickTime;
+  }
+
+  private static void setupTestEnv() {
+    // during the tests we run with 100K prealloc in the logs.
+    // on windows systems a prealloc of 64M was seen to take ~15 seconds,
+    // resulting in test failure (client timeout on first session).
+    // set the property and the static field directly in order to handle
+    // static init/gc issues
+    System.setProperty("zookeeper.preAllocSize", "100");
+    FileTxnLog.setPreallocSize(100);
+  }
+
+  /**
+   * @param baseDir
+   * @return the client port the server is bound to.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public int startup(File baseDir) throws IOException,
+      InterruptedException {
+
+    setupTestEnv();
+
+    shutdown();
+
+    File dir = new File(baseDir, "zookeeper").getAbsoluteFile();
+    recreateDir(dir);
+
+    int tickTimeToUse;
+    if (this.tickTime > 0) {
+      tickTimeToUse = this.tickTime;
+    } else {
+      tickTimeToUse = TICK_TIME;
+    }
+    ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
+    while (true) {
+      try {
+        standaloneServerFactory =
+          new NIOServerCnxn.Factory(new InetSocketAddress(clientPort));
+      } catch (BindException e) {
+        LOG.info("Failed binding ZK Server to client port: " + clientPort);
+        // this port is already in use, try to use another
+        clientPort++;
+        continue;
+      }
+      break;
+    }
+    standaloneServerFactory.startup(server);
+
+    if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) {
+      throw new IOException("Waiting for startup of standalone server");
+    }
+
+    started = true;
+
+    return clientPort;
+  }
+
+  private void recreateDir(File dir) throws IOException {
+    if (dir.exists()) {
+      FileUtil.fullyDelete(dir);
+    }
+    try {
+      dir.mkdirs();
+    } catch (SecurityException e) {
+      throw new IOException("creating dir: " + dir, e);
+    }
+  }
+
+  /**
+   * @throws IOException
+   */
+  public void shutdown() throws IOException {
+    if (!started) {
+      return;
+    }
+
+    standaloneServerFactory.shutdown();
+    if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+
+    started = false;
+  }
+
+  // XXX: From o.a.zk.t.ClientBase
+  private static boolean waitForServerDown(int port, long timeout) {
+    long start = System.currentTimeMillis();
+    while (true) {
+      try {
+        Socket sock = new Socket("localhost", port);
+        try {
+          OutputStream outstream = sock.getOutputStream();
+          outstream.write("stat".getBytes());
+          outstream.flush();
+        } finally {
+          sock.close();
+        }
+      } catch (IOException e) {
+        return true;
+      }
+
+      if (System.currentTimeMillis() > start + timeout) {
+        break;
+      }
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+    return false;
+  }
+
+  // XXX: From o.a.zk.t.ClientBase
+  private static boolean waitForServerUp(int port, long timeout) {
+    long start = System.currentTimeMillis();
+    while (true) {
+      try {
+        Socket sock = new Socket("localhost", port);
+        BufferedReader reader = null;
+        try {
+          OutputStream outstream = sock.getOutputStream();
+          outstream.write("stat".getBytes());
+          outstream.flush();
+
+          Reader isr = new InputStreamReader(sock.getInputStream());
+          reader = new BufferedReader(isr);
+          String line = reader.readLine();
+          if (line != null && line.startsWith("Zookeeper version:")) {
+            return true;
+          }
+        } finally {
+          sock.close();
+          if (reader != null) {
+            reader.close();
+          }
+        }
+      } catch (IOException e) {
+        // ignore as this is expected
+        LOG.info("server localhost:" + port + " not up " + e);
+      }
+
+      if (System.currentTimeMillis() > start + timeout) {
+        break;
+      }
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+    return false;
+  }
+}
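A minimal usage sketch for the helper above (illustrative only, not part of the patch; the scratch directory path is arbitrary). The key point is that startup() keeps incrementing the client port until it can bind, so callers must use the returned port rather than the default 21810; HamaClusterTestCase.hamaClusterSetup() further down in this patch does the same thing.

// Illustrative sketch, not part of the patch.
import java.io.File;

import org.apache.hama.MiniZooKeeperCluster;

public class MiniZooKeeperClusterSketch {
  public static void main(String[] args) throws Exception {
    MiniZooKeeperCluster zkCluster = new MiniZooKeeperCluster();
    // use the port actually bound, not the default
    int clientPort = zkCluster.startup(new File("build/hama/test/zk"));
    System.out.println("ZooKeeper listening on localhost:" + clientPort);
    try {
      // ... run code that talks to localhost:clientPort ...
    } finally {
      zkCluster.shutdown();
    }
  }
}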
Index: src/java/org/apache/hama/bsp/ClusterStatus.java
===================================================================
--- src/java/org/apache/hama/bsp/ClusterStatus.java (revision 1021565)
+++ src/java/org/apache/hama/bsp/ClusterStatus.java (working copy)
@@ -65,16 +65,16 @@
   /**
    *
    */
-  ClusterStatus() {}
+  public ClusterStatus() {}
 
-  ClusterStatus(int grooms, int tasks, int maxTasks, BSPMaster.State state) {
+  public ClusterStatus(int grooms, int tasks, int maxTasks, BSPMaster.State state) {
     this.numActiveGrooms = grooms;
     this.tasks = tasks;
     this.maxTasks = maxTasks;
     this.state = state;
   }
 
-  ClusterStatus(Collection activeGrooms, int tasks, int maxTasks,
+  public ClusterStatus(Collection activeGrooms, int tasks, int maxTasks,
       BSPMaster.State state) {
     this(activeGrooms.size(), tasks, maxTasks, state);
     this.activeGrooms = activeGrooms;
Index: src/test/org/apache/hama/HamaCluster.java
===================================================================
--- src/test/org/apache/hama/HamaCluster.java (revision 1021565)
+++ src/test/org/apache/hama/HamaCluster.java (working copy)
@@ -19,26 +19,19 @@
  */
 package org.apache.hama;
 
-import junit.framework.TestCase;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hama.bsp.LocalBSPCluster;
 
 /**
  * Forming up the miniDfs and miniHbase
 */
-public abstract class HamaCluster extends TestCase {
+public abstract class HamaCluster extends HamaClusterTestCase {
   public static final Log LOG = LogFactory.getLog(HamaCluster.class);
 
   protected final static HamaConfiguration conf = new HamaConfiguration();
 
   protected void setUp() throws Exception {
     super.setUp();
-    String[] args = new String[0];
-    StringUtils.startupShutdownMessage(LocalBSPCluster.class, args, LOG);
-    HamaConfiguration conf = new HamaConfiguration();
     //LocalBSPCluster cluster = new LocalBSPCluster(conf);
     //cluster.startup();
   }
@@ -46,8 +39,4 @@
   protected static HamaConfiguration getConf() {
     return conf;
   }
-
-  protected void setMiniBSPCluster() {
-    // TODO Auto-generated method stub
-  }
 }
Index: src/test/org/apache/hama/HamaClusterTestCase.java
===================================================================
--- src/test/org/apache/hama/HamaClusterTestCase.java (revision 0)
+++ src/test/org/apache/hama/HamaClusterTestCase.java (revision 0)
@@ -0,0 +1,152 @@
+package org.apache.hama;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public abstract class HamaClusterTestCase extends HamaTestCase {
+  public static final Log LOG = LogFactory.getLog(HamaClusterTestCase.class);
+  public MiniBSPCluster cluster;
+  protected MiniDFSCluster dfsCluster;
+  protected MiniZooKeeperCluster zooKeeperCluster;
+  protected int groomServers;
+  protected boolean startDfs;
+
+  /** default constructor */
+  public HamaClusterTestCase() {
+    this(1);
+  }
+
+  public HamaClusterTestCase(int groomServers) {
+    this(groomServers, true);
+  }
+
+  public HamaClusterTestCase(int groomServers, boolean startDfs) {
+    super();
+    this.startDfs = startDfs;
+    this.groomServers = groomServers;
+  }
+
+  /**
+   * Actually start the MiniBSP instance.
+   */
+  protected void hamaClusterSetup() throws Exception {
+    File testDir = new File(getUnitTestdir(getName()).toString());
+
+    // Note that this is done before we create the MiniBSPCluster because we
+    // need to edit the config to add the ZooKeeper servers.
+    this.zooKeeperCluster = new MiniZooKeeperCluster();
+    int clientPort = this.zooKeeperCluster.startup(testDir);
+    conf.set("hbase.zookeeper.property.clientPort", Integer.toString(clientPort));
+
+    // start the mini cluster
+    this.cluster = new MiniBSPCluster(conf, groomServers);
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    try {
+      if (this.startDfs) {
+        // This spews a bunch of warnings about missing scheme. TODO: fix.
+        this.dfsCluster = new MiniDFSCluster(0, this.conf, 2, true, true, true,
+          null, null, null, null);
+
+        // mangle the conf so that the fs parameter points to the minidfs we
+        // just started up
+        FileSystem filesystem = dfsCluster.getFileSystem();
+        conf.set("fs.defaultFS", filesystem.getUri().toString());
+        Path parentdir = filesystem.getHomeDirectory();
+
+        //conf.set(HConstants.HBASE_DIR, parentdir.toString());
+        filesystem.mkdirs(parentdir);
+        //FSUtils.setVersion(filesystem, parentdir);
+      }
+
+      // do the super setup now. if we had done it first, then we would have
+      // gotten our conf all mangled and a local fs started up.
+      super.setUp();
+
+      // start the instance
+      hamaClusterSetup();
+
+    } catch (Exception e) {
+      LOG.error("Exception in setup!", e);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      if (zooKeeperCluster != null) {
+        zooKeeperCluster.shutdown();
+      }
+      if (dfsCluster != null) {
+        shutdownDfs(dfsCluster);
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    try {
+      if (this.cluster != null) {
+        try {
+          this.cluster.shutdown();
+        } catch (Exception e) {
+          LOG.warn("Closing mini dfs", e);
+        }
+        try {
+          this.zooKeeperCluster.shutdown();
+        } catch (IOException e) {
+          LOG.warn("Shutting down ZooKeeper cluster", e);
+        }
+      }
+      if (startDfs) {
+        shutdownDfs(dfsCluster);
+      }
+    } catch (Exception e) {
+      LOG.error(e);
+    }
+  }
+
+
+  /**
+   * Use this utility method to debug why the cluster won't go down. It
+   * periodically prints a thread dump, and returns when all groom server
+   * and master threads are no longer alive.
+   */
+  public void threadDumpingJoin() {
+    if (this.cluster.getGroomServerThreads() != null) {
+      for(Thread t: this.cluster.getGroomServerThreads()) {
+        threadDumpingJoin(t);
+      }
+    }
+    threadDumpingJoin(this.cluster.getMaster());
+  }
+
+  protected void threadDumpingJoin(final Thread t) {
+    if (t == null) {
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    while (t.isAlive()) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        LOG.info("Continuing...", e);
+      }
+      if (System.currentTimeMillis() - startTime > 60000) {
+        startTime = System.currentTimeMillis();
+        ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
+          "Automatic Stack Trace every 60 seconds waiting on " +
+          t.getName());
+      }
+    }
+  }
+}
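For context, a sketch of what a concrete test built on HamaClusterTestCase might look like (illustrative only, not part of the patch; the class name and assertions are hypothetical). The base class brings up mini DFS, mini ZooKeeper and the MiniBSPCluster in setUp() and tears them down in tearDown(), so the test body only has to use them.

// Illustrative sketch, not part of the patch; class name and assertions are hypothetical.
package org.apache.hama;

public class TestClusterBootstrap extends HamaClusterTestCase {

  public TestClusterBootstrap() {
    super(2); // two groom servers, DFS enabled by default
  }

  public void testMiniClustersComeUp() {
    // setUp() has already run at this point
    assertNotNull("MiniBSPCluster should have been created", cluster);
    assertNotNull("MiniZooKeeperCluster should have been created", zooKeeperCluster);
  }
}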
Index: src/test/org/apache/hama/HamaTestCase.java
===================================================================
--- src/test/org/apache/hama/HamaTestCase.java (revision 0)
+++ src/test/org/apache/hama/HamaTestCase.java (revision 0)
@@ -0,0 +1,159 @@
+package org.apache.hama;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.AssertionFailedError;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hama.util.Bytes;
+
+public abstract class HamaTestCase extends TestCase {
+  private static Log LOG = LogFactory.getLog(HamaTestCase.class);
+
+  /** configuration parameter name for test directory */
+  public static final String TEST_DIRECTORY_KEY = "test.build.data";
+
+  private boolean localfs = false;
+  protected Path testDir = null;
+  protected FileSystem fs = null;
+
+  static {
+    initialize();
+  }
+
+  public volatile HamaConfiguration conf;
+
+  /** constructor */
+  public HamaTestCase() {
+    super();
+    init();
+  }
+
+  /**
+   * @param name
+   */
+  public HamaTestCase(String name) {
+    super(name);
+    init();
+  }
+
+  private void init() {
+    conf = new HamaConfiguration();
+  }
+
+  /**
+   * Note that this method must be called after the mini hdfs cluster has
+   * started or we end up with a local file system.
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    localfs =
+      (conf.get("fs.defaultFS", "file:///").compareTo("file:///") == 0);
+
+    if (fs == null) {
+      this.fs = FileSystem.get(conf);
+    }
+    try {
+      if (localfs) {
+        this.testDir = getUnitTestdir(getName());
+        if (fs.exists(testDir)) {
+          fs.delete(testDir, true);
+        }
+      } else {
+        this.testDir =
+          this.fs.makeQualified(new Path("/tmp/hama-test"));
+      }
+    } catch (Exception e) {
+      LOG.fatal("error during setup", e);
+      throw e;
+    }
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    try {
+      if (localfs) {
+        if (this.fs.exists(testDir)) {
+          this.fs.delete(testDir, true);
+        }
+      }
+    } catch (Exception e) {
+      LOG.fatal("error during tear down", e);
+    }
+    super.tearDown();
+  }
+
+  protected Path getUnitTestdir(String testName) {
+    return new Path(
+      conf.get(TEST_DIRECTORY_KEY, "test/build/data"), testName);
+  }
+
+  /**
+   * Initializes parameters used in the test environment:
+   * sets the configuration parameter TEST_DIRECTORY_KEY if it is not
+   * already set.
+   */
+  public static void initialize() {
+    if (System.getProperty(TEST_DIRECTORY_KEY) == null) {
+      System.setProperty(TEST_DIRECTORY_KEY, new File(
+        "build/hama/test").getAbsolutePath());
+    }
+  }
+
+  /**
+   * Common method to close down a MiniDFSCluster and the associated file system
+   *
+   * @param cluster
+   */
+  public static void shutdownDfs(MiniDFSCluster cluster) {
+    if (cluster != null) {
+      LOG.info("Shutting down Mini DFS ");
+      try {
+        cluster.shutdown();
+      } catch (Exception e) {
+        // Can get a java.lang.reflect.UndeclaredThrowableException thrown
+        // here because of an InterruptedException. Don't let exceptions in
+        // here be cause of test failure.
+      }
+      try {
+        FileSystem fs = cluster.getFileSystem();
+        if (fs != null) {
+          LOG.info("Shutting down FileSystem");
+          fs.close();
+        }
+        FileSystem.closeAll();
+      } catch (IOException e) {
+        LOG.error("error closing file system", e);
+      }
+    }
+  }
+
+  public void assertByteEquals(byte[] expected,
+      byte[] actual) {
+    if (Bytes.compareTo(expected, actual) != 0) {
+      throw new AssertionFailedError("expected:<" +
+        Bytes.toString(expected) + "> but was:<" +
+        Bytes.toString(actual) + ">");
+    }
+  }
+
+  public static void assertEquals(byte[] expected,
+      byte[] actual) {
+    if (Bytes.compareTo(expected, actual) != 0) {
+      throw new AssertionFailedError("expected:<" +
+        Bytes.toStringBinary(expected) + "> but was:<" +
+        Bytes.toStringBinary(actual) + ">");
    }
+  }
+
+}
Index: src/test/org/apache/hama/MiniBSPCluster.java
===================================================================
--- src/test/org/apache/hama/MiniBSPCluster.java (revision 1021565)
+++ src/test/org/apache/hama/MiniBSPCluster.java (working copy)
@@ -1,5 +1,28 @@
-package org.apache.hama.bsp;
+package org.apache.hama;
 
+import java.util.List;
+
+import org.apache.hama.HamaConfiguration;
+
 public class MiniBSPCluster {
 
+  public MiniBSPCluster(HamaConfiguration conf, int groomServers) {
+    // TODO Auto-generated constructor stub
+  }
+
+  public void shutdown() {
+    // TODO Auto-generated method stub
+
+  }
+
+  public List getGroomServerThreads() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public Thread getMaster() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
 }
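MiniBSPCluster above is still a stub. The sketch below (illustrative only, not part of the patch; class and field names are hypothetical) shows one possible shape for the contract that HamaClusterTestCase.threadDumpingJoin() and tearDown() already rely on: a list of groom server threads plus a master thread, and a shutdown() that stops them.

// Illustrative sketch, not part of the patch; one possible shape for the stub.
import java.util.ArrayList;
import java.util.List;

public class MiniBSPClusterSketch {
  private final List<Thread> groomThreads = new ArrayList<Thread>();
  private Thread masterThread;

  // threadDumpingJoin() iterates over these and joins them one by one.
  public List<Thread> getGroomServerThreads() {
    return groomThreads;
  }

  // joined last, after all groom server threads.
  public Thread getMaster() {
    return masterThread;
  }

  public void shutdown() {
    // interrupt the grooms first, then the master; callers join afterwards
    for (Thread t : groomThreads) {
      t.interrupt();
    }
    if (masterThread != null) {
      masterThread.interrupt();
    }
  }
}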
Index: src/test/org/apache/hama/bsp/BSPPeerTest.java
===================================================================
--- src/test/org/apache/hama/bsp/BSPPeerTest.java (revision 1021594)
+++ src/test/org/apache/hama/bsp/BSPPeerTest.java (working copy)
@@ -1,210 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hama.bsp;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.Constants;
-import org.apache.hama.HamaCluster;
-import org.apache.hama.util.Bytes;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-public class BSPPeerTest extends HamaCluster implements Watcher {
-  private Log LOG = LogFactory.getLog(BSPPeerTest.class);
-
-  private static final int NUM_PEER = 35;
-  private static final int ROUND = 3;
-  private static final int PAYLOAD = 1024; // 1kb in default
-  List list = new ArrayList(NUM_PEER);
-  Configuration conf;
-  private Random r = new Random();
-
-  public BSPPeerTest() {
-    this.conf = getConf();
-  }
-
-  public void setUp() throws Exception {
-    super.setUp();
-
-    ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
-    Stat s = null;
-    if (zk != null) {
-      try {
-        s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
-      } catch (Exception e) {
-        LOG.error(s);
-      }
-
-      if (s == null) {
-        try {
-          zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
-              Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        } catch (KeeperException e) {
-          LOG.error(e);
-        } catch (InterruptedException e) {
-          LOG.error(e);
-        }
-      }
-    }
-  }
-
-  public class BSPPeerThread extends Thread {
-    private BSPPeer peer;
-    private int MAXIMUM_DURATION = 5;
-
-    public BSPPeerThread(Configuration conf) throws IOException {
-      this.peer = new BSPPeer(conf);
-    }
-
-    @Override
-    public void run() {
-      int randomTime;
-      byte[] dummyData = new byte[PAYLOAD];
-      BSPMessage msg = null;
-      InetSocketAddress addr = null;
-
-      for (int i = 0; i < ROUND; i++) {
-        randomTime = r.nextInt(MAXIMUM_DURATION) + 5;
-
-        for (int j = 0; j < 10; j++) {
-          r.nextBytes(dummyData);
-          msg = new BSPMessage(Bytes.tail(dummyData, 128), dummyData);
-
-          addr = new InetSocketAddress("localhost", 30000 + j);
-          try {
-            peer.send(addr, msg);
-          } catch (IOException e) {
-            LOG.info(e);
-          }
-        }
-
-        try {
-          Thread.sleep(randomTime * 1000);
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-
-        try {
-          peer.sync();
-        } catch (IOException e) {
-          e.printStackTrace();
-        } catch (KeeperException e) {
-          e.printStackTrace();
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-
-        verifyPayload();
-      }
-    }
-
-    private void verifyPayload() {
-      System.out.println("[" + getName() + "] verifying "
-          + peer.localQueue.size() + " messages");
-      BSPMessage msg = null;
-
-      try {
-        while ((msg = peer.getCurrentMessage()) != null) {
-          assertEquals(Bytes.compareTo(msg.tag, 0, 128, msg.data,
-              msg.data.length - 128, 128), 0);
-        }
-      } catch (IOException e) {
-        LOG.error(e);
-      }
-
-      peer.localQueue.clear();
-    }
-
-    public BSPPeer getBSPPeer() {
-      return this.peer;
-    }
-  }
-
-  public void testSync() throws InterruptedException, IOException {
-
-    BSPPeerThread thread;
-    for (int i = 0; i < NUM_PEER; i++) {
-      conf.setInt("bsp.peers.num", NUM_PEER);
-      conf.set(Constants.PEER_HOST, "localhost");
-      conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
-      conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
-      thread = new BSPPeerThread(conf);
-      list.add(thread);
-    }
-
-    for (int i = 0; i < NUM_PEER; i++) {
-      list.get(i).start();
-    }
-
-    for (int i = 0; i < NUM_PEER; i++) {
-      list.get(i).join();
-    }
-  }
-
-  /*
-   * Test method for constructors
-   */
-  public void testBSPPeer() throws IOException {
-    Configuration conf = new Configuration();
-    BSPPeer peer = new BSPPeer(conf);
-
-    System.out.println(peer.bindAddress+" = "+Constants.DEFAULT_PEER_HOST);
-    System.out.println(peer.bindPort+" = "+Constants.DEFAULT_PEER_PORT);
-    assertEquals(peer.bindAddress,Constants.DEFAULT_PEER_HOST);
-    assertEquals(peer.bindPort,Constants.DEFAULT_PEER_PORT);
-    assertEquals(peer.zookeeperAddr,Constants.DEFAULT_ZOOKEEPER_SERVER_ADDR);
-
-    int peerPort;
-    int zkPort;
-    conf = new Configuration();
-    conf.set(Constants.PEER_HOST, "localhost");
-    do{
-      peerPort = r.nextInt(Short.MAX_VALUE);
-    } while(peerPort == 0);
-    conf.setInt(Constants.PEER_PORT, peerPort);
-
-    do{
-      zkPort = r.nextInt(Short.MAX_VALUE);
-    } while(zkPort == peerPort || zkPort == 0);
-    conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:"+zkPort);
-    peer = new BSPPeer(conf);
-    assertEquals(peer.bindAddress,"localhost");
-    assertEquals(peer.bindPort,peerPort);
-    assertEquals(peer.zookeeperAddr,"localhost:"+zkPort);
-  }
-
-  @Override
-  public void process(WatchedEvent event) {
-  }
-}
Index: src/test/org/apache/hama/bsp/BSPTestDriver.java
===================================================================
--- src/test/org/apache/hama/bsp/BSPTestDriver.java (revision 1021594)
+++ src/test/org/apache/hama/bsp/BSPTestDriver.java (working copy)
@@ -1,25 +0,0 @@
-package org.apache.hama.bsp;
-
-import java.io.IOException;
-
-import org.apache.hama.HamaConfiguration;
-
-public class BSPTestDriver {
-
-  /**
-   * @param args
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public static void main(String[] args) throws IOException, InterruptedException {
-    BSPJob job = new BSPJob(new HamaConfiguration());
-    job.setJarByClass(BSP.class);
-    job.setBspClass(BSP.class);
-    job.submit();
-    Thread.sleep(3000);
-
-    System.out.println("job id:"+job.getJobID());
-    System.out.println("job Name:"+job.getJobName());
-    System.out.println("working dir:"+job.getWorkingDirectory());
-  }
-}
Index: src/test/org/apache/hama/bsp/MiniBSPCluster.java
===================================================================
--- src/test/org/apache/hama/bsp/MiniBSPCluster.java (revision 1021594)
+++ src/test/org/apache/hama/bsp/MiniBSPCluster.java (working copy)
@@ -1,5 +0,0 @@
-package org.apache.hama.bsp;
-
-public class MiniBSPCluster {
-
-}
Index: src/test/org/apache/hama/bsp/SerializePrinting.java
===================================================================
--- src/test/org/apache/hama/bsp/SerializePrinting.java (revision 1021594)
+++ src/test/org/apache/hama/bsp/SerializePrinting.java (working copy)
@@ -1,120 +0,0 @@
-package org.apache.hama.bsp;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.Constants;
-import org.apache.hama.HamaCluster;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-/**
- * Serialize Printing of Hello World
- */
-public class SerializePrinting extends HamaCluster implements Watcher {
-  private Log LOG = LogFactory.getLog(SerializePrinting.class);
-  private int NUM_PEER = 10;
-  List list = new ArrayList(NUM_PEER);
-  List echo = new ArrayList();
-  Configuration conf;
-
-  public SerializePrinting() {
-    this.conf = getConf();
-  }
-
-  public void setUp() throws Exception {
-    super.setUp();
-
-    ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
-    Stat s = null;
-    if (zk != null) {
-      try {
-        s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
-      } catch (Exception e) {
-        LOG.error(s);
-      }
-
-      if (s == null) {
-        try {
-          zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
-              Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        } catch (KeeperException e) {
-          LOG.error(e);
-        } catch (InterruptedException e) {
-          LOG.error(e);
-        }
-      }
-    }
-  }
-
-  public void testHelloWorld() throws InterruptedException, IOException {
-    BSPPeerThread thread;
-    int[] randomSequence = new int[] { 2, 3, 4, 5, 0, 1, 6, 7, 8, 9 };
-    for (int i = 0; i < NUM_PEER; i++) {
-      conf.setInt("bsp.peers.num", NUM_PEER);
-      conf.set(Constants.PEER_HOST, "localhost");
-      conf.set(Constants.PEER_PORT, String
-          .valueOf(30000 + randomSequence[i]));
-      conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
-      thread = new BSPPeerThread(conf, randomSequence[i]);
-      System.out.println(randomSequence[i] + ", " + thread.getName());
-      list.add(thread);
-    }
-
-    for (int i = 0; i < NUM_PEER; i++) {
-      list.get(i).start();
-    }
-
-    for (int i = 0; i < NUM_PEER; i++) {
-      list.get(i).join();
-    }
-  }
-
-  public class BSPPeerThread extends Thread {
-    private BSPPeer peer;
-    private int myId;
-
-    public BSPPeerThread(Configuration conf, int myId) throws IOException {
-      this.peer = new BSPPeer(conf);
-      this.myId = myId;
-    }
-
-    @Override
-    public void run() {
-      for (int i = 0; i < NUM_PEER; i++) {
-        if (myId == i) {
-          echo.add(getName());
-          System.out.println("Hello BSP from " + i + " of " + NUM_PEER + ": "
-              + getName());
-        }
-
-        try {
-          Thread.sleep(2000);
-          peer.sync();
-        } catch (IOException e) {
-          e.printStackTrace();
-        } catch (KeeperException e) {
-          e.printStackTrace();
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-
-      }
-    }
-  }
-
-  @Override
-  public void process(WatchedEvent event) {
-    // TODO Auto-generated method stub
-
-  }
-}
Index: src/test/org/apache/hama/bsp/TestBSPPeer.java
===================================================================
--- src/test/org/apache/hama/bsp/TestBSPPeer.java (revision 0)
+++ src/test/org/apache/hama/bsp/TestBSPPeer.java (revision 0)
@@ -0,0 +1,179 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.util.Bytes;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+public class TestBSPPeer extends HamaCluster implements Watcher {
+  private Log LOG = LogFactory.getLog(TestBSPPeer.class);
+
+  private static final int NUM_PEER = 35;
+  private static final int ROUND = 3;
+  private static final int PAYLOAD = 1024; // 1kb in default
+  List list = new ArrayList(NUM_PEER);
+  Configuration conf;
+  private Random r = new Random();
+
+  public TestBSPPeer() {
+    this.conf = getConf();
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+
+    ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
+    Stat s = null;
+    if (zk != null) {
+      try {
+        s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
+      } catch (Exception e) {
+        LOG.error(s);
+      }
+
+      if (s == null) {
+        try {
+          zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+              Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (KeeperException e) {
+          LOG.error(e);
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      }
+    }
+  }
+
+  public class BSPPeerThread extends Thread {
+    private BSPPeer peer;
+    private int MAXIMUM_DURATION = 5;
+
+    public BSPPeerThread(Configuration conf) throws IOException {
+      this.peer = new BSPPeer(conf);
+    }
+
+    @Override
+    public void run() {
+      int randomTime;
+      byte[] dummyData = new byte[PAYLOAD];
+      BSPMessage msg = null;
+      InetSocketAddress addr = null;
+
+      for (int i = 0; i < ROUND; i++) {
+        randomTime = r.nextInt(MAXIMUM_DURATION) + 5;
+
+        for (int j = 0; j < 10; j++) {
+          r.nextBytes(dummyData);
+          msg = new BSPMessage(Bytes.tail(dummyData, 128), dummyData);
+          addr = new InetSocketAddress("localhost", 30000 + j);
+          try {
+            peer.send(addr, msg);
+          } catch (IOException e) {
+            LOG.info(e);
+          }
+        }
+
+        try {
+          Thread.sleep(randomTime * 1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+
+        try {
+          peer.sync();
+        } catch (IOException e) {
+          e.printStackTrace();
+        } catch (KeeperException e) {
+          e.printStackTrace();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+
+        verifyPayload();
+      }
+    }
+
+    private void verifyPayload() {
+      System.out.println("[" + getName() + "] verifying "
+          + peer.localQueue.size() + " messages");
+      BSPMessage msg = null;
+
+      try {
+        while ((msg = peer.getCurrentMessage()) != null) {
+          assertEquals(Bytes.compareTo(msg.tag, 0, 128, msg.data,
+              msg.data.length - 128, 128), 0);
+        }
+      } catch (IOException e) {
+        LOG.error(e);
+      }
+
+      peer.localQueue.clear();
+    }
+
+    public BSPPeer getBSPPeer() {
+      return this.peer;
+    }
+  }
+
+  public void testSync() throws InterruptedException, IOException {
+
+    BSPPeerThread thread;
+    conf.setInt("bsp.peers.num", NUM_PEER);
+    conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    conf.set(Constants.PEER_HOST, "localhost");
+    conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
+
+    for (int i = 0; i < NUM_PEER; i++) {
+      conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
+      thread = new BSPPeerThread(conf);
+      list.add(thread);
+    }
+
+    for (int i = 0; i < NUM_PEER; i++) {
+      list.get(i).start();
+    }
+
+    for (int i = 0; i < NUM_PEER; i++) {
+      list.get(i).join();
+    }
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+  }
+}
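One observation on testSync() above: all peers share a single Configuration that is mutated inside the loop, which only works because BSPPeer reads PEER_PORT in its constructor. A per-peer copy (fragment below, not part of the patch; it relies on Hadoop's Configuration(Configuration) copy constructor) would make that independence explicit.

// Illustrative fragment, not part of the patch: a possible drop-in for the
// first loop in testSync(); NUM_PEER, conf, list and BSPPeerThread come from
// the surrounding test class.
for (int i = 0; i < NUM_PEER; i++) {
  Configuration peerConf = new Configuration(conf); // per-peer copy
  peerConf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
  list.add(new BSPPeerThread(peerConf));
}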
Index: src/test/org/apache/hama/bsp/TestClusterStatus.java
===================================================================
--- src/test/org/apache/hama/bsp/TestClusterStatus.java (revision 1021565)
+++ src/test/org/apache/hama/bsp/TestClusterStatus.java (working copy)
@@ -1,56 +0,0 @@
-package org.apache.hama.bsp;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-
-import junit.framework.TestCase;
-
-public class TestClusterStatus extends TestCase {
-  Random rnd = new Random();
-
-  protected void setUp() throws Exception {
-    super.setUp();
-  }
-
-  public final void testWriteAndReadFields() throws IOException {
-    DataOutputBuffer out = new DataOutputBuffer();
-    DataInputBuffer in = new DataInputBuffer();
-
-    ClusterStatus status1;
-    List grooms = new ArrayList();
-
-    for(int i=0;i< 10;i++) {
-      grooms.add("groom_"+rnd.nextInt());
-    }
-
-    int tasks = rnd.nextInt(100);
-    int maxTasks = rnd.nextInt(100);
-    BSPMaster.State state = BSPMaster.State.RUNNING;
-
-    status1 = new ClusterStatus(grooms, tasks, maxTasks, state);
-    status1.write(out);
-
-    in.reset(out.getData(), out.getLength());
-
-    ClusterStatus status2 = new ClusterStatus();
-    status2.readFields(in);
-
-    Set grooms_s = new HashSet(status1.getActiveGroomNames());
-    Set grooms_o = new HashSet(status2.getActiveGroomNames());
-
-    assertEquals(status1.getGroomServers(), status2.getGroomServers());
-
-    assertTrue(grooms_s.containsAll(grooms_o));
-    assertTrue(grooms_o.containsAll(grooms_s));
-
-    assertEquals(status1.getTasks(),status2.getTasks());
-    assertEquals(status1.getMaxTasks(), status2.getMaxTasks());
-  }
-}
Index: src/test/org/apache/hama/bsp/TestClusterStatus.java
===================================================================
--- src/test/org/apache/hama/bsp/TestClusterStatus.java (revision 0)
+++ src/test/org/apache/hama/bsp/TestClusterStatus.java (revision 1021565)
@@ -0,0 +1,58 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hama.bsp.BSPMaster;
+import org.apache.hama.bsp.ClusterStatus;
+
+public class TestClusterStatus extends TestCase {
+  Random rnd = new Random();
+
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  public final void testWriteAndReadFields() throws IOException {
+    DataOutputBuffer out = new DataOutputBuffer();
+    DataInputBuffer in = new DataInputBuffer();
+
+    ClusterStatus status1;
+    List grooms = new ArrayList();
+
+    for(int i=0;i< 10;i++) {
+      grooms.add("groom_"+rnd.nextInt());
+    }
+
+    int tasks = rnd.nextInt(100);
+    int maxTasks = rnd.nextInt(100);
+    BSPMaster.State state = BSPMaster.State.RUNNING;
+
+    status1 = new ClusterStatus(grooms, tasks, maxTasks, state);
+    status1.write(out);
+
+    in.reset(out.getData(), out.getLength());
+
+    ClusterStatus status2 = new ClusterStatus();
+    status2.readFields(in);
+
+    Set grooms_s = new HashSet(status1.getActiveGroomNames());
+    Set grooms_o = new HashSet(status2.getActiveGroomNames());
+
+    assertEquals(status1.getGroomServers(), status2.getGroomServers());
+
+    assertTrue(grooms_s.containsAll(grooms_o));
+    assertTrue(grooms_o.containsAll(grooms_s));
+
+    assertEquals(status1.getTasks(),status2.getTasks());
+    assertEquals(status1.getMaxTasks(), status2.getMaxTasks());
+  }
+}
Index: src/test/org/apache/hama/bsp/TestSerializePrinting.java
===================================================================
--- src/test/org/apache/hama/bsp/TestSerializePrinting.java (revision 0)
+++ src/test/org/apache/hama/bsp/TestSerializePrinting.java (revision 0)
@@ -0,0 +1,122 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Serialize Printing of Hello World
+ */
+public class TestSerializePrinting extends HamaCluster implements Watcher {
+  private Log LOG = LogFactory.getLog(TestSerializePrinting.class);
+  private int NUM_PEER = 10;
+  List list = new ArrayList(NUM_PEER);
+  List echo = new ArrayList();
+  Configuration conf;
+
+  public TestSerializePrinting() {
+    this.conf = getConf();
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+
+    ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
+    Stat s = null;
+    if (zk != null) {
+      try {
+        s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
+      } catch (Exception e) {
+        LOG.error(s);
+      }
+
+      if (s == null) {
+        try {
+          zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+              Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (KeeperException e) {
+          LOG.error(e);
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      }
+    }
+  }
+
+  public void testHelloWorld() throws InterruptedException, IOException {
+    BSPPeerThread thread;
+    int[] randomSequence = new int[] { 2, 3, 4, 5, 0, 1, 6, 7, 8, 9 };
+    for (int i = 0; i < NUM_PEER; i++) {
+      conf.setInt("bsp.peers.num", NUM_PEER);
+      conf.set(Constants.PEER_HOST, "localhost");
+      conf.set(Constants.PEER_PORT, String
+          .valueOf(30000 + randomSequence[i]));
+      conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
+      thread = new BSPPeerThread(conf, randomSequence[i]);
+      System.out.println(randomSequence[i] + ", " + thread.getName());
+      list.add(thread);
+    }
+
+    for (int i = 0; i < NUM_PEER; i++) {
+      list.get(i).start();
+    }
+
+    for (int i = 0; i < NUM_PEER; i++) {
+      list.get(i).join();
+    }
+  }
+
+  public class BSPPeerThread extends Thread {
+    private BSPPeer peer;
+    private int myId;
+
+    public BSPPeerThread(Configuration conf, int myId) throws IOException {
+      conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+
+      this.peer = new BSPPeer(conf);
+      this.myId = myId;
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < NUM_PEER; i++) {
+        if (myId == i) {
+          echo.add(getName());
+          System.out.println("Hello BSP from " + i + " of " + NUM_PEER + ": "
+              + getName());
+        }
+
+        try {
+          Thread.sleep(2000);
+          peer.sync();
+        } catch (IOException e) {
+          e.printStackTrace();
+        } catch (KeeperException e) {
+          e.printStackTrace();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+
+      }
+    }
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    // TODO Auto-generated method stub
+
+  }
+}