diff --git hcatalog/webhcat/svr/pom.xml hcatalog/webhcat/svr/pom.xml index 3c66bbe..6065748 100644 --- hcatalog/webhcat/svr/pom.xml +++ hcatalog/webhcat/svr/pom.xml @@ -38,7 +38,7 @@ - + org.apache.hive.hcatalog @@ -72,6 +72,14 @@ commons-exec ${commons-exec.version} + + + + org.apache.curator + curator-framework + ${curator.version} + + org.apache.zookeeper zookeeper @@ -195,6 +203,37 @@ + + org.apache.maven.plugins + maven-shade-plugin + + + include-curator + + package + + shade + + + true + + + org.apache.curator + + + + + org.apache.curator + webhcat.org.apache.curator + + + + + + diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java index fb46b58..9c73a73 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java @@ -25,9 +25,7 @@ import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -116,32 +114,6 @@ public String getField(Type type, String id, String key) { } @Override - public Map getFields(Type type, String id) { - HashMap map = new HashMap(); - BufferedReader in = null; - Path p = new Path(getPath(type) + "/" + id); - try { - for (FileStatus status : fs.listStatus(p)) { - in = new BufferedReader(new InputStreamReader(fs.open(status.getPath()))); - String line = null; - String val = ""; - while ((line = in.readLine()) != null) { - if (!val.equals("")) { - val += "\n"; - } - val += line; - } - map.put(status.getPath().getName(), val); - } - } catch (IOException e) { - LOG.trace("Couldn't find " + p); - } finally { - close(in); - } - return map; - } - - @Override public boolean delete(Type type, String id) throws NotFoundException { Path p = new Path(getPath(type) + "/" + id); try { @@ -153,14 +125,6 @@ public boolean delete(Type type, String id) throws NotFoundException { return false; } - @Override - public List getAll() { - ArrayList allNodes = new ArrayList(); - for (Type type : Type.values()) { - allNodes.addAll(getAllForType(type)); - } - return allNodes; - } @Override public List getAllForType(Type type) { @@ -177,40 +141,6 @@ public boolean delete(Type type, String id) throws NotFoundException { } @Override - public List getAllForKey(String key, String value) { - ArrayList allNodes = new ArrayList(); - try { - for (Type type : Type.values()) { - allNodes.addAll(getAllForTypeAndKey(type, key, value)); - } - } catch (Exception e) { - LOG.trace("Couldn't find children for key " + key + ": " + - e.getMessage()); - } - return allNodes; - } - - @Override - public List getAllForTypeAndKey(Type type, String key, String value) { - ArrayList allNodes = new ArrayList(); - HashMap map = new HashMap(); - try { - for (FileStatus status : - fs.listStatus(new Path(getPath(type)))) { - map = (HashMap) - getFields(type, status.getPath().getName()); - if (map.get(key).equals(value)) { - allNodes.add(status.getPath().getName()); - } - } - } catch (Exception e) { - LOG.trace("Couldn't find children for key " + key + ": " + - e.getMessage()); - } - return allNodes; - } - - @Override public void openStorage(Configuration config) throws IOException { storage_root = config.get(TempletonStorage.STORAGE_ROOT); if (fs == null) { diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java index da3ad20..41fd82f 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java @@ -24,19 +24,30 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; import org.apache.hadoop.conf.Configuration; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; +/* + * The general idea here is to create + * /created/1 + * /created/2 + * /created/3 .... + * for each job submitted. The node number is generated by ZK (PERSISTENT_SEQUENTIAL) and the + * payload is the JobId. Basically this keeps track of the order in which jobs were submitted, + * and ZooKeeperCleanup uses this to purge old job info. + * Since the /jobs/ node has a create/update timestamp + * (http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#sc_zkStatStructure) this whole + * thing can be removed. +*/ public class JobStateTracker { // The path to the tracking root private String job_trackingroot = null; // The zookeeper connection to use - private ZooKeeper zk; + private CuratorFramework zk; // The id of the tracking node -- must be a SEQUENTIAL node private String trackingnode; @@ -51,7 +62,7 @@ * Constructor for a new node -- takes the jobid of an existing job * */ - public JobStateTracker(String node, ZooKeeper zk, boolean nodeIsTracker, + public JobStateTracker(String node, CuratorFramework zk, boolean nodeIsTracker, String job_trackingpath) { this.zk = zk; if (nodeIsTracker) { @@ -65,30 +76,25 @@ public JobStateTracker(String node, ZooKeeper zk, boolean nodeIsTracker, /** * Create the parent znode for this job state. */ - public void create() - throws IOException { - String[] paths = ZooKeeperStorage.getPaths(job_trackingroot); - for (String znode : paths) { - try { - zk.create(znode, new byte[0], - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException e) { - } catch (Exception e) { - throw new IOException("Unable to create parent nodes"); - } + public void create() throws IOException { + try { + zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .withACL(Ids.OPEN_ACL_UNSAFE).forPath(job_trackingroot); + } catch (KeeperException.NodeExistsException e) { + //root must exist already + } catch (Exception e) { + throw new IOException("Unable to create parent nodes"); } try { - trackingnode = zk.create(makeTrackingZnode(), jobid.getBytes(), - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); + trackingnode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL) + .withACL(Ids.OPEN_ACL_UNSAFE).forPath(makeTrackingZnode(), jobid.getBytes()); } catch (Exception e) { throw new IOException("Unable to create " + makeTrackingZnode()); } } - - public void delete() - throws IOException { + public void delete() throws IOException { try { - zk.delete(makeTrackingJobZnode(trackingnode), -1); + zk.delete().forPath(makeTrackingJobZnode(trackingnode)); } catch (Exception e) { // Might have been deleted already LOG.info("Couldn't delete " + makeTrackingJobZnode(trackingnode)); @@ -101,13 +107,10 @@ public void delete() */ public String getJobID() throws IOException { try { - return new String(zk.getData(makeTrackingJobZnode(trackingnode), - false, new Stat())); - } catch (KeeperException e) { + return new String(zk.getData().forPath(makeTrackingJobZnode(trackingnode))); + } catch (Exception e) { // It was deleted during the transaction - throw new IOException("Node already deleted " + trackingnode); - } catch (InterruptedException e) { - throw new IOException("Couldn't read node " + trackingnode); + throw new IOException("Node already deleted " + trackingnode, e); } } @@ -129,13 +132,13 @@ public String makeTrackingJobZnode(String nodename) { * Get the list of tracking jobs. These can be used to determine which jobs have * expired. */ - public static List getTrackingJobs(Configuration conf, ZooKeeper zk) + public static List getTrackingJobs(Configuration conf, CuratorFramework zk) throws IOException { ArrayList jobs = new ArrayList(); try { - for (String myid : zk.getChildren( + for (String myid : zk.getChildren().forPath( conf.get(TempletonStorage.STORAGE_ROOT) - + ZooKeeperStorage.TRACKINGDIR, false)) { + + ZooKeeperStorage.TRACKINGDIR)) { jobs.add(myid); } } catch (Exception e) { diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java index 97d572c..cf27ffa 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.List; -import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -45,7 +44,7 @@ public interface TempletonStorage { // These are the possible types referenced by 'type' below. public enum Type { - UNKNOWN, JOB, JOBTRACKING, TEMPLETONOVERHEAD + UNKNOWN, JOB, JOBTRACKING } public static final String STORAGE_CLASS = "templeton.storage.class"; @@ -79,20 +78,6 @@ public void saveField(Type type, String id, String key, String val) public String getField(Type type, String id, String key); /** - * Get all the name/value pairs stored for this id. - * Be careful using getFields() -- optimistic locking will mean that - * your odds of a conflict are decreased if you read/write one field - * at a time. getFields() is intended for read-only usage. - * - * If the type is UNKNOWN, search for the id in all types. - * - * @param type The data type (as listed above) - * @param id The String id of this data grouping (jobid, etc.) - * @return A Map of key/value pairs found for this type/id. - */ - public Map getFields(Type type, String id); - - /** * Delete a data grouping (all data for a jobid, all tracking data * for a job, etc.). If the type is UNKNOWN, search for the id * in all types. @@ -105,13 +90,6 @@ public void saveField(Type type, String id, String key, String val) public boolean delete(Type type, String id) throws NotFoundException; /** - * Get the id of each data grouping in the storage system. - * - * @return An ArrayList of ids. - */ - public List getAll(); - - /** * Get the id of each data grouping of a given type in the storage * system. * @param type The data type (as listed above) @@ -120,26 +98,6 @@ public void saveField(Type type, String id, String key, String val) public List getAllForType(Type type); /** - * Get the id of each data grouping that has the specific key/value - * pair. - * @param key The name of the field to search for - * @param value The value of the field to search for - * @return An ArrayList of ids. - */ - public List getAllForKey(String key, String value); - - /** - * Get the id of each data grouping of a given type that has the - * specific key/value pair. - * @param type The data type (as listed above) - * @param key The name of the field to search for - * @param value The value of the field to search for - * @return An ArrayList of ids. - */ - public List getAllForTypeAndKey(Type type, String key, - String value); - - /** * For storage methods that require a connection, this is a hint * that it's time to open a connection. */ diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java index 4f5e2d9..24336e2 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java @@ -24,8 +24,8 @@ import java.util.List; import java.util.Date; +import org.apache.curator.framework.CuratorFramework; import org.apache.hadoop.conf.Configuration; -import org.apache.zookeeper.ZooKeeper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -89,7 +89,7 @@ public static void startInstance(Configuration appConf) throws IOException { * @throws IOException */ public void run() { - ZooKeeper zk = null; + CuratorFramework zk = null; List nodes = null; isRunning = true; while (!stop) { @@ -112,13 +112,7 @@ public void run() { } catch (Exception e) { LOG.error("Cleanup cycle failed: " + e.getMessage()); } finally { - if (zk != null) { - try { - zk.close(); - } catch (InterruptedException e) { - // We're trying to exit anyway, just ignore. - } - } + if (zk != null) zk.close(); } long sleepMillis = (long) (Math.random() * interval); @@ -140,7 +134,7 @@ public void run() { * * @throws IOException */ - public List getChildList(ZooKeeper zk) { + public List getChildList(CuratorFramework zk) { try { List jobs = JobStateTracker.getTrackingJobs(appConf, zk); Collections.sort(jobs); @@ -154,7 +148,7 @@ public void run() { /** * Check to see if a job is more than maxage old, and delete it if so. */ - public boolean checkAndDelete(String node, ZooKeeper zk) { + public boolean checkAndDelete(String node, CuratorFramework zk) { JobState state = null; try { JobStateTracker tracker = new JobStateTracker(node, zk, true, @@ -167,8 +161,11 @@ public boolean checkAndDelete(String node, ZooKeeper zk) { // an error in creation, and we want to delete it anyway. long then = 0; if (state.getCreated() != null) { + //this is set in ZooKeeperStorage.create() then = state.getCreated(); } + //todo: this should check that the job actually completed and likely use completion time + //which is not tracked directly but available on /jobs/ node via "mtime" in Stat if (now - then > maxage) { LOG.info("Deleting " + tracker.getJobID()); state.delete(); @@ -177,7 +174,7 @@ public boolean checkAndDelete(String node, ZooKeeper zk) { } return false; } catch (Exception e) { - LOG.info("checkAndDelete failed for " + node); + LOG.info("checkAndDelete failed for " + node + " due to: " + e.getMessage()); // We don't throw a new exception for this -- just keep going with the // next one. return true; diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java index ecde598..b9fc4b1 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java @@ -19,21 +19,18 @@ package org.apache.hive.hcatalog.templeton.tool; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hadoop.conf.Configuration; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; /** * A storage implementation based on storing everything in ZooKeeper. @@ -60,29 +57,29 @@ private static final Log LOG = LogFactory.getLog(ZooKeeperStorage.class); - private ZooKeeper zk; + private CuratorFramework zk; /** * Open a ZooKeeper connection for the JobState. */ - public static ZooKeeper zkOpen(String zkHosts, int zkSessionTimeout) + public static CuratorFramework zkOpen(String zkHosts, int zkSessionTimeoutMs) throws IOException { - return new ZooKeeper(zkHosts, - zkSessionTimeout, - new Watcher() { - @Override - synchronized public void process(WatchedEvent event) { - } - }); + //do we need to add a connection status listener? What will that do? + ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); + CuratorFramework zk = CuratorFrameworkFactory.newClient(zkHosts, zkSessionTimeoutMs, + CuratorFrameworkFactory.builder().getConnectionTimeoutMs(), retryPolicy); + zk.start(); + return zk; } /** * Open a ZooKeeper connection for the JobState. */ - public static ZooKeeper zkOpen(Configuration conf) - throws IOException { + public static CuratorFramework zkOpen(Configuration conf) throws IOException { + /*the silly looking call to Builder below is to get the default value of session timeout + from Curator which itself exposes it as system property*/ return zkOpen(conf.get(ZK_HOSTS), - conf.getInt(ZK_SESSION_TIMEOUT, 30000)); + conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs())); } public ZooKeeperStorage() { @@ -93,15 +90,9 @@ public ZooKeeperStorage() { /** * Close this ZK connection. */ - public void close() - throws IOException { + public void close() throws IOException { if (zk != null) { - try { - zk.close(); - zk = null; - } catch (InterruptedException e) { - throw new IOException("Closing ZooKeeper connection", e); - } + zk.close(); } } @@ -118,48 +109,54 @@ public void startCleanup(Configuration config) { */ public void create(Type type, String id) throws IOException { + boolean wasCreated = false; try { - String[] paths = getPaths(makeZnode(type, id)); - boolean wasCreated = false; - for (String znode : paths) { - try { - zk.create(znode, new byte[0], - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - wasCreated = true; - } catch (KeeperException.NodeExistsException e) { + zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE).forPath(makeZnode(type, id)); + wasCreated = true; + } + catch(KeeperException.NodeExistsException ex) { + //we just created top level node for this jobId + } + catch(Exception ex) { + throw new IOException("Error creating " + makeZnode(type, id), ex); + } + if(wasCreated) { + try { + // Really not sure if this should go here. Will have + // to see how the storage mechanism evolves. + if (type.equals(Type.JOB)) { + JobStateTracker jt = new JobStateTracker(id, zk, false, job_trackingpath); + jt.create(); } - } - if (wasCreated) { + } catch (Exception e) { + LOG.error("Error tracking (jobId=" + id + "): " + e.getMessage()); + // If we couldn't create the tracker node, don't create the main node. try { - // Really not sure if this should go here. Will have - // to see how the storage mechanism evolves. - if (type.equals(Type.JOB)) { - JobStateTracker jt = new JobStateTracker(id, zk, false, - job_trackingpath); - jt.create(); - } - } catch (Exception e) { - LOG.warn("Error tracking: " + e.getMessage()); - // If we couldn't create the tracker node, don't - // create the main node. - zk.delete(makeZnode(type, id), -1); + zk.delete().forPath(makeZnode(type, id));//default version is -1 + } + catch(Exception ex) { + //EK: it's not obvious that this is the right logic, if we don't record the 'callback' + //for example and never notify the client of job completion + throw new IOException("Failed to delete " + makeZnode(type, id) + ":" + ex); } } - if (zk.exists(makeZnode(type, id), false) == null) + } + try { + if (zk.checkExists().forPath(makeZnode(type, id)) == null) { throw new IOException("Unable to create " + makeZnode(type, id)); - if (wasCreated) { - try { - saveField(type, id, "created", - Long.toString(System.currentTimeMillis())); - } catch (NotFoundException nfe) { - // Wow, something's really wrong. - throw new IOException("Couldn't write to node " + id, nfe); - } } - } catch (KeeperException e) { - throw new IOException("Creating " + id, e); - } catch (InterruptedException e) { - throw new IOException("Creating " + id, e); + } + catch (Exception ex) { + throw new IOException(ex); + } + if (wasCreated) { + try { + saveField(type, id, "created", + Long.toString(System.currentTimeMillis())); + } catch (NotFoundException nfe) { + // Wow, something's really wrong. + throw new IOException("Couldn't write to node " + id, nfe); + } } } @@ -198,25 +195,14 @@ public String getPath(Type type) { /** * A helper method that sets a field value. - * @param type - * @param id - * @param name - * @param val - * @throws KeeperException - * @throws UnsupportedEncodingException - * @throws InterruptedException + * @throws java.lang.Exception */ - private void setFieldData(Type type, String id, String name, String val) - throws KeeperException, UnsupportedEncodingException, InterruptedException { + private void setFieldData(Type type, String id, String name, String val) throws Exception { try { - zk.create(makeFieldZnode(type, id, name), - val.getBytes(ENCODING), - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + zk.create().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE) + .forPath(makeFieldZnode(type, id, name), val.getBytes(ENCODING)); } catch (KeeperException.NodeExistsException e) { - zk.setData(makeFieldZnode(type, id, name), - val.getBytes(ENCODING), - -1); + zk.setData().forPath(makeFieldZnode(type, id, name), val.getBytes(ENCODING)); } } @@ -251,7 +237,7 @@ public void saveField(Type type, String id, String key, String val) @Override public String getField(Type type, String id, String key) { try { - byte[] b = zk.getData(makeFieldZnode(type, id, key), false, null); + byte[] b = zk.getData().forPath(makeFieldZnode(type, id, key)); return new String(b, ENCODING); } catch (Exception e) { return null; @@ -259,26 +245,12 @@ public String getField(Type type, String id, String key) { } @Override - public Map getFields(Type type, String id) { - HashMap map = new HashMap(); - try { - for (String node : zk.getChildren(makeZnode(type, id), false)) { - byte[] b = zk.getData(makeFieldZnode(type, id, node), - false, null); - map.put(node, new String(b, ENCODING)); - } - } catch (Exception e) { - return map; - } - return map; - } - - @Override public boolean delete(Type type, String id) throws NotFoundException { try { - for (String child : zk.getChildren(makeZnode(type, id), false)) { + + for (String child : zk.getChildren().forPath(makeZnode(type, id))) { try { - zk.delete(makeFieldZnode(type, id, child), -1); + zk.delete().forPath(makeFieldZnode(type, id, child)); } catch (Exception e) { // Other nodes may be trying to delete this at the same time, // so just log errors and skip them. @@ -287,7 +259,7 @@ public boolean delete(Type type, String id) throws NotFoundException { } } try { - zk.delete(makeZnode(type, id), -1); + zk.delete().forPath(makeZnode(type, id)); } catch (Exception e) { // Same thing -- might be deleted by other nodes, so just go on. throw new NotFoundException("Couldn't delete " + @@ -302,58 +274,15 @@ public boolean delete(Type type, String id) throws NotFoundException { } @Override - public List getAll() { - ArrayList allNodes = new ArrayList(); - for (Type type : Type.values()) { - allNodes.addAll(getAllForType(type)); - } - return allNodes; - } - - @Override public List getAllForType(Type type) { try { - return zk.getChildren(getPath(type), false); + return zk.getChildren().forPath(getPath(type)); } catch (Exception e) { return new ArrayList(); } } @Override - public List getAllForKey(String key, String value) { - ArrayList allNodes = new ArrayList(); - try { - for (Type type : Type.values()) { - allNodes.addAll(getAllForTypeAndKey(type, key, value)); - } - } catch (Exception e) { - LOG.info("Couldn't find children."); - } - return allNodes; - } - - @Override - public List getAllForTypeAndKey(Type type, String key, String value) { - ArrayList allNodes = new ArrayList(); - try { - for (String id : zk.getChildren(getPath(type), false)) { - for (String field : zk.getChildren(id, false)) { - if (field.endsWith("/" + key)) { - byte[] b = zk.getData(field, false, null); - if (new String(b, ENCODING).equals(value)) { - allNodes.add(id); - } - } - } - } - } catch (Exception e) { - // Log and go to the next type -- this one might not exist - LOG.info("Couldn't find children of " + getPath(type)); - } - return allNodes; - } - - @Override public void openStorage(Configuration config) throws IOException { storage_root = config.get(STORAGE_ROOT); job_path = storage_root + "/jobs"; diff --git pom.xml pom.xml index 970da97..c694980 100644 --- pom.xml +++ pom.xml @@ -161,6 +161,7 @@ 3.4.5 1.1 2.4.0 + 2.5.0 @@ -472,7 +473,13 @@ - + + org.apache.curator + curator-framework + ${curator.version} + + + org.codehaus.groovy groovy-all ${groovy.version}