diff --git hcatalog/webhcat/svr/pom.xml hcatalog/webhcat/svr/pom.xml index 8c94717..4effc84 100644 --- hcatalog/webhcat/svr/pom.xml +++ hcatalog/webhcat/svr/pom.xml @@ -116,6 +116,11 @@ hadoop-core ${hadoop-20S.version} + + org.apache.hbase + hbase-client + ${hbase.hadoop1.version} + @@ -141,6 +146,11 @@ hadoop-mapreduce-client-core ${hadoop-23.version} + + org.apache.hbase + hbase-client + ${hbase.hadoop2.version} + @@ -195,6 +205,61 @@ + + org.apache.maven.plugins + maven-shade-plugin + + + include-hbase-zklib + + package + + shade + + + true + + + org.apache.hbase + org.cloudera.htrace + + + + + org.apache.hadoop.hbase + webhcat.org.apache.hadoop.hbase + + + org.cloudera.htrace + webhcat.org.cloudera.htrace + + + + + org.apache.hbase + + META-INF/** + *.xml + + + + org.cloudera.htrace + + META-INF/** + + + + + + + diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java index 36b64da..7f8d313 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java @@ -36,18 +36,18 @@ private static final Log LOG = LogFactory.getLog(JobState.class); - private String id; + private final String jobID; // Storage is instantiated in the constructor - private TempletonStorage storage = null; + private final TempletonStorage storage ; - private static TempletonStorage.Type type = TempletonStorage.Type.JOB; + private static final TempletonStorage.Type type = TempletonStorage.Type.JOB; - private Configuration config = null; + private final Configuration config; - public JobState(String id, Configuration conf) + public JobState(String jobID, Configuration conf) throws IOException { - this.id = id; + this.jobID = jobID; config = conf; storage = getStorage(conf); } @@ -55,10 +55,10 @@ public JobState(String id, Configuration conf) public void delete() throws IOException { try { - storage.delete(type, id); + storage.delete(type, jobID); } catch (Exception e) { // Error getting children of node -- probably node has been deleted - LOG.info("Couldn't delete " + id); + LOG.info("Couldn't delete " + jobID); } } @@ -87,7 +87,7 @@ public static TempletonStorage getStorageInstance(Configuration conf) { * Get an open instance of the selected storage class. Defaults * to HDFS storage if none is specified. */ - public static TempletonStorage getStorage(Configuration conf) throws IOException { + private static TempletonStorage getStorage(Configuration conf) throws IOException { TempletonStorage storage = getStorageInstance(conf); storage.openStorage(conf); return storage; @@ -109,7 +109,7 @@ public void close() throws IOException { * This job id. */ public String getId() { - return id; + return jobID; } /** @@ -276,7 +276,7 @@ public void setNotifiedTime(long notified) */ public Long getLongField(String name) throws IOException { - String s = storage.getField(type, id, name); + String s = storage.getField(type, jobID, name); if (s == null) return null; else { @@ -295,7 +295,7 @@ public Long getLongField(String name) public void setField(String name, String val) throws IOException { try { - storage.saveField(type, id, name, val); + storage.saveField(type, jobID, name, val); } catch (NotFoundException ne) { throw new IOException(ne.getMessage()); } @@ -303,7 +303,7 @@ public void setField(String name, String val) public String getField(String name) throws IOException { - return storage.getField(type, id, name); + return storage.getField(type, jobID, name); } /** @@ -316,9 +316,9 @@ public String getField(String name) public void setLongField(String name, long val) throws IOException { try { - storage.saveField(type, id, name, String.valueOf(val)); + storage.saveField(type, jobID, name, String.valueOf(val)); } catch (NotFoundException ne) { - throw new IOException("Job " + id + " was not found: " + + throw new IOException("Job " + jobID + " was not found: " + ne.getMessage()); } } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java index da3ad20..88f0564 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; @@ -33,10 +34,10 @@ public class JobStateTracker { // The path to the tracking root - private String job_trackingroot = null; + private final String job_trackingroot; // The zookeeper connection to use - private ZooKeeper zk; + private final RecoverableZooKeeper zk; // The id of the tracking node -- must be a SEQUENTIAL node private String trackingnode; @@ -51,7 +52,7 @@ * Constructor for a new node -- takes the jobid of an existing job * */ - public JobStateTracker(String node, ZooKeeper zk, boolean nodeIsTracker, + public JobStateTracker(String node, RecoverableZooKeeper zk, boolean nodeIsTracker, String job_trackingpath) { this.zk = zk; if (nodeIsTracker) { @@ -129,7 +130,7 @@ public String makeTrackingJobZnode(String nodename) { * Get the list of tracking jobs. These can be used to determine which jobs have * expired. */ - public static List getTrackingJobs(Configuration conf, ZooKeeper zk) + public static List getTrackingJobs(Configuration conf, RecoverableZooKeeper zk) throws IOException { ArrayList jobs = new ArrayList(); try { diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java index 4f5e2d9..c62f93a 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java @@ -25,6 +25,7 @@ import java.util.Date; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; import org.apache.zookeeper.ZooKeeper; import org.apache.commons.logging.Log; @@ -89,7 +90,7 @@ public static void startInstance(Configuration appConf) throws IOException { * @throws IOException */ public void run() { - ZooKeeper zk = null; + RecoverableZooKeeper zk = null; List nodes = null; isRunning = true; while (!stop) { @@ -140,7 +141,7 @@ public void run() { * * @throws IOException */ - public List getChildList(ZooKeeper zk) { + public List getChildList(RecoverableZooKeeper zk) { try { List jobs = JobStateTracker.getTrackingJobs(appConf, zk); Collections.sort(jobs); @@ -154,7 +155,7 @@ public void run() { /** * Check to see if a job is more than maxage old, and delete it if so. */ - public boolean checkAndDelete(String node, ZooKeeper zk) { + public boolean checkAndDelete(String node, RecoverableZooKeeper zk) { JobState state = null; try { JobStateTracker tracker = new JobStateTracker(node, zk, true, diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java index ecde598..0f962f3 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java @@ -24,16 +24,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; /** * A storage implementation based on storing everything in ZooKeeper. @@ -56,33 +58,47 @@ public static final String ZK_SESSION_TIMEOUT = "templeton.zookeeper.session-timeout"; + //next 3 props are not meant to be documented and may be removed when + //RecoverableZooKeeper is replaced with Curator + private static final String ZK_RETRY_COUNT = "templeton.zookeeper.retrycount.int"; + private static final String ZK_RETRY_DELAY = "templeton.zookeeper.retrydelay.ms"; + private static final String ZK_CONNECTION_WAIT = "templeton.zookeeper.connwait.ms"; + public static final String ENCODING = "UTF-8"; private static final Log LOG = LogFactory.getLog(ZooKeeperStorage.class); - private ZooKeeper zk; + private RecoverableZooKeeper zk; /** * Open a ZooKeeper connection for the JobState. */ - public static ZooKeeper zkOpen(String zkHosts, int zkSessionTimeout) + public static RecoverableZooKeeper zkOpen(Configuration conf) throws IOException { - return new ZooKeeper(zkHosts, - zkSessionTimeout, + String zkHosts = conf.get(ZK_HOSTS); + final CountDownLatch cdl = new CountDownLatch(1); + //according to JavaDoc, this may return before establishing connection + RecoverableZooKeeper zk = new RecoverableZooKeeper(zkHosts, + conf.getInt(ZK_SESSION_TIMEOUT, 30000), new Watcher() { @Override synchronized public void process(WatchedEvent event) { + if(event.getState() == Event.KeeperState.SyncConnected) { + cdl.countDown(); + } } - }); - } - - /** - * Open a ZooKeeper connection for the JobState. - */ - public static ZooKeeper zkOpen(Configuration conf) - throws IOException { - return zkOpen(conf.get(ZK_HOSTS), - conf.getInt(ZK_SESSION_TIMEOUT, 30000)); + }, + conf.getInt(ZK_RETRY_COUNT, 3), + conf.getInt(ZK_RETRY_DELAY, 1000)); + try { + if (cdl.await(conf.getInt(ZK_CONNECTION_WAIT, 1000), TimeUnit.MILLISECONDS)) { + return zk; + } + } + catch(InterruptedException ex) { + throw new IOException("Interrupted while waiting to connect to to ZooKeeper:" + zkHosts); + } + throw new IOException("Timed out waiting to establish ZooKeeper connection: " + zkHosts); } public ZooKeeperStorage() {