Index: pom.xml
===================================================================
--- pom.xml (revision 1437242)
+++ pom.xml (working copy)
@@ -91,7 +91,7 @@
1.7
2.6
3.0.1
- 1.0.4
+ 2.0.0-alpha
6.1.14
4.8.1
1.5.8
Index: conf/hama-env.sh
===================================================================
--- conf/hama-env.sh (revision 1437242)
+++ conf/hama-env.sh (working copy)
@@ -22,7 +22,7 @@
# Set environment variables here.
# The java implementation to use. Required.
-#export JAVA_HOME=/usr/lib/jvm/java-6-sun
+export JAVA_HOME=/usr/lib/jvm/jdk1.7.0
# Where log files are stored. $HAMA_HOME/logs by default.
# export HAMA_LOG_DIR=${HAMA_HOME}/logs
Index: conf/hama-site.xml
===================================================================
--- conf/hama-site.xml (revision 1437242)
+++ conf/hama-site.xml (working copy)
@@ -22,4 +22,32 @@
*/
-->
+
+ bsp.master.address
+ edward-virtualBox:40000
+ The address of the bsp master server. Either the
+ literal string "local" or a host:port for distributed mode
+
+
+
+
+ fs.default.name
+ hdfs://edward-virtualBox:9000/
+
+ The name of the default file system. Either the literal string
+ "local" or a host:port for HDFS.
+
+
+
+
+ hama.zookeeper.quorum
+ edward-virtualBox
+ Comma separated list of servers in the ZooKeeper Quorum.
+ For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
+ By default this is set to localhost for local and pseudo-distributed modes
+ of operation. For a fully-distributed setup, this should be set to a full
+ list of ZooKeeper quorum servers. If HAMA_MANAGES_ZK is set in hama-env.sh
+ this is the list of servers which we will start/stop zookeeper on.
+
+
Index: core/pom.xml
===================================================================
--- core/pom.xml (revision 1437242)
+++ core/pom.xml (working copy)
@@ -100,15 +100,42 @@
org.mortbay.jetty
jsp-2.1
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+
org.apache.hadoop
- hadoop-core
+ hadoop-hdfs
+ ${hadoop.version}
org.apache.hadoop
- hadoop-test
+ hadoop-hdfs
+ tests
+ ${hadoop.version}
+ org.apache.hadoop
+ hadoop-mapreduce-client-core
+ ${hadoop.version}
+
+
+ org.apache.hadoop
+ hadoop-auth
+ ${hadoop.version}
+
+
+ com.google.protobuf
+ protobuf-java
+ 2.4.0a
+
+
+
+
org.apache.avro
avro
Index: core/src/main/java/org/apache/hama/bsp/BSPMaster.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPMaster.java (revision 1437242)
+++ core/src/main/java/org/apache/hama/bsp/BSPMaster.java (working copy)
@@ -41,6 +41,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
@@ -931,4 +932,11 @@
}
return null;
}
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
Index: core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (revision 1437242)
+++ core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (working copy)
@@ -274,7 +274,7 @@
}
}
if (files.length() > 0) {
- DistributedCache.addLocalFiles(conf, files.toString());
+ //DistributedCache.addLocalFiles(conf, files.toString());
}
}
@@ -493,7 +493,7 @@
}
}
}
- DistributedCache.setLocalFiles(conf, "");
+ //DistributedCache.setLocalFiles(conf, "");
}
public final void close() {
Index: core/src/main/java/org/apache/hama/bsp/GroomServer.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/GroomServer.java (revision 1437242)
+++ core/src/main/java/org/apache/hama/bsp/GroomServer.java (working copy)
@@ -48,6 +48,7 @@
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
@@ -364,7 +365,13 @@
"bsp.http.groomserver.port", Constants.DEFAULT_GROOM_INFO_SERVER),
true, conf);
- FileSystem local = FileSystem.getLocal(conf);
+ // FIXME Hadoop 2.0.x doesn't support getLocal()
+ FileSystem local = null;
+ try {
+ local = FileSystem.getLocal(conf);
+ } catch (UnsupportedOperationException e) {
+ LOG.info(e.toString());
+ }
server.setAttribute("groom.server", this);
server.setAttribute("local.file.system", local);
server.setAttribute("conf", conf);
@@ -375,7 +382,7 @@
server.start();
@SuppressWarnings("deprecation")
- String address = NetUtils.getServerAddress(conf,
+ String address = BSPNetUtils.getServerAddress(conf,
"bsp.groom.report.bindAddress", "bsp.groom.report.port",
"bsp.groom.report.address");
InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
@@ -487,9 +494,13 @@
}
public void deleteLocalFiles() throws IOException {
- String[] localDirs = getLocalDirs();
- for (int i = 0; i < localDirs.length; i++) {
- FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]), true);
+ try {
+ String[] localDirs = getLocalDirs();
+ for (int i = 0; i < localDirs.length; i++) {
+ FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]), true);
+ }
+ } catch (UnsupportedOperationException e) {
+ LOG.info("couldn't delete local files.");
}
}
@@ -500,6 +511,8 @@
FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir),
true);
}
+ } catch (UnsupportedOperationException e) {
+ LOG.info("couldn't delete local files.");
} catch (NullPointerException e) {
LOG.info(e);
}
@@ -811,10 +824,10 @@
// Task is out of contact if it has not pinged since more than
// monitorPeriod. A task is given a leeway of 10 times monitorPeriod
// to get started.
-
+
// TODO Please refactor this conditions
- // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod
-
+ // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod
+
if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
&& (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))) {
@@ -1380,4 +1393,11 @@
public void process(WatchedEvent event) {
}
+ @Override
+ public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
Index: core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (revision 1437242)
+++ core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (working copy)
@@ -35,6 +35,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
@@ -406,6 +407,13 @@
return 0;
}
+ @Override
+ public ProtocolSignature getProtocolSignature(String arg0, long arg1,
+ int arg2) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
public static class LocalSyncClient extends BSPPeerSyncClient {
@@ -523,4 +531,11 @@
int startFrom, int i) {
return TaskCompletionEvent.EMPTY_ARRAY;
}
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
Index: core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (revision 1437242)
+++ core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (working copy)
@@ -25,6 +25,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hama.bsp.BSPMessageBundle;
@@ -157,4 +158,11 @@
return versionID;
}
+ @Override
+ public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
Index: core/src/main/java/org/apache/hama/util/BSPNetUtils.java
===================================================================
--- core/src/main/java/org/apache/hama/util/BSPNetUtils.java (revision 1437242)
+++ core/src/main/java/org/apache/hama/util/BSPNetUtils.java (working copy)
@@ -17,17 +17,24 @@
*/
package org.apache.hama.util;
-import org.apache.hama.Constants;
-import org.apache.mina.util.AvailablePortFinder;
-
import java.io.IOException;
-import java.net.*;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.UnknownHostException;
import java.util.NoSuchElementException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.Constants;
+
/**
* NetUtils for our needs.
*/
public class BSPNetUtils {
+ private static final Log LOG = LogFactory.getLog(BSPNetUtils.class);
public static final int MAX_PORT_NUMBER = 65535;
/**
@@ -57,17 +64,18 @@
*/
public static int getFreePort(int pStartPort) {
int startPort = pStartPort;
- while (!AvailablePortFinder.available(startPort)) {
+ while (!available(startPort)) {
startPort++;
}
return startPort;
}
-
+
/**
* Gets a new InetSocketAddress from the given peerName. peerName must contain
* a colon to distinct between host and port.
*
- * @param peerName the name as a String of the BSP peer to get the address from
+ * @param peerName the name as a String of the BSP peer to get the address
+ * from
* @return the InetSocketAddress of the given BSP peer
*/
public static InetSocketAddress getAddress(String peerName) {
@@ -87,7 +95,7 @@
* @param port the port to check for availability
*/
public static boolean available(int port) {
- if (port < Constants.DEFAULT_PEER_PORT || port > MAX_PORT_NUMBER) {
+ if (port > MAX_PORT_NUMBER) {
throw new IllegalArgumentException("Invalid start port: " + port);
}
@@ -132,4 +140,41 @@
throw new NoSuchElementException("Could not find an available port "
+ "above " + fromPort);
}
+
+ /**
+ * Handle the transition from pairs of attributes specifying a host and port
+ * to a single colon separated one.
+ *
+ * @param conf the configuration to check
+ * @param oldBindAddressName the old address attribute name
+ * @param oldPortName the old port attribute name
+ * @param newBindAddressName the new combined name
+ * @return the complete address from the configuration
+ */
+ public static String getServerAddress(Configuration conf,
+ String oldBindAddressName, String oldPortName, String newBindAddressName) {
+ String oldAddr = conf.get(oldBindAddressName);
+ String oldPort = conf.get(oldPortName);
+ String newAddrPort = conf.get(newBindAddressName);
+ if (oldAddr == null && oldPort == null) {
+ return newAddrPort;
+ }
+ String[] newAddrPortParts = newAddrPort.split(":", 2);
+ if (newAddrPortParts.length != 2) {
+ throw new IllegalArgumentException("Invalid address/port: " + newAddrPort);
+ }
+ if (oldAddr == null) {
+ oldAddr = newAddrPortParts[0];
+ } else {
+ LOG.warn("Configuration parameter " + oldBindAddressName
+ + " is deprecated. Use " + newBindAddressName + " instead.");
+ }
+ if (oldPort == null) {
+ oldPort = newAddrPortParts[1];
+ } else {
+ LOG.warn("Configuration parameter " + oldPortName
+ + " is deprecated. Use " + newBindAddressName + " instead.");
+ }
+ return oldAddr + ":" + oldPort;
+ }
}
Index: core/src/main/java/org/apache/hama/util/RunJar.java
===================================================================
--- core/src/main/java/org/apache/hama/util/RunJar.java (revision 1437242)
+++ core/src/main/java/org/apache/hama/util/RunJar.java (working copy)
@@ -114,10 +114,7 @@
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
- try {
- FileUtil.fullyDelete(workDir);
- } catch (IOException e) {
- }
+ FileUtil.fullyDelete(workDir);
}
});
Index: core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
===================================================================
--- core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (revision 1437242)
+++ core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (working copy)
@@ -45,6 +45,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hama.Constants;
@@ -164,6 +165,13 @@
lastPingTime = 0L;
}
}
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String arg0, long arg1,
+ int arg2) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
@SuppressWarnings("unused")
Index: core/src/test/java/org/apache/hama/ipc/TestRPC.java
===================================================================
--- core/src/test/java/org/apache/hama/ipc/TestRPC.java (revision 1437242)
+++ core/src/test/java/org/apache/hama/ipc/TestRPC.java (working copy)
@@ -26,6 +26,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.VersionedProtocol;
@@ -117,6 +118,13 @@
}
}
+ @Override
+ public ProtocolSignature getProtocolSignature(String arg0, long arg1,
+ int arg2) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
public void testCalls() throws Exception {