diff --git a/pom.xml b/pom.xml index 2156fec..557e823 100644 --- a/pom.xml +++ b/pom.xml @@ -488,6 +488,11 @@ ${curator.version} + org.apache.curator + curator-recipes + ${curator.version} + + org.codehaus.groovy groovy-all ${groovy.version} diff --git a/service/pom.xml b/service/pom.xml index b9d3a40..94ffad5 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -91,6 +91,11 @@ curator-framework ${curator.version} + + org.apache.curator + curator-recipes + ${curator.version} + org.apache.hive diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 17e1d85..df43793 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -35,6 +36,10 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; @@ -64,9 +69,11 @@ */ public class HiveServer2 extends CompositeService { private static final Log LOG = LogFactory.getLog(HiveServer2.class); + private static volatile boolean zooKeeperLastDeleteSuccessful = false; private CLIService cliService; private ThriftCLIService thriftCLIService; + private PersistentEphemeralNode znode; private String znodePath; private CuratorFramework zooKeeperClient; private boolean registeredWithZooKeeper = false; @@ -173,18 +180,33 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";" + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence="; - znodePath = - zooKeeperClient.create().creatingParentsIfNeeded() - .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPrefix, znodeDataUTF8); + znode = + new PersistentEphemeralNode(zooKeeperClient, + PersistentEphemeralNode.Mode.PROTECTED_EPHEMERAL_SEQUENTIAL, pathPrefix, + znodeDataUTF8); + znode.start(); + // We'll make 6 attempts, with each attempt waiting for 20 seconds for node creation + long znodeCreationTimeout = 20; + int maxNodeCreationAttempts = 6; + int attempts = 0; + while (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) { + if (++attempts >= maxNodeCreationAttempts) { + throw new Exception("Max znode creation attempts " + maxNodeCreationAttempts + " exhausted"); + } + } setRegisteredWithZooKeeper(true); + znodePath = znode.getActualPath(); // Set a watch on the znode if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) { // No node exists, throw exception throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper."); } LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI); - } catch (KeeperException e) { + } catch (Exception e) { LOG.fatal("Unable to create a znode for this server instance", e); + if (znode != null) { + znode.close(); + } throw (e); } } @@ -220,22 +242,33 @@ private void setUpZooKeeperAuth(HiveConf hiveConf) throws Exception { @Override public void process(WatchedEvent event) { if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) { - HiveServer2.this.setRegisteredWithZooKeeper(false); - // If there are no more active client sessions, stop the server - if (cliService.getSessionManager().getOpenSessionCount() == 0) { - LOG.warn("This instance of HiveServer2 has been removed from the list of server " - + "instances available for dynamic service discovery. " - + "The last client session has ended - will shutdown now."); - HiveServer2.this.stop(); + if (znode != null) { + try { + znode.close(); + LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. " + + "The server will be shut down after the last client sesssion completes."); + } catch (IOException e) { + LOG.error("Failed to close the persistent ephemeral znode", e); + } finally { + HiveServer2.this.setRegisteredWithZooKeeper(false); + // If there are no more active client sessions, stop the server + if (cliService.getSessionManager().getOpenSessionCount() == 0) { + LOG.warn("This instance of HiveServer2 has been removed from the list of server " + + "instances available for dynamic service discovery. " + + "The last client session has ended - will shutdown now."); + HiveServer2.this.stop(); + } + } } - LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. " - + "The server will be shut down after the last client sesssion completes."); } } } private void removeServerInstanceFromZooKeeper() throws Exception { setRegisteredWithZooKeeper(false); + if (znode != null) { + znode.close(); + } zooKeeperClient.close(); LOG.info("Server instance removed from ZooKeeper."); } @@ -350,18 +383,60 @@ static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exce List znodePaths = zooKeeperClient.getChildren().forPath( ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace); + List znodePathsUpdated; + int maxDeleteAttempts = 5; + int deleteAttempts; // Now for each path that is for the given versionNumber, delete the znode from ZooKeeper - for (String znodePath : znodePaths) { + for (int i = 0; i < znodePaths.size(); i++) { + String znodePath = znodePaths.get(i); if (znodePath.contains("version=" + versionNumber + ";")) { - LOG.info("Removing the znode: " + znodePath + " from ZooKeeper"); - zooKeeperClient.delete().forPath( + String fullZnodePath = ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace - + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath); + + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath; + LOG.warn("Will attempt to remove the znode: " + fullZnodePath + " from ZooKeeper"); + // Reset the last deleted flag + zooKeeperLastDeleteSuccessful = false; + zooKeeperClient.delete().guaranteed().inBackground(new DeleteCallBack()) + .forPath(fullZnodePath); + // Wait for the delete to complete, sleep for 15s before checking the status + // Reset delete attempts for this node + deleteAttempts = 0; + while (!zooKeeperLastDeleteSuccessful) { + // Sleep for 15 seconds + try { + Thread.sleep(15L * 1000L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + // Check if we've maxed out delete attempts + if (++deleteAttempts > maxDeleteAttempts) { + LOG.error("Unable to remove the znode: " + fullZnodePath + " from ZooKeeper"); + } + } + // Get the updated path list + znodePathsUpdated = + zooKeeperClient.getChildren().forPath( + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace); + // Gives a list of any new paths that may have been created to maintain the persistent ephemeral node + // (Curator's PersistentEphemeralNode renames the node while trying to keep it in ZK) + znodePathsUpdated.removeAll(znodePaths); + // Add the new paths to the znodes list. We'll try for their removal as well. + znodePaths.addAll(znodePathsUpdated); } } zooKeeperClient.close(); } + private static class DeleteCallBack implements BackgroundCallback { + @Override + public void processResult(CuratorFramework zooKeeperClient, CuratorEvent event) + throws Exception { + if (event.getType() == CuratorEventType.DELETE) { + HiveServer2.zooKeeperLastDeleteSuccessful = true; + } + } + } + public static void main(String[] args) { HiveConf.setLoadHiveServer2Config(true); try {