Index: pom.xml =================================================================== --- pom.xml (revision 1437242) +++ pom.xml (working copy) @@ -91,7 +91,7 @@ 1.7 2.6 3.0.1 - 1.0.4 + 2.0.0-alpha 6.1.14 4.8.1 1.5.8 Index: conf/hama-env.sh =================================================================== --- conf/hama-env.sh (revision 1437242) +++ conf/hama-env.sh (working copy) @@ -22,7 +22,7 @@ # Set environment variables here. # The java implementation to use. Required. -#export JAVA_HOME=/usr/lib/jvm/java-6-sun +export JAVA_HOME=/usr/lib/jvm/jdk1.7.0 # Where log files are stored. $HAMA_HOME/logs by default. # export HAMA_LOG_DIR=${HAMA_HOME}/logs Index: conf/hama-site.xml =================================================================== --- conf/hama-site.xml (revision 1437242) +++ conf/hama-site.xml (working copy) @@ -22,4 +22,32 @@ */ --> + + bsp.master.address + edward-virtualBox:40000 + The address of the bsp master server. Either the + literal string "local" or a host:port for distributed mode + + + + + fs.default.name + hdfs://edward-virtualBox:9000/ + + The name of the default file system. Either the literal string + "local" or a host:port for HDFS. + + + + + hama.zookeeper.quorum + edward-virtualBox + Comma separated list of servers in the ZooKeeper Quorum. + For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com". + By default this is set to localhost for local and pseudo-distributed modes + of operation. For a fully-distributed setup, this should be set to a full + list of ZooKeeper quorum servers. If HAMA_MANAGES_ZK is set in hama-env.sh + this is the list of servers which we will start/stop zookeeper on. + + Index: core/pom.xml =================================================================== --- core/pom.xml (revision 1437242) +++ core/pom.xml (working copy) @@ -100,15 +100,42 @@ org.mortbay.jetty jsp-2.1 + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + org.apache.hadoop - hadoop-core + hadoop-hdfs + ${hadoop.version} org.apache.hadoop - hadoop-test + hadoop-hdfs + tests + ${hadoop.version} + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + org.apache.hadoop + hadoop-auth + ${hadoop.version} + + + com.google.protobuf + protobuf-java + 2.4.0a + + + + org.apache.avro avro Index: core/src/main/java/org/apache/hama/bsp/BSPMaster.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPMaster.java (revision 1437242) +++ core/src/main/java/org/apache/hama/bsp/BSPMaster.java (working copy) @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; @@ -931,4 +932,11 @@ } return null; } + + @Override + public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) + throws IOException { + // TODO Auto-generated method stub + return null; + } } Index: core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (revision 1437242) +++ core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (working copy) @@ -274,7 +274,7 @@ } } if (files.length() > 0) { - DistributedCache.addLocalFiles(conf, files.toString()); + //DistributedCache.addLocalFiles(conf, files.toString()); } } @@ -493,7 +493,7 @@ } } } - DistributedCache.setLocalFiles(conf, ""); + //DistributedCache.setLocalFiles(conf, ""); } public final void close() { Index: core/src/main/java/org/apache/hama/bsp/GroomServer.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/GroomServer.java (revision 1437242) +++ core/src/main/java/org/apache/hama/bsp/GroomServer.java (working copy) @@ -48,6 +48,7 @@ import org.apache.hadoop.fs.FSError; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.Server; @@ -364,7 +365,13 @@ "bsp.http.groomserver.port", Constants.DEFAULT_GROOM_INFO_SERVER), true, conf); - FileSystem local = FileSystem.getLocal(conf); + // FIXME Hadoop 2.0.x doesn't support getLocal() + FileSystem local = null; + try { + local = FileSystem.getLocal(conf); + } catch (UnsupportedOperationException e) { + LOG.info(e.toString()); + } server.setAttribute("groom.server", this); server.setAttribute("local.file.system", local); server.setAttribute("conf", conf); @@ -375,7 +382,7 @@ server.start(); @SuppressWarnings("deprecation") - String address = NetUtils.getServerAddress(conf, + String address = BSPNetUtils.getServerAddress(conf, "bsp.groom.report.bindAddress", "bsp.groom.report.port", "bsp.groom.report.address"); InetSocketAddress socAddr = NetUtils.createSocketAddr(address); @@ -487,9 +494,13 @@ } public void deleteLocalFiles() throws IOException { - String[] localDirs = getLocalDirs(); - for (int i = 0; i < localDirs.length; i++) { - FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]), true); + try { + String[] localDirs = getLocalDirs(); + for (int i = 0; i < localDirs.length; i++) { + FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]), true); + } + } catch (UnsupportedOperationException e) { + LOG.info("couldn't delete local files."); } } @@ -500,6 +511,8 @@ FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir), true); } + } catch (UnsupportedOperationException e) { + LOG.info("couldn't delete local files."); } catch (NullPointerException e) { LOG.info(e); } @@ -811,10 +824,10 @@ // Task is out of contact if it has not pinged since more than // monitorPeriod. A task is given a leeway of 10 times monitorPeriod // to get started. - + // TODO Please refactor this conditions - // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod - + // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod + if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))) { @@ -1380,4 +1393,11 @@ public void process(WatchedEvent event) { } + @Override + public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) + throws IOException { + // TODO Auto-generated method stub + return null; + } + } Index: core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (revision 1437242) +++ core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (working copy) @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.Constants; import org.apache.hama.HamaConfiguration; @@ -406,6 +407,13 @@ return 0; } + @Override + public ProtocolSignature getProtocolSignature(String arg0, long arg1, + int arg2) throws IOException { + // TODO Auto-generated method stub + return null; + } + } public static class LocalSyncClient extends BSPPeerSyncClient { @@ -523,4 +531,11 @@ int startFrom, int i) { return TaskCompletionEvent.EMPTY_ARRAY; } + + @Override + public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) + throws IOException { + // TODO Auto-generated method stub + return null; + } } Index: core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (revision 1437242) +++ core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (working copy) @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; import org.apache.hama.bsp.BSPMessageBundle; @@ -157,4 +158,11 @@ return versionID; } + @Override + public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) + throws IOException { + // TODO Auto-generated method stub + return null; + } + } Index: core/src/main/java/org/apache/hama/util/BSPNetUtils.java =================================================================== --- core/src/main/java/org/apache/hama/util/BSPNetUtils.java (revision 1437242) +++ core/src/main/java/org/apache/hama/util/BSPNetUtils.java (working copy) @@ -17,17 +17,24 @@ */ package org.apache.hama.util; -import org.apache.hama.Constants; -import org.apache.mina.util.AvailablePortFinder; - import java.io.IOException; -import java.net.*; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.UnknownHostException; import java.util.NoSuchElementException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hama.Constants; + /** * NetUtils for our needs. */ public class BSPNetUtils { + private static final Log LOG = LogFactory.getLog(BSPNetUtils.class); public static final int MAX_PORT_NUMBER = 65535; /** @@ -57,17 +64,18 @@ */ public static int getFreePort(int pStartPort) { int startPort = pStartPort; - while (!AvailablePortFinder.available(startPort)) { + while (!available(startPort)) { startPort++; } return startPort; } - + /** * Gets a new InetSocketAddress from the given peerName. peerName must contain * a colon to distinct between host and port. * - * @param peerName the name as a String of the BSP peer to get the address from + * @param peerName the name as a String of the BSP peer to get the address + * from * @return the InetSocketAddress of the given BSP peer */ public static InetSocketAddress getAddress(String peerName) { @@ -87,7 +95,7 @@ * @param port the port to check for availability */ public static boolean available(int port) { - if (port < Constants.DEFAULT_PEER_PORT || port > MAX_PORT_NUMBER) { + if (port > MAX_PORT_NUMBER) { throw new IllegalArgumentException("Invalid start port: " + port); } @@ -132,4 +140,41 @@ throw new NoSuchElementException("Could not find an available port " + "above " + fromPort); } + + /** + * Handle the transition from pairs of attributes specifying a host and port + * to a single colon separated one. + * + * @param conf the configuration to check + * @param oldBindAddressName the old address attribute name + * @param oldPortName the old port attribute name + * @param newBindAddressName the new combined name + * @return the complete address from the configuration + */ + public static String getServerAddress(Configuration conf, + String oldBindAddressName, String oldPortName, String newBindAddressName) { + String oldAddr = conf.get(oldBindAddressName); + String oldPort = conf.get(oldPortName); + String newAddrPort = conf.get(newBindAddressName); + if (oldAddr == null && oldPort == null) { + return newAddrPort; + } + String[] newAddrPortParts = newAddrPort.split(":", 2); + if (newAddrPortParts.length != 2) { + throw new IllegalArgumentException("Invalid address/port: " + newAddrPort); + } + if (oldAddr == null) { + oldAddr = newAddrPortParts[0]; + } else { + LOG.warn("Configuration parameter " + oldBindAddressName + + " is deprecated. Use " + newBindAddressName + " instead."); + } + if (oldPort == null) { + oldPort = newAddrPortParts[1]; + } else { + LOG.warn("Configuration parameter " + oldPortName + + " is deprecated. Use " + newBindAddressName + " instead."); + } + return oldAddr + ":" + oldPort; + } } Index: core/src/main/java/org/apache/hama/util/RunJar.java =================================================================== --- core/src/main/java/org/apache/hama/util/RunJar.java (revision 1437242) +++ core/src/main/java/org/apache/hama/util/RunJar.java (working copy) @@ -114,10 +114,7 @@ Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - try { - FileUtil.fullyDelete(workDir); - } catch (IOException e) { - } + FileUtil.fullyDelete(workDir); } }); Index: core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java =================================================================== --- core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (revision 1437242) +++ core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (working copy) @@ -45,6 +45,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hama.Constants; @@ -164,6 +165,13 @@ lastPingTime = 0L; } } + + @Override + public ProtocolSignature getProtocolSignature(String arg0, long arg1, + int arg2) throws IOException { + // TODO Auto-generated method stub + return null; + } } @SuppressWarnings("unused") Index: core/src/test/java/org/apache/hama/ipc/TestRPC.java =================================================================== --- core/src/test/java/org/apache/hama/ipc/TestRPC.java (revision 1437242) +++ core/src/test/java/org/apache/hama/ipc/TestRPC.java (working copy) @@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.VersionedProtocol; @@ -117,6 +118,13 @@ } } + @Override + public ProtocolSignature getProtocolSignature(String arg0, long arg1, + int arg2) throws IOException { + // TODO Auto-generated method stub + return null; + } + } public void testCalls() throws Exception {