diff --git a/pom.xml b/pom.xml
index 2156fec..557e823 100644
--- a/pom.xml
+++ b/pom.xml
@@ -488,6 +488,11 @@
${curator.version}
+ org.apache.curator
+ curator-recipes
+ ${curator.version}
+
+
org.codehaus.groovy
groovy-all
${groovy.version}
diff --git a/service/pom.xml b/service/pom.xml
index b9d3a40..94ffad5 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -91,6 +91,11 @@
curator-framework
${curator.version}
+
+ org.apache.curator
+ curator-recipes
+ ${curator.version}
+
org.apache.hive
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 17e1d85..df43793 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@@ -35,6 +36,10 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
@@ -64,9 +69,11 @@
*/
public class HiveServer2 extends CompositeService {
private static final Log LOG = LogFactory.getLog(HiveServer2.class);
+ private static volatile boolean zooKeeperLastDeleteSuccessful = false;
private CLIService cliService;
private ThriftCLIService thriftCLIService;
+ private PersistentEphemeralNode znode;
private String znodePath;
private CuratorFramework zooKeeperClient;
private boolean registeredWithZooKeeper = false;
@@ -173,18 +180,33 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
+ "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
- znodePath =
- zooKeeperClient.create().creatingParentsIfNeeded()
- .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPrefix, znodeDataUTF8);
+ znode =
+ new PersistentEphemeralNode(zooKeeperClient,
+ PersistentEphemeralNode.Mode.PROTECTED_EPHEMERAL_SEQUENTIAL, pathPrefix,
+ znodeDataUTF8);
+ znode.start();
+ // We'll make 6 attempts, with each attempt waiting for 20 seconds for node creation
+ long znodeCreationTimeout = 20;
+ int maxNodeCreationAttempts = 6;
+ int attempts = 0;
+ while (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
+ if (++attempts >= maxNodeCreationAttempts) {
+ throw new Exception("Max znode creation attempts " + maxNodeCreationAttempts + " exhausted");
+ }
+ }
setRegisteredWithZooKeeper(true);
+ znodePath = znode.getActualPath();
// Set a watch on the znode
if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
// No node exists, throw exception
throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
}
LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
- } catch (KeeperException e) {
+ } catch (Exception e) {
LOG.fatal("Unable to create a znode for this server instance", e);
+ if (znode != null) {
+ znode.close();
+ }
throw (e);
}
}
@@ -220,22 +242,33 @@ private void setUpZooKeeperAuth(HiveConf hiveConf) throws Exception {
@Override
public void process(WatchedEvent event) {
if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
- HiveServer2.this.setRegisteredWithZooKeeper(false);
- // If there are no more active client sessions, stop the server
- if (cliService.getSessionManager().getOpenSessionCount() == 0) {
- LOG.warn("This instance of HiveServer2 has been removed from the list of server "
- + "instances available for dynamic service discovery. "
- + "The last client session has ended - will shutdown now.");
- HiveServer2.this.stop();
+ if (znode != null) {
+ try {
+ znode.close();
+ LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
+ + "The server will be shut down after the last client sesssion completes.");
+ } catch (IOException e) {
+ LOG.error("Failed to close the persistent ephemeral znode", e);
+ } finally {
+ HiveServer2.this.setRegisteredWithZooKeeper(false);
+ // If there are no more active client sessions, stop the server
+ if (cliService.getSessionManager().getOpenSessionCount() == 0) {
+ LOG.warn("This instance of HiveServer2 has been removed from the list of server "
+ + "instances available for dynamic service discovery. "
+ + "The last client session has ended - will shutdown now.");
+ HiveServer2.this.stop();
+ }
+ }
}
- LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
- + "The server will be shut down after the last client sesssion completes.");
}
}
}
private void removeServerInstanceFromZooKeeper() throws Exception {
setRegisteredWithZooKeeper(false);
+ if (znode != null) {
+ znode.close();
+ }
zooKeeperClient.close();
LOG.info("Server instance removed from ZooKeeper.");
}
@@ -350,18 +383,60 @@ static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exce
List znodePaths =
zooKeeperClient.getChildren().forPath(
ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
+ List znodePathsUpdated;
+ int maxDeleteAttempts = 5;
+ int deleteAttempts;
// Now for each path that is for the given versionNumber, delete the znode from ZooKeeper
- for (String znodePath : znodePaths) {
+ for (int i = 0; i < znodePaths.size(); i++) {
+ String znodePath = znodePaths.get(i);
if (znodePath.contains("version=" + versionNumber + ";")) {
- LOG.info("Removing the znode: " + znodePath + " from ZooKeeper");
- zooKeeperClient.delete().forPath(
+ String fullZnodePath =
ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
- + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath);
+ + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath;
+ LOG.warn("Will attempt to remove the znode: " + fullZnodePath + " from ZooKeeper");
+ // Reset the last deleted flag
+ zooKeeperLastDeleteSuccessful = false;
+ zooKeeperClient.delete().guaranteed().inBackground(new DeleteCallBack())
+ .forPath(fullZnodePath);
+ // Wait for the delete to complete, sleep for 15s before checking the status
+ // Reset delete attempts for this node
+ deleteAttempts = 0;
+ while (!zooKeeperLastDeleteSuccessful) {
+ // Sleep for 15 seconds
+ try {
+ Thread.sleep(15L * 1000L);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ // Check if we've maxed out delete attempts
+ if (++deleteAttempts > maxDeleteAttempts) {
+ LOG.error("Unable to remove the znode: " + fullZnodePath + " from ZooKeeper");
+ }
+ }
+ // Get the updated path list
+ znodePathsUpdated =
+ zooKeeperClient.getChildren().forPath(
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
+ // Gives a list of any new paths that may have been created to maintain the persistent ephemeral node
+ // (Curator's PersistentEphemeralNode renames the node while trying to keep it in ZK)
+ znodePathsUpdated.removeAll(znodePaths);
+ // Add the new paths to the znodes list. We'll try for their removal as well.
+ znodePaths.addAll(znodePathsUpdated);
}
}
zooKeeperClient.close();
}
+ private static class DeleteCallBack implements BackgroundCallback {
+ @Override
+ public void processResult(CuratorFramework zooKeeperClient, CuratorEvent event)
+ throws Exception {
+ if (event.getType() == CuratorEventType.DELETE) {
+ HiveServer2.zooKeeperLastDeleteSuccessful = true;
+ }
+ }
+ }
+
public static void main(String[] args) {
HiveConf.setLoadHiveServer2Config(true);
try {