diff --git a/bin/ext/hiveserver2.sh b/bin/ext/hiveserver2.sh index 1e94542..95bc151 100644 --- a/bin/ext/hiveserver2.sh +++ b/bin/ext/hiveserver2.sh @@ -17,7 +17,7 @@ THISSERVICE=hiveserver2 export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} " hiveserver2() { - echo "$(timestamp): Starting HiveServer2" + >&2 echo "$(timestamp): Starting HiveServer2" CLASS=org.apache.hive.service.server.HiveServer2 if $cygwin; then HIVE_LIB=`cygpath -w "$HIVE_LIB"` diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java index 72b2a8c..f4dc129 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java @@ -124,15 +124,8 @@ private static void setHAConfigs(Configuration conf) { public void testActivePassiveHA() throws Exception { String instanceId1 = UUID.randomUUID().toString(); miniHS2_1.start(getConfOverlay(instanceId1)); - while (!miniHS2_1.isStarted()) { - Thread.sleep(100); - } - String instanceId2 = UUID.randomUUID().toString(); miniHS2_2.start(getConfOverlay(instanceId2)); - while (!miniHS2_2.isStarted()) { - Thread.sleep(100); - } assertEquals(true, miniHS2_1.isLeader()); String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; @@ -175,9 +168,6 @@ public void testActivePassiveHA() throws Exception { miniHS2_1.stop(); - while (!miniHS2_2.isStarted()) { - Thread.sleep(100); - } assertEquals(true, miniHS2_2.isLeader()); url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; assertEquals("true", sendGet(url)); @@ -219,9 +209,6 @@ public void testActivePassiveHA() throws Exception { instanceId1 = UUID.randomUUID().toString(); miniHS2_1.start(getConfOverlay(instanceId1)); - while (!miniHS2_1.isStarted()) { - Thread.sleep(100); - } assertEquals(false, miniHS2_1.isLeader()); 
url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; assertEquals("false", sendGet(url)); @@ -264,17 +251,11 @@ public void testActivePassiveHA() throws Exception { public void testConnectionActivePassiveHAServiceDiscovery() throws Exception { String instanceId1 = UUID.randomUUID().toString(); miniHS2_1.start(getConfOverlay(instanceId1)); - while (!miniHS2_1.isStarted()) { - Thread.sleep(100); - } String instanceId2 = UUID.randomUUID().toString(); Map confOverlay = getConfOverlay(instanceId2); confOverlay.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, "http"); confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest"); miniHS2_2.start(confOverlay); - while (!miniHS2_2.isStarted()) { - Thread.sleep(100); - } assertEquals(true, miniHS2_1.isLeader()); String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; @@ -323,6 +304,48 @@ public void testConnectionActivePassiveHAServiceDiscovery() throws Exception { openConnectionAndRunQuery(zkJdbcUrl); } + @Test(timeout = 60000) + public void testManualFailover() throws Exception { + String instanceId1 = UUID.randomUUID().toString(); + miniHS2_1.start(getConfOverlay(instanceId1)); + String instanceId2 = UUID.randomUUID().toString(); + Map confOverlay = getConfOverlay(instanceId2); + confOverlay.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, "http"); + confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest"); + miniHS2_2.start(confOverlay); + String url1 = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + String url2 = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + + // when we start miniHS2_1 will be leader (sequential start) + assertEquals(true, miniHS2_1.isLeader()); + assertEquals("true", sendGet(url1)); + + // trigger failover on miniHS2_1 + String resp = sendDelete(url1); + 
assertTrue(resp.contains("Failover successful!")); + + // make sure miniHS2_1 is not leader + assertEquals(false, miniHS2_1.isLeader()); + assertEquals("false", sendGet(url1)); + + // make sure miniHS2_2 is the new leader + assertEquals(true, miniHS2_2.isLeader()); + assertEquals("true", sendGet(url2)); + + // send failover request again to miniHS2_1 and get a failure + resp = sendDelete(url1); + assertTrue(resp.contains("Cannot failover an instance that is not a leader")); + assertEquals(false, miniHS2_1.isLeader()); + + // send failover request to miniHS2_2 and make sure miniHS2_1 takes over (returning back to leader, test listeners) + resp = sendDelete(url2); + assertTrue(resp.contains("Failover successful!")); + assertEquals(true, miniHS2_1.isLeader()); + assertEquals("true", sendGet(url1)); + assertEquals("false", sendGet(url2)); + assertEquals(false, miniHS2_2.isLeader()); + } + private Connection getConnection(String jdbcURL, String user) throws SQLException { return DriverManager.getConnection(jdbcURL, user, "bar"); } @@ -359,7 +382,21 @@ private String sendGet(String url) throws Exception { return response.toString(); } - private Map getConfOverlay(final String instanceId) { + private String sendDelete(String url) throws Exception { + URL obj = new URL(url); + HttpURLConnection con = (HttpURLConnection) obj.openConnection(); + con.setRequestMethod("DELETE"); + BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream())); + String inputLine; + StringBuilder response = new StringBuilder(); + while ((inputLine = in.readLine()) != null) { + response.append(inputLine); + } + in.close(); + return response.toString(); + } + + private Map getConfOverlay(final String instanceId) { Map confOverlay = new HashMap<>(); confOverlay.put("hive.server2.zookeeper.publish.configs", "true"); confOverlay.put(ZkRegistryBase.UNIQUE_IDENTIFIER, instanceId); diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java 
b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java index 819ce19..60d84e6 100644 --- a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java +++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java @@ -51,7 +51,7 @@ import com.google.common.base.Preconditions; public class HS2ActivePassiveHARegistry extends ZkRegistryBase implements - ServiceRegistry, HiveServer2HAInstanceSet { + ServiceRegistry, HiveServer2HAInstanceSet, HiveServer2.FailoverHandler { private static final Logger LOG = LoggerFactory.getLogger(HS2ActivePassiveHARegistry.class); static final String ACTIVE_ENDPOINT = "activeEndpoint"; static final String PASSIVE_ENDPOINT = "passiveEndpoint"; @@ -60,6 +60,8 @@ private static final String INSTANCE_GROUP = "instances"; private static final String LEADER_LATCH_PATH = "/_LEADER"; private LeaderLatch leaderLatch; + private Map registeredListeners = new HashMap<>(); + private String latchPath; private ServiceRecord srv; private boolean isClient; private final String uniqueId; @@ -80,7 +82,7 @@ static HS2ActivePassiveHARegistry create(Configuration conf, boolean isClient) { String keytab = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); String zkNameSpacePrefix = zkNameSpace + "-"; return new HS2ActivePassiveHARegistry(null, zkNameSpacePrefix, LEADER_LATCH_PATH, principal, keytab, - SASL_LOGIN_CONTEXT_NAME, conf, isClient); + isClient ? 
null : SASL_LOGIN_CONTEXT_NAME, conf, isClient); } private HS2ActivePassiveHARegistry(final String instanceName, final String zkNamespacePrefix, @@ -96,7 +98,8 @@ private HS2ActivePassiveHARegistry(final String instanceName, final String zkNam } else { this.uniqueId = UNIQUE_ID.toString(); } - leaderLatch = new LeaderLatch(zooKeeperClient, leaderLatchPath, uniqueId, LeaderLatch.CloseMode.NOTIFY_LEADER); + this.latchPath = leaderLatchPath; + this.leaderLatch = getNewLeaderLatchPath(); } @Override @@ -105,7 +108,7 @@ public void start() throws IOException { if (!isClient) { this.srv = getNewServiceRecord(); register(); - leaderLatch.addListener(new HS2LeaderLatchListener()); + registerLeaderLatchListener(new HS2LeaderLatchListener(), null); try { // all participating instances uses the same latch path, and curator randomly chooses one instance to be leader // which can be verified via leaderLatch.hasLeadership() @@ -204,6 +207,38 @@ private boolean hasLeadership() { return leaderLatch.hasLeadership(); } + @Override + public void failover() throws Exception { + if (hasLeadership()) { + LOG.info("Failover request received for HS2 instance: {}. Restarting leader latch..", uniqueId); + leaderLatch.close(LeaderLatch.CloseMode.NOTIFY_LEADER); + leaderLatch = getNewLeaderLatchPath(); + // re-attach all registered listeners + for (Map.Entry registeredListener : registeredListeners.entrySet()) { + if (registeredListener.getValue() == null) { + leaderLatch.addListener(registeredListener.getKey()); + } else { + leaderLatch.addListener(registeredListener.getKey(), registeredListener.getValue()); + } + } + leaderLatch.start(); + LOG.info("Failover complete. Leader latch restarted successfully. New leader: {}", + leaderLatch.getLeader().getId()); + } else { + LOG.warn("Failover request received for HS2 instance: {} that is not leader. Skipping..", uniqueId); + } + } + + /** + * Returns a new instance of leader latch path but retains the same uniqueId. 
This is only used when HS2 startsup or + * when a manual failover is triggered (in which case uniqueId will still remain as the instance has not restarted) + * + * @return - new leader latch + */ + private LeaderLatch getNewLeaderLatchPath() { + return new LeaderLatch(zooKeeperClient, latchPath, uniqueId, LeaderLatch.CloseMode.NOTIFY_LEADER); + } + private class HS2LeaderLatchListener implements LeaderLatchListener { // leadership state changes and sending out notifications to listener happens inside synchronous method in curator. @@ -282,7 +317,12 @@ public int size() { * @param executorService - event handler executor service */ void registerLeaderLatchListener(final LeaderLatchListener latchListener, final ExecutorService executorService) { - leaderLatch.addListener(latchListener, executorService); + registeredListeners.put(latchListener, executorService); + if (executorService == null) { + leaderLatch.addListener(latchListener); + } else { + leaderLatch.addListener(latchListener, executorService); + } } private Map getConfsToPublish() { @@ -290,6 +330,9 @@ void registerLeaderLatchListener(final LeaderLatchListener latchListener, final // Hostname confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname)); + // Web port + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT.varname)); // Hostname:port confsToPublish.put(INSTANCE_URI_CONFIG, conf.get(INSTANCE_URI_CONFIG)); confsToPublish.put(UNIQUE_IDENTIFIER, uniqueId); diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index bb92c44..f91471a 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -18,9 +18,14 @@ package org.apache.hive.service.server; +import 
java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -87,6 +92,7 @@ import org.apache.hive.service.CompositeService; import org.apache.hive.service.ServiceException; import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.cli.session.SessionManager; import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService; import org.apache.hive.service.cli.thrift.ThriftCLIService; import org.apache.hive.service.cli.thrift.ThriftHttpCLIService; @@ -222,6 +228,22 @@ public void run() { this.serviceDiscovery = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY); this.activePassiveHA = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE); + try { + if (serviceDiscovery) { + serviceUri = getServerInstanceURI(); + addConfsToPublish(hiveConf, confsToPublish, serviceUri); + if (activePassiveHA) { + hiveConf.set(INSTANCE_URI_CONFIG, serviceUri); + leaderLatchListener = new HS2LeaderLatchListener(this, SessionState.get()); + leaderActionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Leader Actions Handler Thread").build()); + hs2HARegistry = HS2ActivePassiveHARegistry.create(hiveConf, false); + } + } + } catch (Exception e) { + throw new ServiceException(e); + } + // Setup web UI final int webUIPort; final String webHost; @@ -295,6 +317,7 @@ public void run() { } if (serviceDiscovery && activePassiveHA) { builder.setContextAttribute("hs2.isLeader", isLeader); + builder.setContextAttribute("hs2.failover.callback", new FailoverHandlerCallback(hs2HARegistry)); builder.setContextAttribute("hiveconf", hiveConf); builder.addServlet("leader", HS2LeadershipStatus.class); builder.addServlet("peers", 
HS2Peers.class); @@ -311,22 +334,6 @@ public void run() { throw new ServiceException(ie); } - try { - if (serviceDiscovery) { - serviceUri = getServerInstanceURI(); - addConfsToPublish(hiveConf, confsToPublish, serviceUri); - if (activePassiveHA) { - hiveConf.set(INSTANCE_URI_CONFIG, serviceUri); - leaderLatchListener = new HS2LeaderLatchListener(this, SessionState.get()); - leaderActionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Leader Actions Handler Thread").build()); - hs2HARegistry = HS2ActivePassiveHARegistry.create(hiveConf, false); - } - } - } catch (Exception e) { - throw new ServiceException(e); - } - // Add a shutdown hook for catching SIGTERM & SIGINT ShutdownHookManager.addShutdownHook(() -> hiveServer2.stop()); } @@ -532,7 +539,22 @@ public boolean isLeader() { return isLeader.get(); } + interface FailoverHandler { + void failover() throws Exception; + } + + public static class FailoverHandlerCallback implements FailoverHandler { + private HS2ActivePassiveHARegistry hs2HARegistry; + FailoverHandlerCallback(HS2ActivePassiveHARegistry hs2HARegistry) { + this.hs2HARegistry = hs2HARegistry; + } + + @Override + public void failover() throws Exception { + hs2HARegistry.failover(); + } + } /** * The watcher class which sets the de-register flag when the znode corresponding to this server * instance is deleted. Additionally, it shuts down the server if there are no more active client @@ -663,6 +685,9 @@ public void isLeader() { public void notLeader() { LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri); hiveServer2.isLeader.set(false); + // TODO: should we explicitly close client connections with appropriate error msg? SessionManager.closeSession() + // will shut itself down upon explicit --deregister after all connections are closed. Something similar but for + // failover. 
hiveServer2.stopOrDisconnectTezSessions(); LOG.info("Stopped/Disconnected tez sessions."); } @@ -998,6 +1023,19 @@ public static void main(String[] args) { .withLongOpt("deregister") .withDescription("Deregister all instances of given version from dynamic service discovery") .create()); + // --listHAPeers + options.addOption(OptionBuilder + .hasArgs(0) + .withLongOpt("listHAPeers") + .withDescription("List all HS2 instances when running in Active Passive HA mode") + .create()); + // --failover + options.addOption(OptionBuilder + .hasArgs(1) + .withArgName("workerIdentity") + .withLongOpt("failover") + .withDescription("Manually failover Active HS2 instance to passive standby mode") + .create()); options.addOption(new Option("H", "help", false, "Print help information")); } @@ -1027,6 +1065,18 @@ ServerOptionsProcessorResponse parse(String[] argv) { return new ServerOptionsProcessorResponse(new DeregisterOptionExecutor( commandLine.getOptionValue("deregister"))); } + + // Process --listHAPeers + if (commandLine.hasOption("listHAPeers")) { + return new ServerOptionsProcessorResponse(new ListHAPeersExecutor()); + } + + // Process --failover + if (commandLine.hasOption("failover")) { + return new ServerOptionsProcessorResponse(new FailoverHS2InstanceExecutor( + commandLine.getOptionValue("failover") + )); + } } catch (ParseException e) { // Error out & exit - we were not able to parse the args successfully System.err.println("Error starting HiveServer2 with given arguments: "); @@ -1124,4 +1174,115 @@ public void execute() { System.exit(0); } } + + /** + * Handler for --failover command. 
The way failover works is, + - the client gets the workerIdentity from user input + - the client uses HS2 HA registry to get list of HS2 instances and finds the one that matches + - if there is a match, client makes sure the instance is a leader (only leader can failover) + - if the matched instance is a leader, its web endpoint is obtained from service record then http DELETE method + is invoked on /leader endpoint (Yes. Manual failover requires web UI to be enabled) + - the web endpoint checks if admin ACLs are set, if so will close and restart the leader latch triggering a failover + */ + static class FailoverHS2InstanceExecutor implements ServerOptionsExecutor { + private final String workerIdentity; + + FailoverHS2InstanceExecutor(String workerIdentity) { + this.workerIdentity = workerIdentity; + } + + @Override + public void execute() { + try { + HiveConf hiveConf = new HiveConf(); + HS2ActivePassiveHARegistry haRegistry = HS2ActivePassiveHARegistryClient.getClient(hiveConf); + Collection hs2Instances = haRegistry.getAll(); + // no HS2 instances are running + if (hs2Instances.isEmpty()) { + LOG.error("No HiveServer2 instances are running in HA mode"); + System.err.println("No HiveServer2 instances are running in HA mode"); + System.exit(-1); + } + HiveServer2Instance targetInstance = null; + for (HiveServer2Instance instance : hs2Instances) { + if (instance.getWorkerIdentity().equals(workerIdentity)) { + targetInstance = instance; + break; + } + } + // no match for workerIdentity + if (targetInstance == null) { + LOG.error("Cannot find any HiveServer2 instance with workerIdentity: " + workerIdentity); + System.err.println("Cannot find any HiveServer2 instance with workerIdentity: " + workerIdentity); + System.exit(-1); + } + // only one HS2 instance available (cannot failover) + if (hs2Instances.size() == 1) { + LOG.error("Only one HiveServer2 instance running in the cluster. 
Cannot failover: " + workerIdentity); + System.err.println("Only one HiveServer2 instance running in the cluster. Cannot failover: " + workerIdentity); + System.exit(-1); + } + // matched HS2 instance is not leader + if (!targetInstance.isLeader()) { + LOG.error("HiveServer2 instance (workerIdentity: " + workerIdentity + ") is not a leader. Cannot failover"); + System.err.println("HiveServer2 instance (workerIdentity: " + workerIdentity + ") is not a leader. Cannot failover"); + System.exit(-1); + } + + String webPort = targetInstance.getProperties().get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname); + // web port cannot be obtained + if (StringUtils.isEmpty(webPort)) { + LOG.error("Unable to determine web port for instance: " + workerIdentity); + System.err.println("Unable to determine web port for instance: " + workerIdentity); + System.exit(-1); + } + + // invoke DELETE /leader endpoint for failover + String webEndpoint = "http://" + targetInstance.getHost() + ":" + webPort + "/leader"; + URL obj = new URL(webEndpoint); + HttpURLConnection con = (HttpURLConnection) obj.openConnection(); + con.setRequestMethod("DELETE"); + StringBuilder responseString = new StringBuilder(); + try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()))) { + String inputLine; + while ((inputLine = in.readLine()) != null) { + responseString.append(inputLine); + } + } + // print the response as JSON output + if (con.getResponseCode() == 200) { + System.out.println(responseString); + } else { + LOG.error("Unable to failover HiveServer2 instance: " + workerIdentity + ". status code: " + + con.getResponseCode() + " error: " + responseString); + System.err.println("Unable to failover HiveServer2 instance: " + workerIdentity + ". 
status code: " + + con.getResponseCode() + " error: " + responseString); + System.exit(-1); + } + } catch (IOException e) { + LOG.error("Error listing HiveServer2 HA instances from ZooKeeper", e); + System.err.println("Error listing HiveServer2 HA instances from ZooKeeper" + e); + System.exit(-1); + } + System.exit(0); + } + } + + static class ListHAPeersExecutor implements ServerOptionsExecutor { + @Override + public void execute() { + try { + HiveConf hiveConf = new HiveConf(); + HS2ActivePassiveHARegistry haRegistry = HS2ActivePassiveHARegistryClient.getClient(hiveConf); + HS2Peers.HS2Instances hs2Instances = new HS2Peers.HS2Instances(haRegistry.getAll()); + String jsonOut = hs2Instances.toJson(); + System.out.println(jsonOut); + } catch (IOException e) { + LOG.error("Error listing HiveServer2 HA instances from ZooKeeper", e); + System.err.println("Error listing HiveServer2 HA instances from ZooKeeper" + e); + System.exit(-1); + } + System.exit(0); + } + } } diff --git a/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java index 33529ed..68a3fb5 100644 --- a/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java +++ b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java @@ -25,18 +25,30 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.hive.http.HttpServer; +import org.apache.hive.service.server.HiveServer2; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Returns "true" if this HS2 instance is leader else "false". + * Invoking a "DELETE" method on this endpoint will trigger a failover if this instance is a leader. + * hadoop.security.instrumentation.requires.admin should be set to true and current user has to be in admin ACLS + * for accessing any of these endpoints. 
*/ public class HS2LeadershipStatus extends HttpServlet { private static final Logger LOG = LoggerFactory.getLogger(HS2LeadershipStatus.class); @Override public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { + // admin check + final ServletContext context = getServletContext(); + if (!HttpServer.isInstrumentationAccessAllowed(context, request, response)) { + LOG.warn("Unauthorized to perform GET action"); + response.setStatus(HttpServletResponse.SC_UNAUTHORIZED); + return; + } ServletContext ctx = getServletContext(); AtomicBoolean isLeader = (AtomicBoolean) ctx.getAttribute("hs2.isLeader"); LOG.info("Returning isLeader: {}", isLeader); @@ -45,4 +57,71 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro response.setStatus(HttpServletResponse.SC_OK); response.flushBuffer(); } + + private class FailoverResponse { + private boolean success; + private String message; + + FailoverResponse() { } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(final boolean success) { + this.success = success; + } + + public String getMessage() { + return message; + } + + public void setMessage(final String message) { + this.message = message; + } + } + + @Override + protected void doDelete(final HttpServletRequest request, final HttpServletResponse response) throws IOException { + // admin check + final ServletContext context = getServletContext(); + if (!HttpServer.isInstrumentationAccessAllowed(context, request, response)) { + LOG.warn("Unauthorized to perform DELETE action"); + response.setStatus(HttpServletResponse.SC_UNAUTHORIZED); + return; + } + + LOG.info("DELETE handler invoked for failover.."); + ObjectMapper mapper = new ObjectMapper(); + FailoverResponse failoverResponse = new FailoverResponse(); + AtomicBoolean isLeader = (AtomicBoolean) context.getAttribute("hs2.isLeader"); + if (!isLeader.get()) { + String msg = "Cannot failover an instance that is not a leader"; + 
LOG.info(msg); + failoverResponse.setSuccess(false); + failoverResponse.setMessage(msg); + mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), failoverResponse); + response.setStatus(HttpServletResponse.SC_FORBIDDEN); + return; + } + + HiveServer2.FailoverHandlerCallback failoverHandler = (HiveServer2.FailoverHandlerCallback) context + .getAttribute("hs2.failover.callback"); + try { + String msg = "Failover successful!"; + LOG.info(msg); + failoverHandler.failover(); + failoverResponse.setSuccess(true); + failoverResponse.setMessage(msg); + mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), failoverResponse); + response.setStatus(HttpServletResponse.SC_OK); + } catch (Exception e) { + String errMsg = "Cannot perform failover of HS2 instance. err: " + e.getMessage(); + LOG.error(errMsg, e); + failoverResponse.setSuccess(false); + failoverResponse.setMessage(errMsg); + mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), failoverResponse); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + } } diff --git a/service/src/java/org/apache/hive/service/servlet/HS2Peers.java b/service/src/java/org/apache/hive/service/servlet/HS2Peers.java index a51bbeb..a7b3877 100644 --- a/service/src/java/org/apache/hive/service/servlet/HS2Peers.java +++ b/service/src/java/org/apache/hive/service/servlet/HS2Peers.java @@ -26,17 +26,21 @@ import javax.servlet.http.HttpServletResponse; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.http.HttpServer; import org.apache.hive.service.server.HS2ActivePassiveHARegistry; import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient; import org.apache.hive.service.server.HiveServer2Instance; -import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonIgnore; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; /** * Returns all HS2 instances in Active-Passive standy modes. */ public class HS2Peers extends HttpServlet { + private static final Logger LOG = LoggerFactory.getLogger(HS2Peers.class); public static class HS2Instances { private Collection hiveServer2Instances; @@ -55,20 +59,29 @@ public HS2Instances(final Collection hiveServer2Instances) public void setHiveServer2Instances(final Collection hiveServer2Instances) { this.hiveServer2Instances = hiveServer2Instances; } + + @JsonIgnore + public String toJson() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); + return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(this); + } } @Override public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { + // admin check + final ServletContext context = getServletContext(); + if (!HttpServer.isInstrumentationAccessAllowed(context, request, response)) { + LOG.warn("Unauthorized to perform GET action"); + response.setStatus(HttpServletResponse.SC_UNAUTHORIZED); + return; + } ServletContext ctx = getServletContext(); HiveConf hiveConf = (HiveConf) ctx.getAttribute("hiveconf"); - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); - // serialize json based on field annotations only - mapper.setVisibilityChecker(mapper.getSerializationConfig().getDefaultVisibilityChecker() - .withSetterVisibility(JsonAutoDetect.Visibility.NONE)); HS2ActivePassiveHARegistry hs2Registry = HS2ActivePassiveHARegistryClient.getClient(hiveConf); HS2Instances instances = new HS2Instances(hs2Registry.getAll()); - mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), instances); + response.getWriter().write(instances.toJson()); response.setStatus(HttpServletResponse.SC_OK); response.flushBuffer(); }