diff --git hcatalog/webhcat/svr/pom.xml hcatalog/webhcat/svr/pom.xml
index 8c94717..4effc84 100644
--- hcatalog/webhcat/svr/pom.xml
+++ hcatalog/webhcat/svr/pom.xml
@@ -116,6 +116,11 @@
hadoop-core
${hadoop-20S.version}
+
+ org.apache.hbase
+ hbase-client
+ ${hbase.hadoop1.version}
+
@@ -141,6 +146,11 @@
hadoop-mapreduce-client-core
${hadoop-23.version}
+
+ org.apache.hbase
+ hbase-client
+ ${hbase.hadoop2.version}
+
@@ -195,6 +205,61 @@
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ include-hbase-zklib
+
+ package
+
+ shade
+
+
+ true
+
+
+ org.apache.hbase
+ org.cloudera.htrace
+
+
+
+
+ org.apache.hadoop.hbase
+ webhcat.org.apache.hadoop.hbase
+
+
+ org.cloudera.htrace
+ webhcat.org.cloudera.htrace
+
+
+
+
+ org.apache.hbase
+
+ META-INF/**
+ *.xml
+
+
+
+ org.cloudera.htrace
+
+ META-INF/**
+
+
+
+
+
+
+
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java
index 36b64da..7f8d313 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java
@@ -36,18 +36,18 @@
private static final Log LOG = LogFactory.getLog(JobState.class);
- private String id;
+ private final String jobID;
// Storage is instantiated in the constructor
- private TempletonStorage storage = null;
+ private final TempletonStorage storage ;
- private static TempletonStorage.Type type = TempletonStorage.Type.JOB;
+ private static final TempletonStorage.Type type = TempletonStorage.Type.JOB;
- private Configuration config = null;
+ private final Configuration config;
- public JobState(String id, Configuration conf)
+ public JobState(String jobID, Configuration conf)
throws IOException {
- this.id = id;
+ this.jobID = jobID;
config = conf;
storage = getStorage(conf);
}
@@ -55,10 +55,10 @@ public JobState(String id, Configuration conf)
public void delete()
throws IOException {
try {
- storage.delete(type, id);
+ storage.delete(type, jobID);
} catch (Exception e) {
// Error getting children of node -- probably node has been deleted
- LOG.info("Couldn't delete " + id);
+ LOG.info("Couldn't delete " + jobID);
}
}
@@ -87,7 +87,7 @@ public static TempletonStorage getStorageInstance(Configuration conf) {
* Get an open instance of the selected storage class. Defaults
* to HDFS storage if none is specified.
*/
- public static TempletonStorage getStorage(Configuration conf) throws IOException {
+ private static TempletonStorage getStorage(Configuration conf) throws IOException {
TempletonStorage storage = getStorageInstance(conf);
storage.openStorage(conf);
return storage;
@@ -109,7 +109,7 @@ public void close() throws IOException {
* This job id.
*/
public String getId() {
- return id;
+ return jobID;
}
/**
@@ -276,7 +276,7 @@ public void setNotifiedTime(long notified)
*/
public Long getLongField(String name)
throws IOException {
- String s = storage.getField(type, id, name);
+ String s = storage.getField(type, jobID, name);
if (s == null)
return null;
else {
@@ -295,7 +295,7 @@ public Long getLongField(String name)
public void setField(String name, String val)
throws IOException {
try {
- storage.saveField(type, id, name, val);
+ storage.saveField(type, jobID, name, val);
} catch (NotFoundException ne) {
throw new IOException(ne.getMessage());
}
@@ -303,7 +303,7 @@ public void setField(String name, String val)
public String getField(String name)
throws IOException {
- return storage.getField(type, id, name);
+ return storage.getField(type, jobID, name);
}
/**
@@ -316,9 +316,9 @@ public String getField(String name)
public void setLongField(String name, long val)
throws IOException {
try {
- storage.saveField(type, id, name, String.valueOf(val));
+ storage.saveField(type, jobID, name, String.valueOf(val));
} catch (NotFoundException ne) {
- throw new IOException("Job " + id + " was not found: " +
+ throw new IOException("Job " + jobID + " was not found: " +
ne.getMessage());
}
}
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java
index da3ad20..88f0564 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
@@ -33,10 +34,10 @@
public class JobStateTracker {
// The path to the tracking root
- private String job_trackingroot = null;
+ private final String job_trackingroot;
// The zookeeper connection to use
- private ZooKeeper zk;
+ private final RecoverableZooKeeper zk;
// The id of the tracking node -- must be a SEQUENTIAL node
private String trackingnode;
@@ -51,7 +52,7 @@
* Constructor for a new node -- takes the jobid of an existing job
*
*/
- public JobStateTracker(String node, ZooKeeper zk, boolean nodeIsTracker,
+ public JobStateTracker(String node, RecoverableZooKeeper zk, boolean nodeIsTracker,
String job_trackingpath) {
this.zk = zk;
if (nodeIsTracker) {
@@ -129,7 +130,7 @@ public String makeTrackingJobZnode(String nodename) {
* Get the list of tracking jobs. These can be used to determine which jobs have
* expired.
*/
- public static List getTrackingJobs(Configuration conf, ZooKeeper zk)
+ public static List getTrackingJobs(Configuration conf, RecoverableZooKeeper zk)
throws IOException {
ArrayList jobs = new ArrayList();
try {
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java
index 4f5e2d9..c62f93a 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java
@@ -25,6 +25,7 @@
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.zookeeper.ZooKeeper;
import org.apache.commons.logging.Log;
@@ -89,7 +90,7 @@ public static void startInstance(Configuration appConf) throws IOException {
* @throws IOException
*/
public void run() {
- ZooKeeper zk = null;
+ RecoverableZooKeeper zk = null;
List nodes = null;
isRunning = true;
while (!stop) {
@@ -140,7 +141,7 @@ public void run() {
*
* @throws IOException
*/
- public List getChildList(ZooKeeper zk) {
+ public List getChildList(RecoverableZooKeeper zk) {
try {
List jobs = JobStateTracker.getTrackingJobs(appConf, zk);
Collections.sort(jobs);
@@ -154,7 +155,7 @@ public void run() {
/**
* Check to see if a job is more than maxage old, and delete it if so.
*/
- public boolean checkAndDelete(String node, ZooKeeper zk) {
+ public boolean checkAndDelete(String node, RecoverableZooKeeper zk) {
JobState state = null;
try {
JobStateTracker tracker = new JobStateTracker(node, zk, true,
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
index ecde598..0f962f3 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
@@ -24,16 +24,18 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
/**
* A storage implementation based on storing everything in ZooKeeper.
@@ -56,33 +58,47 @@
public static final String ZK_SESSION_TIMEOUT
= "templeton.zookeeper.session-timeout";
+ //next 3 props are not meant to be documented and may be removed when
+ //RecoverableZooKeeper is replaced with Curator
+ private static final String ZK_RETRY_COUNT = "templeton.zookeeper.retrycount.int";
+ private static final String ZK_RETRY_DELAY = "templeton.zookeeper.retrydelay.ms";
+ private static final String ZK_CONNECTION_WAIT = "templeton.zookeeper.connwait.ms";
+
public static final String ENCODING = "UTF-8";
private static final Log LOG = LogFactory.getLog(ZooKeeperStorage.class);
- private ZooKeeper zk;
+ private RecoverableZooKeeper zk;
/**
* Open a ZooKeeper connection for the JobState.
*/
- public static ZooKeeper zkOpen(String zkHosts, int zkSessionTimeout)
+ public static RecoverableZooKeeper zkOpen(Configuration conf)
throws IOException {
- return new ZooKeeper(zkHosts,
- zkSessionTimeout,
+ String zkHosts = conf.get(ZK_HOSTS);
+ final CountDownLatch cdl = new CountDownLatch(1);
+ //according to JavaDoc, this may return before establishing connection
+ RecoverableZooKeeper zk = new RecoverableZooKeeper(zkHosts,
+ conf.getInt(ZK_SESSION_TIMEOUT, 30000),
new Watcher() {
@Override
synchronized public void process(WatchedEvent event) {
+ if(event.getState() == Event.KeeperState.SyncConnected) {
+ cdl.countDown();
+ }
}
- });
- }
-
- /**
- * Open a ZooKeeper connection for the JobState.
- */
- public static ZooKeeper zkOpen(Configuration conf)
- throws IOException {
- return zkOpen(conf.get(ZK_HOSTS),
- conf.getInt(ZK_SESSION_TIMEOUT, 30000));
+ },
+ conf.getInt(ZK_RETRY_COUNT, 3),
+ conf.getInt(ZK_RETRY_DELAY, 1000));
+ try {
+ if (cdl.await(conf.getInt(ZK_CONNECTION_WAIT, 1000), TimeUnit.MILLISECONDS)) {
+ return zk;
+ }
+ }
+ catch(InterruptedException ex) {
+ throw new IOException("Interrupted while waiting to connect to to ZooKeeper:" + zkHosts);
+ }
+ throw new IOException("Timed out waiting to establish ZooKeeper connection: " + zkHosts);
}
public ZooKeeperStorage() {