diff --git a/bin/ext/hiveserver2.sh b/bin/ext/hiveserver2.sh index 1e94542..95bc151 100644 --- a/bin/ext/hiveserver2.sh +++ b/bin/ext/hiveserver2.sh @@ -17,7 +17,7 @@ THISSERVICE=hiveserver2 export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} " hiveserver2() { - echo "$(timestamp): Starting HiveServer2" + >&2 echo "$(timestamp): Starting HiveServer2" CLASS=org.apache.hive.service.server.HiveServer2 if $cygwin; then HIVE_LIB=`cygpath -w "$HIVE_LIB"` diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java index 819ce19..22e3f10 100644 --- a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java +++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java @@ -51,7 +51,7 @@ import com.google.common.base.Preconditions; public class HS2ActivePassiveHARegistry extends ZkRegistryBase implements - ServiceRegistry, HiveServer2HAInstanceSet { + ServiceRegistry, HiveServer2HAInstanceSet, HiveServer2.FailoverHandler { private static final Logger LOG = LoggerFactory.getLogger(HS2ActivePassiveHARegistry.class); static final String ACTIVE_ENDPOINT = "activeEndpoint"; static final String PASSIVE_ENDPOINT = "passiveEndpoint"; @@ -60,6 +60,9 @@ private static final String INSTANCE_GROUP = "instances"; private static final String LEADER_LATCH_PATH = "/_LEADER"; private LeaderLatch leaderLatch; + private LeaderLatchListener latchListener; + private Map registeredListeners = new HashMap<>(); + private String latchPath; private ServiceRecord srv; private boolean isClient; private final String uniqueId; @@ -80,7 +83,7 @@ static HS2ActivePassiveHARegistry create(Configuration conf, boolean isClient) { String keytab = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); String zkNameSpacePrefix = zkNameSpace + "-"; return new HS2ActivePassiveHARegistry(null, zkNameSpacePrefix, LEADER_LATCH_PATH, principal, 
keytab, - SASL_LOGIN_CONTEXT_NAME, conf, isClient); + isClient ? null : SASL_LOGIN_CONTEXT_NAME, conf, isClient); } private HS2ActivePassiveHARegistry(final String instanceName, final String zkNamespacePrefix, @@ -96,7 +99,9 @@ private HS2ActivePassiveHARegistry(final String instanceName, final String zkNam } else { this.uniqueId = UNIQUE_ID.toString(); } - leaderLatch = new LeaderLatch(zooKeeperClient, leaderLatchPath, uniqueId, LeaderLatch.CloseMode.NOTIFY_LEADER); + this.latchPath = leaderLatchPath; + this.leaderLatch = getNewLeaderLatchPath(); + this.latchListener = new HS2LeaderLatchListener(); } @Override @@ -105,7 +110,7 @@ public void start() throws IOException { if (!isClient) { this.srv = getNewServiceRecord(); register(); - leaderLatch.addListener(new HS2LeaderLatchListener()); + leaderLatch.addListener(latchListener); try { // all participating instances uses the same latch path, and curator randomly chooses one instance to be leader // which can be verified via leaderLatch.hasLeadership() @@ -204,6 +209,39 @@ private boolean hasLeadership() { return leaderLatch.hasLeadership(); } + @Override + public void failover() throws Exception { + if (hasLeadership()) { + LOG.info("Failover request received for HS2 instance: {}. Restarting leader latch..", uniqueId); + leaderLatch.close(LeaderLatch.CloseMode.NOTIFY_LEADER); + leaderLatch = getNewLeaderLatchPath(); + // re-attach all listeners including explicitly registered listeners + leaderLatch.addListener(latchListener); + for (Map.Entry registeredListener : registeredListeners.entrySet()) { + if (registeredListener.getValue() == null) { + leaderLatch.addListener(registeredListener.getKey()); + } else { + leaderLatch.addListener(registeredListener.getKey(), registeredListener.getValue()); + } + } + leaderLatch.start(); + LOG.info("Failover complete. Leader latch restarted successfully. 
New leader: {}", + leaderLatch.getLeader().getId()); + } else { + LOG.warn("Failover request received for HS2 instance: {} that is not leader. Skipping..", uniqueId); + } + } + + /** + * Returns a new instance of leader latch path but retains the same uniqueId. This is only used when HS2 startsup or + * when a manual failover is triggered (in which case uniqueId will still remain as the instance has not restarted) + * + * @return - new leader latch + */ + private LeaderLatch getNewLeaderLatchPath() { + return new LeaderLatch(zooKeeperClient, latchPath, uniqueId, LeaderLatch.CloseMode.NOTIFY_LEADER); + } + private class HS2LeaderLatchListener implements LeaderLatchListener { // leadership state changes and sending out notifications to listener happens inside synchronous method in curator. @@ -282,6 +320,7 @@ public int size() { * @param executorService - event handler executor service */ void registerLeaderLatchListener(final LeaderLatchListener latchListener, final ExecutorService executorService) { + registeredListeners.put(latchListener, executorService); leaderLatch.addListener(latchListener, executorService); } @@ -290,6 +329,9 @@ void registerLeaderLatchListener(final LeaderLatchListener latchListener, final // Hostname confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname)); + // Web port + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT.varname)); // Hostname:port confsToPublish.put(INSTANCE_URI_CONFIG, conf.get(INSTANCE_URI_CONFIG)); confsToPublish.put(UNIQUE_IDENTIFIER, uniqueId); diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 5b792ac..0288a13 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ 
-18,9 +18,14 @@ package org.apache.hive.service.server; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -226,6 +231,22 @@ public void run() { this.serviceDiscovery = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY); this.activePassiveHA = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE); + try { + if (serviceDiscovery) { + serviceUri = getServerInstanceURI(); + addConfsToPublish(hiveConf, confsToPublish, serviceUri); + if (activePassiveHA) { + hiveConf.set(INSTANCE_URI_CONFIG, serviceUri); + leaderLatchListener = new HS2LeaderLatchListener(this, SessionState.get()); + leaderActionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Leader Actions Handler Thread").build()); + hs2HARegistry = HS2ActivePassiveHARegistry.create(hiveConf, false); + } + } + } catch (Exception e) { + throw new ServiceException(e); + } + // Setup web UI final int webUIPort; final String webHost; @@ -299,6 +320,7 @@ public void run() { } if (serviceDiscovery && activePassiveHA) { builder.setContextAttribute("hs2.isLeader", isLeader); + builder.setContextAttribute("hs2.failover.callback", new FailoverHandlerCallback(hs2HARegistry)); builder.setContextAttribute("hiveconf", hiveConf); builder.addServlet("leader", HS2LeadershipStatus.class); builder.addServlet("peers", HS2Peers.class); @@ -315,22 +337,6 @@ public void run() { throw new ServiceException(ie); } - try { - if (serviceDiscovery) { - serviceUri = getServerInstanceURI(); - addConfsToPublish(hiveConf, confsToPublish, serviceUri); - if (activePassiveHA) { - hiveConf.set(INSTANCE_URI_CONFIG, serviceUri); - leaderLatchListener = new HS2LeaderLatchListener(this, 
SessionState.get()); - leaderActionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Leader Actions Handler Thread").build()); - hs2HARegistry = HS2ActivePassiveHARegistry.create(hiveConf, false); - } - } - } catch (Exception e) { - throw new ServiceException(e); - } - // Add a shutdown hook for catching SIGTERM & SIGINT ShutdownHookManager.addShutdownHook(() -> hiveServer2.stop()); } @@ -536,7 +542,22 @@ public boolean isLeader() { return isLeader.get(); } + interface FailoverHandler { + void failover() throws Exception; + } + + public static class FailoverHandlerCallback implements FailoverHandler { + private HS2ActivePassiveHARegistry hs2HARegistry; + + FailoverHandlerCallback(HS2ActivePassiveHARegistry hs2HARegistry) { + this.hs2HARegistry = hs2HARegistry; + } + @Override + public void failover() throws Exception { + hs2HARegistry.failover(); + } + } /** * The watcher class which sets the de-register flag when the znode corresponding to this server * instance is deleted. 
Additionally, it shuts down the server if there are no more active client @@ -1002,6 +1023,19 @@ public static void main(String[] args) { .withLongOpt("deregister") .withDescription("Deregister all instances of given version from dynamic service discovery") .create()); + // --listHAPeers + options.addOption(OptionBuilder + .hasArgs(0) + .withLongOpt("listHAPeers") + .withDescription("List all HS2 instances when running in Active Passive HA mode") + .create()); + // --failover + options.addOption(OptionBuilder + .hasArgs(1) + .withArgName("workerIdentity") + .withLongOpt("failover") + .withDescription("Manually failover Active HS2 instance to passive standby mode") + .create()); options.addOption(new Option("H", "help", false, "Print help information")); } @@ -1031,6 +1065,18 @@ ServerOptionsProcessorResponse parse(String[] argv) { return new ServerOptionsProcessorResponse(new DeregisterOptionExecutor( commandLine.getOptionValue("deregister"))); } + + // Process --listHAPeers + if (commandLine.hasOption("listHAPeers")) { + return new ServerOptionsProcessorResponse(new ListHAPeersExecutor()); + } + + // Process --failover + if (commandLine.hasOption("failover")) { + return new ServerOptionsProcessorResponse(new FailoverHS2InstanceExecutor( + commandLine.getOptionValue("failover") + )); + } } catch (ParseException e) { // Error out & exit - we were not able to parse the args successfully System.err.println("Error starting HiveServer2 with given arguments: "); @@ -1128,4 +1174,98 @@ public void execute() { System.exit(0); } } + + static class FailoverHS2InstanceExecutor implements ServerOptionsExecutor { + private final String workerIdentity; + + FailoverHS2InstanceExecutor(String workerIdentity) { + this.workerIdentity = workerIdentity; + } + + @Override + public void execute() { + try { + HiveConf hiveConf = new HiveConf(); + HS2ActivePassiveHARegistry haRegistry = HS2ActivePassiveHARegistryClient.getClient(hiveConf); + Collection hs2Instances = haRegistry.getAll(); 
+ if (hs2Instances.isEmpty()) { + LOG.error("No HiveServer2 instances are running in HA mode"); + System.err.println("No HiveServer2 instances are running in HA mode"); + System.exit(-1); + } + HiveServer2Instance targetInstance = null; + for (HiveServer2Instance instance : hs2Instances) { + if (instance.getWorkerIdentity().equals(workerIdentity)) { + targetInstance = instance; + break; + } + } + if (targetInstance == null) { + LOG.error("Cannot find any HiveServer2 instance with workerIdentity: " + workerIdentity); + System.err.println("Cannot find any HiveServer2 instance with workerIdentity: " + workerIdentity); + System.exit(-1); + } + if (hs2Instances.size() == 1) { + LOG.error("Only one HiveServer2 instance running in the cluster. Cannot failover: " + workerIdentity); + System.err.println("Only one HiveServer2 instance running in the cluster. Cannot failover: " + workerIdentity); + System.exit(-1); + } + if (!targetInstance.isLeader()) { + LOG.error("HiveServer2 instance (workerIdentity: " + workerIdentity + ") is not a leader. Cannot failover"); + System.err.println("HiveServer2 instance (workerIdentity: " + workerIdentity + ") is not a leader. 
Cannot failover"); + System.exit(-1); + } + + String webPort = targetInstance.getProperties().get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname); + if (StringUtils.isEmpty(webPort)) { + LOG.error("Unable to determine web port for instance: " + workerIdentity); + System.err.println("Unable to determine web port for instance: " + workerIdentity); + System.exit(-1); + } + + // invoke DELETE /leader endpoint for failover + String webEndpoint = "http://" + targetInstance.getHost() + ":" + webPort + "/leader"; + URL obj = new URL(webEndpoint); + HttpURLConnection con = (HttpURLConnection) obj.openConnection(); + con.setRequestMethod("DELETE"); + StringBuilder responseString = new StringBuilder(); + try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()))) { + String inputLine; + while ((inputLine = in.readLine()) != null) { + responseString.append(inputLine); + } + } + if (con.getResponseCode() == 200) { + System.out.println(responseString); + } else { + LOG.error("Unable to failover HiveServer2 instance: " + workerIdentity + ". error: " + responseString); + System.err.println("Unable to failover HiveServer2 instance: " + workerIdentity + ". 
error: " + responseString); + System.exit(-1); + } + } catch (IOException e) { + LOG.error("Error during manual failover of HiveServer2 instance: " + workerIdentity, e); + System.err.println("Error during manual failover of HiveServer2 instance: " + workerIdentity + ". err: " + e); + System.exit(-1); + } + System.exit(0); + } + } + + static class ListHAPeersExecutor implements ServerOptionsExecutor { + @Override + public void execute() { + try { + HiveConf hiveConf = new HiveConf(); + HS2ActivePassiveHARegistry haRegistry = HS2ActivePassiveHARegistryClient.getClient(hiveConf); + HS2Peers.HS2Instances hs2Instances = new HS2Peers.HS2Instances(haRegistry.getAll()); + String jsonOut = hs2Instances.toJson(); + System.out.println(jsonOut); + } catch (IOException e) { + LOG.error("Error listing HiveServer2 HA instances from ZooKeeper", e); + System.err.println("Error listing HiveServer2 HA instances from ZooKeeper" + e); + System.exit(-1); + } + System.exit(0); + } + } } diff --git a/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java index 33529ed..fff7977 100644 --- a/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java +++ b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java @@ -25,18 +25,29 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.hive.http.HttpServer; +import org.apache.hive.service.server.HiveServer2; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Returns "true" if this HS2 instance is leader else "false". + * Invoking a "DELETE" method on this endpoint will trigger a failover if this instance is a leader. + * hadoop.security.instrumentation.requires.admin should be set to true and current user has to be in admin ACLS + * for accessing any of these endpoints. 
*/ public class HS2LeadershipStatus extends HttpServlet { private static final Logger LOG = LoggerFactory.getLogger(HS2LeadershipStatus.class); @Override public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { + // admin check + final ServletContext context = getServletContext(); + if (!HttpServer.isInstrumentationAccessAllowed(context, request, response)) { + response.setStatus(HttpServletResponse.SC_UNAUTHORIZED); + return; + } ServletContext ctx = getServletContext(); AtomicBoolean isLeader = (AtomicBoolean) ctx.getAttribute("hs2.isLeader"); LOG.info("Returning isLeader: {}", isLeader); @@ -45,4 +56,65 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro response.setStatus(HttpServletResponse.SC_OK); response.flushBuffer(); } + + private class FailoverResponse { + private boolean success; + private String message; + + FailoverResponse() { } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(final boolean success) { + this.success = success; + } + + public String getMessage() { + return message; + } + + public void setMessage(final String message) { + this.message = message; + } + } + + @Override + protected void doDelete(final HttpServletRequest request, final HttpServletResponse response) throws IOException { + // admin check + final ServletContext context = getServletContext(); + if (!HttpServer.isInstrumentationAccessAllowed(context, request, response)) { + response.setStatus(HttpServletResponse.SC_UNAUTHORIZED); + return; + } + + ObjectMapper mapper = new ObjectMapper(); + FailoverResponse failoverResponse = new FailoverResponse(); + AtomicBoolean isLeader = (AtomicBoolean) context.getAttribute("hs2.isLeader"); + if (!isLeader.get()) { + failoverResponse.setSuccess(false); + failoverResponse.setMessage("Cannot failover an instance that is not a leader"); + mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), failoverResponse); + 
response.setStatus(HttpServletResponse.SC_FORBIDDEN); + return; + } + + HiveServer2.FailoverHandlerCallback failoverHandler = (HiveServer2.FailoverHandlerCallback) context + .getAttribute("hs2.failover.callback"); + try { + failoverHandler.failover(); + failoverResponse.setSuccess(true); + failoverResponse.setMessage("Failover successful!"); + mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), failoverResponse); + response.setStatus(HttpServletResponse.SC_OK); + } catch (Exception e) { + String errMsg = "Cannot perform failover of HS2 instance. err: " + e.getMessage(); + LOG.error(errMsg, e); + failoverResponse.setSuccess(false); + failoverResponse.setMessage(errMsg); + mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), failoverResponse); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + } } diff --git a/service/src/java/org/apache/hive/service/servlet/HS2Peers.java b/service/src/java/org/apache/hive/service/servlet/HS2Peers.java index a51bbeb..e5e96f3 100644 --- a/service/src/java/org/apache/hive/service/servlet/HS2Peers.java +++ b/service/src/java/org/apache/hive/service/servlet/HS2Peers.java @@ -26,10 +26,11 @@ import javax.servlet.http.HttpServletResponse; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.http.HttpServer; import org.apache.hive.service.server.HS2ActivePassiveHARegistry; import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient; import org.apache.hive.service.server.HiveServer2Instance; -import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonIgnore; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; @@ -55,20 +56,28 @@ public HS2Instances(final Collection hiveServer2Instances) public void setHiveServer2Instances(final Collection hiveServer2Instances) { this.hiveServer2Instances = hiveServer2Instances; } + + @JsonIgnore + public String toJson() throws 
IOException { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); + return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(this); + } } @Override public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { + // admin check + final ServletContext context = getServletContext(); + if (!HttpServer.isInstrumentationAccessAllowed(context, request, response)) { + response.setStatus(HttpServletResponse.SC_UNAUTHORIZED); + return; + } ServletContext ctx = getServletContext(); HiveConf hiveConf = (HiveConf) ctx.getAttribute("hiveconf"); - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); - // serialize json based on field annotations only - mapper.setVisibilityChecker(mapper.getSerializationConfig().getDefaultVisibilityChecker() - .withSetterVisibility(JsonAutoDetect.Visibility.NONE)); HS2ActivePassiveHARegistry hs2Registry = HS2ActivePassiveHARegistryClient.getClient(hiveConf); HS2Instances instances = new HS2Instances(hs2Registry.getAll()); - mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), instances); + response.getWriter().write(instances.toJson()); response.setStatus(HttpServletResponse.SC_OK); response.flushBuffer(); }