diff --git a/bin/ext/hiveserver2.sh b/bin/ext/hiveserver2.sh
index 1e94542..95bc151 100644
--- a/bin/ext/hiveserver2.sh
+++ b/bin/ext/hiveserver2.sh
@@ -17,7 +17,7 @@ THISSERVICE=hiveserver2
export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "
hiveserver2() {
- echo "$(timestamp): Starting HiveServer2"
+ >&2 echo "$(timestamp): Starting HiveServer2"
CLASS=org.apache.hive.service.server.HiveServer2
if $cygwin; then
HIVE_LIB=`cygpath -w "$HIVE_LIB"`
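With the startup banner on stderr, stdout of the hiveserver2 service script stays machine-readable. A hedged
sketch of what that enables, using the client-mode option added later in this patch (redirection and file
names are illustrative):

    # The banner now lands on stderr, so JSON printed by --listHAPeers can be
    # captured cleanly from stdout (or the banner silenced outright).
    hive --service hiveserver2 --listHAPeers 2>/dev/null > peers.json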
diff --git a/common/src/java/org/apache/hive/http/HttpServer.java b/common/src/java/org/apache/hive/http/HttpServer.java
index 71b2668..93b11e3 100644
--- a/common/src/java/org/apache/hive/http/HttpServer.java
+++ b/common/src/java/org/apache/hive/http/HttpServer.java
@@ -273,6 +273,28 @@ public static boolean isInstrumentationAccessAllowed(
}
/**
+ * Same as {@link HttpServer#isInstrumentationAccessAllowed(ServletContext, HttpServletRequest, HttpServletResponse)}
+ * except that access is allowed only when hadoop.security.instrumentation.requires.admin is set to true and the
+ * remote user has administrator access; if the property is false, access is always denied.
+ */
+ @InterfaceAudience.LimitedPrivate("hive")
+ public static boolean isInstrumentationAccessAllowedStrict(
+ ServletContext servletContext, HttpServletRequest request,
+ HttpServletResponse response) throws IOException {
+ Configuration conf =
+ (Configuration) servletContext.getAttribute(CONF_CONTEXT_ATTRIBUTE);
+
+ boolean adminAccess = conf.getBoolean(
+ CommonConfigurationKeys.HADOOP_SECURITY_INSTRUMENTATION_REQUIRES_ADMIN, false);
+ if (!adminAccess) {
+ // unlike the non-strict variant, deny access outright when the property is disabled
+ return false;
+ }
+ return hasAdministratorAccess(servletContext, request, response);
+ }
+
+ /**
* Check if the remote user has access to an object (e.g. query history) that belongs to a user
*
* @param ctx the context containing the admin ACL.
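For reference, a sketch of the configuration the strict check expects, mirroring the getSecureConfOverlay
test overlay later in this patch (the admin user name is illustrative):

    # Assumed knobs, matching the test overlay below; they can live in hive-site.xml /
    # core-site.xml or be passed at startup:
    hive --service hiveserver2 \
      --hiveconf hadoop.security.instrumentation.requires.admin=true \
      --hiveconf hadoop.security.authorization=true \
      --hiveconf hive.users.in.admin.role=admin_user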
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
index 72b2a8c..e53826d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
@@ -21,33 +21,37 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpMethodBase;
+import org.apache.commons.httpclient.methods.DeleteMethod;
+import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
+import org.apache.hive.http.security.PamAuthenticator;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
import org.apache.hive.service.server.HiveServer2Instance;
+import org.apache.hive.service.server.TestHS2HttpServerPam;
import org.apache.hive.service.servlet.HS2Peers;
+import org.apache.http.HttpHeaders;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.After;
import org.junit.AfterClass;
@@ -60,6 +64,8 @@
private MiniHS2 miniHS2_2 = null;
private static TestingServer zkServer;
private Connection hs2Conn = null;
+ private static final String ADMIN_USER = "user1"; // user from TestPamAuthenticator
+ private static final String ADMIN_PASSWORD = "1";
private static String zkHANamespace = "hs2ActivePassiveHATest";
private HiveConf hiveConf1;
private HiveConf hiveConf2;
@@ -124,15 +130,8 @@ private static void setHAConfigs(Configuration conf) {
public void testActivePassiveHA() throws Exception {
String instanceId1 = UUID.randomUUID().toString();
miniHS2_1.start(getConfOverlay(instanceId1));
- while (!miniHS2_1.isStarted()) {
- Thread.sleep(100);
- }
-
String instanceId2 = UUID.randomUUID().toString();
miniHS2_2.start(getConfOverlay(instanceId2));
- while (!miniHS2_2.isStarted()) {
- Thread.sleep(100);
- }
assertEquals(true, miniHS2_1.isLeader());
String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
@@ -175,9 +174,6 @@ public void testActivePassiveHA() throws Exception {
miniHS2_1.stop();
- while (!miniHS2_2.isStarted()) {
- Thread.sleep(100);
- }
assertEquals(true, miniHS2_2.isLeader());
url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
assertEquals("true", sendGet(url));
@@ -219,9 +215,6 @@ public void testActivePassiveHA() throws Exception {
instanceId1 = UUID.randomUUID().toString();
miniHS2_1.start(getConfOverlay(instanceId1));
- while (!miniHS2_1.isStarted()) {
- Thread.sleep(100);
- }
assertEquals(false, miniHS2_1.isLeader());
url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
assertEquals("false", sendGet(url));
@@ -264,17 +257,11 @@ public void testActivePassiveHA() throws Exception {
public void testConnectionActivePassiveHAServiceDiscovery() throws Exception {
String instanceId1 = UUID.randomUUID().toString();
miniHS2_1.start(getConfOverlay(instanceId1));
- while (!miniHS2_1.isStarted()) {
- Thread.sleep(100);
- }
String instanceId2 = UUID.randomUUID().toString();
Map<String, String> confOverlay = getConfOverlay(instanceId2);
confOverlay.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, "http");
confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest");
miniHS2_2.start(confOverlay);
- while (!miniHS2_2.isStarted()) {
- Thread.sleep(100);
- }
assertEquals(true, miniHS2_1.isLeader());
String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
@@ -323,6 +310,92 @@ public void testConnectionActivePassiveHAServiceDiscovery() throws Exception {
openConnectionAndRunQuery(zkJdbcUrl);
}
+ @Test(timeout = 60000)
+ public void testManualFailover() throws Exception {
+ setPamConfs(hiveConf1);
+ setPamConfs(hiveConf2);
+ PamAuthenticator pamAuthenticator1 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf1);
+ PamAuthenticator pamAuthenticator2 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf2);
+ try {
+ String instanceId1 = UUID.randomUUID().toString();
+ miniHS2_1.setPamAuthenticator(pamAuthenticator1);
+ miniHS2_1.start(getSecureConfOverlay(instanceId1));
+ String instanceId2 = UUID.randomUUID().toString();
+ Map<String, String> confOverlay = getSecureConfOverlay(instanceId2);
+ confOverlay.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, "http");
+ confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest");
+ miniHS2_2.setPamAuthenticator(pamAuthenticator2);
+ miniHS2_2.start(confOverlay);
+ String url1 = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+ String url2 = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+
+ // the instances are started sequentially, so miniHS2_1 will be the leader
+ assertEquals(true, miniHS2_1.isLeader());
+ assertEquals("true", sendGet(url1, true));
+
+ // trigger failover on miniHS2_1
+ String resp = sendDelete(url1, true);
+ assertTrue(resp.contains("Failover successful!"));
+
+ // make sure miniHS2_1 is not leader
+ assertEquals(false, miniHS2_1.isLeader());
+ assertEquals("false", sendGet(url1, true));
+
+ // make sure miniHS2_2 is the new leader
+ assertEquals(true, miniHS2_2.isLeader());
+ assertEquals("true", sendGet(url2, true));
+
+ // send failover request again to miniHS2_1 and get a failure
+ resp = sendDelete(url1, true);
+ assertTrue(resp.contains("Cannot failover an instance that is not a leader"));
+ assertEquals(false, miniHS2_1.isLeader());
+
+ // send failover request to miniHS2_2 and make sure miniHS2_1 takes over (leadership returns to it, exercising the re-registered listeners)
+ resp = sendDelete(url2, true);
+ assertTrue(resp.contains("Failover successful!"));
+ assertEquals(true, miniHS2_1.isLeader());
+ assertEquals("true", sendGet(url1, true));
+ assertEquals("false", sendGet(url2, true));
+ assertEquals(false, miniHS2_2.isLeader());
+ } finally {
+ // revert configs to not affect other tests
+ unsetPamConfs(hiveConf1);
+ unsetPamConfs(hiveConf2);
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testManualFailoverUnauthorized() throws Exception {
+ setPamConfs(hiveConf1);
+ PamAuthenticator pamAuthenticator1 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf1);
+ try {
+ String instanceId1 = UUID.randomUUID().toString();
+ miniHS2_1.setPamAuthenticator(pamAuthenticator1);
+ miniHS2_1.start(getSecureConfOverlay(instanceId1));
+
+ // dummy HS2 instance just to trigger failover
+ String instanceId2 = UUID.randomUUID().toString();
+ Map<String, String> confOverlay = getSecureConfOverlay(instanceId2);
+ confOverlay.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, "http");
+ confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest");
+ miniHS2_2.start(confOverlay);
+
+ String url1 = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+ // the instances are started sequentially, so miniHS2_1 will be the leader
+ assertEquals(true, miniHS2_1.isLeader());
+ assertEquals("true", sendGet(url1, true));
+
+ // trigger failover on miniHS2_1 without authorization header
+ assertEquals("Unauthorized", sendDelete(url1, false));
+ assertTrue(sendDelete(url1, true).contains("Failover successful!"));
+ assertEquals(false, miniHS2_1.isLeader());
+ assertEquals(true, miniHS2_2.isLeader());
+ } finally {
+ // revert configs to not affect other tests
+ unsetPamConfs(hiveConf1);
+ }
+ }
+
private Connection getConnection(String jdbcURL, String user) throws SQLException {
return DriverManager.getConnection(jdbcURL, user, "bar");
}
@@ -346,23 +419,62 @@ private void openConnectionAndRunQuery(String jdbcUrl) throws Exception {
}
private String sendGet(String url) throws Exception {
- URL obj = new URL(url);
- HttpURLConnection con = (HttpURLConnection) obj.openConnection();
- con.setRequestMethod("GET");
- BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
- String inputLine;
- StringBuilder response = new StringBuilder();
- while ((inputLine = in.readLine()) != null) {
- response.append(inputLine);
+ return sendGet(url, false);
+ }
+
+ private String sendGet(String url, boolean enableAuth) throws Exception {
+ return sendAuthMethod(new GetMethod(url), enableAuth);
+ }
+
+ private String sendDelete(String url, boolean enableAuth) throws Exception {
+ return sendAuthMethod(new DeleteMethod(url), enableAuth);
+ }
+
+ private String sendAuthMethod(HttpMethodBase method, boolean enableAuth) throws Exception {
+ HttpClient client = new HttpClient();
+ try {
+ if (enableAuth) {
+ String userPass = ADMIN_USER + ":" + ADMIN_PASSWORD;
+ method.addRequestHeader(HttpHeaders.AUTHORIZATION,
+ "Basic " + new String(Base64.getEncoder().encode(userPass.getBytes())));
+ }
+ int statusCode = client.executeMethod(method);
+ if (statusCode == 200) {
+ return method.getResponseBodyAsString();
+ } else {
+ return method.getStatusLine().getReasonPhrase();
+ }
+ } finally {
+ method.releaseConnection();
}
- in.close();
- return response.toString();
}
- private Map getConfOverlay(final String instanceId) {
+ private Map<String, String> getConfOverlay(final String instanceId) {
Map<String, String> confOverlay = new HashMap<>();
confOverlay.put("hive.server2.zookeeper.publish.configs", "true");
confOverlay.put(ZkRegistryBase.UNIQUE_IDENTIFIER, instanceId);
return confOverlay;
}
+
+ private Map<String, String> getSecureConfOverlay(final String instanceId) {
+ Map<String, String> confOverlay = new HashMap<>();
+ confOverlay.put("hive.server2.zookeeper.publish.configs", "true");
+ confOverlay.put(ZkRegistryBase.UNIQUE_IDENTIFIER, instanceId);
+ confOverlay.put("hadoop.security.instrumentation.requires.admin", "true");
+ confOverlay.put("hadoop.security.authorization", "true");
+ confOverlay.put(ConfVars.USERS_IN_ADMIN_ROLE.varname, ADMIN_USER);
+ return confOverlay;
+ }
+
+ private void setPamConfs(final HiveConf hiveConf) {
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_PAM_SERVICES, "sshd");
+ hiveConf.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_USE_PAM, true);
+ hiveConf.setBoolVar(ConfVars.HIVE_IN_TEST, true);
+ }
+
+ private void unsetPamConfs(final HiveConf hiveConf) {
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_PAM_SERVICES, "");
+ hiveConf.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_USE_PAM, false);
+ hiveConf.setBoolVar(ConfVars.HIVE_IN_TEST, false);
+ }
}
\ No newline at end of file
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index e1c2dd0..63d38e4 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.hive.shims.HadoopShims.MiniDFSShim;
import org.apache.hadoop.hive.shims.HadoopShims.MiniMrShim;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hive.http.security.PamAuthenticator;
import org.apache.hive.jdbc.Utils;
import org.apache.hive.service.Service;
import org.apache.hive.service.cli.CLIServiceClient;
@@ -76,6 +77,7 @@
private final boolean isMetastoreSecure;
private MiniClusterType miniClusterType = MiniClusterType.LOCALFS_ONLY;
private boolean usePortsFromConf = false;
+ private PamAuthenticator pamAuthenticator;
public enum MiniClusterType {
MR,
@@ -353,6 +355,9 @@ public void start(Map<String, String> confOverlay) throws Exception {
for (int tryCount = 0; (tryCount < MetaStoreTestUtils.RETRY_COUNT); tryCount++) {
try {
hiveServer2 = new HiveServer2();
+ if (pamAuthenticator != null) {
+ hiveServer2.setPamAuthenticator(pamAuthenticator);
+ }
hiveServer2.init(getHiveConf());
hiveServer2.start();
hs2Started = true;
@@ -412,6 +417,10 @@ public boolean isLeader() {
return hiveServer2.isLeader();
}
+ public void setPamAuthenticator(final PamAuthenticator pamAuthenticator) {
+ this.pamAuthenticator = pamAuthenticator;
+ }
+
public CLIServiceClient getServiceClient() {
verifyStarted();
return getServiceClientInternal();
diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
index 7c75489..f4b4362 100644
--- a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
+++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
@@ -51,7 +51,7 @@
import com.google.common.base.Preconditions;
public class HS2ActivePassiveHARegistry extends ZkRegistryBase<HiveServer2Instance> implements
- ServiceRegistry, HiveServer2HAInstanceSet {
+ ServiceRegistry, HiveServer2HAInstanceSet, HiveServer2.FailoverHandler {
private static final Logger LOG = LoggerFactory.getLogger(HS2ActivePassiveHARegistry.class);
static final String ACTIVE_ENDPOINT = "activeEndpoint";
static final String PASSIVE_ENDPOINT = "passiveEndpoint";
@@ -60,6 +60,8 @@
private static final String INSTANCE_GROUP = "instances";
private static final String LEADER_LATCH_PATH = "/_LEADER";
private LeaderLatch leaderLatch;
+ private Map<LeaderLatchListener, ExecutorService> registeredListeners = new HashMap<>();
+ private String latchPath;
private ServiceRecord srv;
private boolean isClient;
private final String uniqueId;
@@ -80,7 +82,7 @@ static HS2ActivePassiveHARegistry create(Configuration conf, boolean isClient) {
String keytab = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
String zkNameSpacePrefix = zkNameSpace + "-";
return new HS2ActivePassiveHARegistry(null, zkNameSpacePrefix, LEADER_LATCH_PATH, principal, keytab,
- SASL_LOGIN_CONTEXT_NAME, conf, isClient);
+ isClient ? null : SASL_LOGIN_CONTEXT_NAME, conf, isClient);
}
private HS2ActivePassiveHARegistry(final String instanceName, final String zkNamespacePrefix,
@@ -96,7 +98,8 @@ private HS2ActivePassiveHARegistry(final String instanceName, final String zkNam
} else {
this.uniqueId = UNIQUE_ID.toString();
}
- leaderLatch = new LeaderLatch(zooKeeperClient, leaderLatchPath, uniqueId, LeaderLatch.CloseMode.NOTIFY_LEADER);
+ this.latchPath = leaderLatchPath;
+ this.leaderLatch = getNewLeaderLatchPath();
}
@Override
@@ -105,7 +108,7 @@ public void start() throws IOException {
if (!isClient) {
this.srv = getNewServiceRecord();
register();
- leaderLatch.addListener(new HS2LeaderLatchListener());
+ registerLeaderLatchListener(new HS2LeaderLatchListener(), null);
try {
// all participating instances use the same latch path, and curator randomly chooses one instance to be leader
// which can be verified via leaderLatch.hasLeadership()
@@ -205,6 +208,38 @@ private boolean hasLeadership() {
return leaderLatch.hasLeadership();
}
+ @Override
+ public void failover() throws Exception {
+ if (hasLeadership()) {
+ LOG.info("Failover request received for HS2 instance: {}. Restarting leader latch..", uniqueId);
+ leaderLatch.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
+ leaderLatch = getNewLeaderLatchPath();
+ // re-attach all registered listeners
+ for (Map.Entry<LeaderLatchListener, ExecutorService> registeredListener : registeredListeners.entrySet()) {
+ if (registeredListener.getValue() == null) {
+ leaderLatch.addListener(registeredListener.getKey());
+ } else {
+ leaderLatch.addListener(registeredListener.getKey(), registeredListener.getValue());
+ }
+ }
+ leaderLatch.start();
+ LOG.info("Failover complete. Leader latch restarted successfully. New leader: {}",
+ leaderLatch.getLeader().getId());
+ } else {
+ LOG.warn("Failover request received for HS2 instance: {} that is not leader. Skipping..", uniqueId);
+ }
+ }
+
+ /**
+ * Returns a new leader latch on the same latch path, retaining the same uniqueId. This is only used when HS2
+ * starts up or when a manual failover is triggered (the uniqueId remains the same, as the instance has not restarted).
+ *
+ * @return - new leader latch
+ */
+ private LeaderLatch getNewLeaderLatchPath() {
+ return new LeaderLatch(zooKeeperClient, latchPath, uniqueId, LeaderLatch.CloseMode.NOTIFY_LEADER);
+ }
+
private class HS2LeaderLatchListener implements LeaderLatchListener {
// leadership state changes and sending out notifications to listener happens inside synchronous method in curator.
@@ -283,7 +318,12 @@ public int size() {
* @param executorService - event handler executor service
*/
void registerLeaderLatchListener(final LeaderLatchListener latchListener, final ExecutorService executorService) {
- leaderLatch.addListener(latchListener, executorService);
+ registeredListeners.put(latchListener, executorService);
+ if (executorService == null) {
+ leaderLatch.addListener(latchListener);
+ } else {
+ leaderLatch.addListener(latchListener, executorService);
+ }
}
private Map<String, String> getConfsToPublish() {
@@ -291,6 +331,9 @@ void registerLeaderLatchListener(final LeaderLatchListener latchListener, final
// Hostname
confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname,
conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname));
+ // Web port
+ confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT.varname,
+ conf.get(HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT.varname));
// Hostname:port
confsToPublish.put(INSTANCE_URI_CONFIG, conf.get(INSTANCE_URI_CONFIG));
confsToPublish.put(UNIQUE_IDENTIFIER, uniqueId);
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index bb92c44..90ba752 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -18,9 +18,15 @@
package org.apache.hive.service.server;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -39,6 +45,9 @@
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpMethodBase;
+import org.apache.commons.httpclient.methods.DeleteMethod;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.curator.framework.CuratorFramework;
@@ -87,12 +96,14 @@
import org.apache.hive.service.CompositeService;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.session.SessionManager;
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
import org.apache.hive.service.servlet.HS2LeadershipStatus;
import org.apache.hive.service.servlet.HS2Peers;
import org.apache.hive.service.servlet.QueryProfileServlet;
+import org.apache.http.HttpHeaders;
import org.apache.logging.log4j.util.Strings;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -150,6 +161,10 @@ public HiveServer2(PamAuthenticator pamAuthenticator) {
this.pamAuthenticator = pamAuthenticator;
}
+ @VisibleForTesting
+ public void setPamAuthenticator(PamAuthenticator pamAuthenticator) {
+ this.pamAuthenticator = pamAuthenticator;
+ }
@Override
public synchronized void init(HiveConf hiveConf) {
@@ -222,6 +237,22 @@ public void run() {
this.serviceDiscovery = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY);
this.activePassiveHA = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE);
+ try {
+ if (serviceDiscovery) {
+ serviceUri = getServerInstanceURI();
+ addConfsToPublish(hiveConf, confsToPublish, serviceUri);
+ if (activePassiveHA) {
+ hiveConf.set(INSTANCE_URI_CONFIG, serviceUri);
+ leaderLatchListener = new HS2LeaderLatchListener(this, SessionState.get());
+ leaderActionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Leader Actions Handler Thread").build());
+ hs2HARegistry = HS2ActivePassiveHARegistry.create(hiveConf, false);
+ }
+ }
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
+
// Setup web UI
final int webUIPort;
final String webHost;
@@ -295,6 +326,7 @@ public void run() {
}
if (serviceDiscovery && activePassiveHA) {
builder.setContextAttribute("hs2.isLeader", isLeader);
+ builder.setContextAttribute("hs2.failover.callback", new FailoverHandlerCallback(hs2HARegistry));
builder.setContextAttribute("hiveconf", hiveConf);
builder.addServlet("leader", HS2LeadershipStatus.class);
builder.addServlet("peers", HS2Peers.class);
@@ -311,22 +343,6 @@ public void run() {
throw new ServiceException(ie);
}
- try {
- if (serviceDiscovery) {
- serviceUri = getServerInstanceURI();
- addConfsToPublish(hiveConf, confsToPublish, serviceUri);
- if (activePassiveHA) {
- hiveConf.set(INSTANCE_URI_CONFIG, serviceUri);
- leaderLatchListener = new HS2LeaderLatchListener(this, SessionState.get());
- leaderActionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Leader Actions Handler Thread").build());
- hs2HARegistry = HS2ActivePassiveHARegistry.create(hiveConf, false);
- }
- }
- } catch (Exception e) {
- throw new ServiceException(e);
- }
-
// Add a shutdown hook for catching SIGTERM & SIGINT
ShutdownHookManager.addShutdownHook(() -> hiveServer2.stop());
}
@@ -532,7 +548,22 @@ public boolean isLeader() {
return isLeader.get();
}
+ interface FailoverHandler {
+ void failover() throws Exception;
+ }
+
+ public static class FailoverHandlerCallback implements FailoverHandler {
+ private HS2ActivePassiveHARegistry hs2HARegistry;
+
+ FailoverHandlerCallback(HS2ActivePassiveHARegistry hs2HARegistry) {
+ this.hs2HARegistry = hs2HARegistry;
+ }
+ @Override
+ public void failover() throws Exception {
+ hs2HARegistry.failover();
+ }
+ }
/**
* The watcher class which sets the de-register flag when the znode corresponding to this server
* instance is deleted. Additionally, it shuts down the server if there are no more active client
@@ -663,6 +694,9 @@ public void isLeader() {
public void notLeader() {
LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri);
hiveServer2.isLeader.set(false);
+ // TODO: should we explicitly close client connections with an appropriate error msg? SessionManager.closeSession()
+ // will shut itself down upon explicit --deregister after all connections are closed. Something similar is needed
+ // for failover.
hiveServer2.stopOrDisconnectTezSessions();
LOG.info("Stopped/Disconnected tez sessions.");
}
@@ -998,6 +1032,19 @@ public static void main(String[] args) {
.withLongOpt("deregister")
.withDescription("Deregister all instances of given version from dynamic service discovery")
.create());
+ // --listHAPeers
+ options.addOption(OptionBuilder
+ .hasArgs(0)
+ .withLongOpt("listHAPeers")
+ .withDescription("List all HS2 instances when running in Active Passive HA mode")
+ .create());
+ // --failover
+ options.addOption(OptionBuilder
+ .hasArgs(1)
+ .withArgName("workerIdentity")
+ .withLongOpt("failover")
+ .withDescription("Manually failover Active HS2 instance to passive standby mode")
+ .create());
options.addOption(new Option("H", "help", false, "Print help information"));
}
@@ -1027,6 +1074,18 @@ ServerOptionsProcessorResponse parse(String[] argv) {
return new ServerOptionsProcessorResponse(new DeregisterOptionExecutor(
commandLine.getOptionValue("deregister")));
}
+
+ // Process --listHAPeers
+ if (commandLine.hasOption("listHAPeers")) {
+ return new ServerOptionsProcessorResponse(new ListHAPeersExecutor());
+ }
+
+ // Process --failover
+ if (commandLine.hasOption("failover")) {
+ return new ServerOptionsProcessorResponse(new FailoverHS2InstanceExecutor(
+ commandLine.getOptionValue("failover")
+ ));
+ }
} catch (ParseException e) {
// Error out & exit - we were not able to parse the args successfully
System.err.println("Error starting HiveServer2 with given arguments: ");
@@ -1124,4 +1183,112 @@ public void execute() {
System.exit(0);
}
}
+
+ /**
+ * Handler for the --failover command. The way failover works is:
+ * - the client gets the target workerIdentity from user input
+ * - the client uses the HS2 HA registry to get the list of HS2 instances and finds the one that matches
+ * - if there is a match, the client makes sure that instance is a leader (only a leader can fail over)
+ * - if the matched instance is a leader, its web endpoint is obtained from the service record, then the HTTP DELETE
+ * method is invoked on the /leader endpoint (yes, manual failover requires the web UI to be enabled)
+ * - the web endpoint checks whether admin ACLs are set; if so, it closes and restarts the leader latch, triggering
+ * a failover
+ */
+ static class FailoverHS2InstanceExecutor implements ServerOptionsExecutor {
+ private final String workerIdentity;
+
+ FailoverHS2InstanceExecutor(String workerIdentity) {
+ this.workerIdentity = workerIdentity;
+ }
+
+ @Override
+ public void execute() {
+ try {
+ HiveConf hiveConf = new HiveConf();
+ HS2ActivePassiveHARegistry haRegistry = HS2ActivePassiveHARegistryClient.getClient(hiveConf);
+ Collection<HiveServer2Instance> hs2Instances = haRegistry.getAll();
+ // no HS2 instances are running
+ if (hs2Instances.isEmpty()) {
+ LOG.error("No HiveServer2 instances are running in HA mode");
+ System.err.println("No HiveServer2 instances are running in HA mode");
+ System.exit(-1);
+ }
+ HiveServer2Instance targetInstance = null;
+ for (HiveServer2Instance instance : hs2Instances) {
+ if (instance.getWorkerIdentity().equals(workerIdentity)) {
+ targetInstance = instance;
+ break;
+ }
+ }
+ // no match for workerIdentity
+ if (targetInstance == null) {
+ LOG.error("Cannot find any HiveServer2 instance with workerIdentity: " + workerIdentity);
+ System.err.println("Cannot find any HiveServer2 instance with workerIdentity: " + workerIdentity);
+ System.exit(-1);
+ }
+ // only one HS2 instance available (cannot failover)
+ if (hs2Instances.size() == 1) {
+ LOG.error("Only one HiveServer2 instance running in thefail cluster. Cannot failover: " + workerIdentity);
+ System.err.println("Only one HiveServer2 instance running in the cluster. Cannot failover: " + workerIdentity);
+ System.exit(-1);
+ }
+ // matched HS2 instance is not leader
+ if (!targetInstance.isLeader()) {
+ LOG.error("HiveServer2 instance (workerIdentity: " + workerIdentity + ") is not a leader. Cannot failover");
+ System.err.println("HiveServer2 instance (workerIdentity: " + workerIdentity + ") is not a leader. Cannot failover");
+ System.exit(-1);
+ }
+
+ String webPort = targetInstance.getProperties().get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname);
+ // web port cannot be obtained
+ if (StringUtils.isEmpty(webPort)) {
+ LOG.error("Unable to determine web port for instance: " + workerIdentity);
+ System.err.println("Unable to determine web port for instance: " + workerIdentity);
+ System.exit(-1);
+ }
+
+ // invoke DELETE /leader endpoint for failover
+ String webEndpoint = "http://" + targetInstance.getHost() + ":" + webPort + "/leader";
+ HttpClient client = new HttpClient();
+ HttpMethodBase method = new DeleteMethod(webEndpoint);
+ try {
+ int statusCode = client.executeMethod(method);
+ if (statusCode == 200) {
+ System.out.println(method.getResponseBodyAsString());
+ } else {
+ String response = method.getStatusLine().getReasonPhrase();
+ LOG.error("Unable to failover HiveServer2 instance: " + workerIdentity + ". status code: " +
+ statusCode + "error: " + response);
+ System.err.println("Unable to failover HiveServer2 instance: " + workerIdentity + ". status code: " +
+ statusCode + " error: " + response);
+ System.exit(-1);
+ }
+ } finally {
+ method.releaseConnection();
+ }
+ } catch (IOException e) {
+ LOG.error("Error during manual failover of HiveServer2 instance: " + workerIdentity, e);
+ System.err.println("Error during manual failover of HiveServer2 instance: " + workerIdentity + ". err: " + e);
+ System.exit(-1);
+ }
+ System.exit(0);
+ }
+ }
+
+ static class ListHAPeersExecutor implements ServerOptionsExecutor {
+ @Override
+ public void execute() {
+ try {
+ HiveConf hiveConf = new HiveConf();
+ HS2ActivePassiveHARegistry haRegistry = HS2ActivePassiveHARegistryClient.getClient(hiveConf);
+ HS2Peers.HS2Instances hs2Instances = new HS2Peers.HS2Instances(haRegistry.getAll());
+ String jsonOut = hs2Instances.toJson();
+ System.out.println(jsonOut);
+ } catch (IOException e) {
+ LOG.error("Error listing HiveServer2 HA instances from ZooKeeper", e);
+ System.err.println("Error listing HiveServer2 HA instances from ZooKeeper" + e);
+ System.exit(-1);
+ }
+ System.exit(0);
+ }
+ }
}
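A plausible operator workflow for the two new options (the worker identity is a placeholder; take it from
the --listHAPeers output):

    # List registered HS2 instances and note the leader's workerIdentity.
    hive --service hiveserver2 --listHAPeers
    # Ask the leader to step down: resolves the instance via the HA registry and issues
    # an HTTP DELETE against its /leader web endpoint (the web UI must be enabled).
    hive --service hiveserver2 --failover <workerIdentity>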
diff --git a/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java
index 33529ed..708fa0c 100644
--- a/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java
+++ b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java
@@ -17,6 +17,8 @@
*/
package org.apache.hive.service.servlet;
+import static org.apache.hive.http.HttpServer.CONF_CONTEXT_ATTRIBUTE;
+
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,18 +27,36 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
+import org.apache.hive.http.HttpServer;
+import org.apache.hive.service.server.HiveServer2;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Returns "true" if this HS2 instance is leader else "false".
+ * Invoking the "DELETE" method on this endpoint triggers a failover if this instance is the leader.
+ * For the DELETE method, hadoop.security.instrumentation.requires.admin must be set to true and the current user
+ * must be in the admin ACLs; the GET method enforces the same admin check only when that property is enabled.
*/
public class HS2LeadershipStatus extends HttpServlet {
private static final Logger LOG = LoggerFactory.getLogger(HS2LeadershipStatus.class);
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+ // admin check -
+ // allows access when hadoop.security.instrumentation.requires.admin is set to false
+ // when hadoop.security.instrumentation.requires.admin is set to true, checks that hadoop.security.authorization
+ // is true and that the logged-in user (via PAM or SPNEGO + kerberos) is in the hive.users.in.admin.role list
+ final ServletContext context = getServletContext();
+ if (!HttpServer.isInstrumentationAccessAllowed(context, request, response)) {
+ LOG.warn("Unauthorized to perform GET action. remoteUser: {}", request.getRemoteUser());
+ return;
+ }
+
ServletContext ctx = getServletContext();
AtomicBoolean isLeader = (AtomicBoolean) ctx.getAttribute("hs2.isLeader");
LOG.info("Returning isLeader: {}", isLeader);
@@ -45,4 +65,73 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro
response.setStatus(HttpServletResponse.SC_OK);
response.flushBuffer();
}
+
+ private static class FailoverResponse {
+ private boolean success;
+ private String message;
+
+ FailoverResponse() { }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public void setSuccess(final boolean success) {
+ this.success = success;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(final String message) {
+ this.message = message;
+ }
+ }
+
+ @Override
+ public void doDelete(final HttpServletRequest request, final HttpServletResponse response) throws IOException {
+ // strict admin check -
+ // allows access ONLY if hadoop.security.instrumentation.requires.admin is set to true
+ // and, additionally, hadoop.security.authorization is true and the logged-in user
+ // (via PAM or SPNEGO + kerberos) is in the hive.users.in.admin.role list
+ final ServletContext context = getServletContext();
+ if (!HttpServer.isInstrumentationAccessAllowedStrict(context, request, response)) {
+ LOG.warn("Unauthorized to perform DELETE action. remoteUser: {}", request.getRemoteUser());
+ return;
+ }
+
+ LOG.info("DELETE handler invoked for failover..");
+ ObjectMapper mapper = new ObjectMapper();
+ FailoverResponse failoverResponse = new FailoverResponse();
+ AtomicBoolean isLeader = (AtomicBoolean) context.getAttribute("hs2.isLeader");
+ if (!isLeader.get()) {
+ String msg = "Cannot failover an instance that is not a leader";
+ LOG.info(msg);
+ failoverResponse.setSuccess(false);
+ failoverResponse.setMessage(msg);
+ mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), failoverResponse);
+ response.setStatus(HttpServletResponse.SC_FORBIDDEN);
+ return;
+ }
+
+ HiveServer2.FailoverHandlerCallback failoverHandler = (HiveServer2.FailoverHandlerCallback) context
+ .getAttribute("hs2.failover.callback");
+ try {
+ String msg = "Failover successful!";
+ LOG.info(msg);
+ failoverHandler.failover();
+ failoverResponse.setSuccess(true);
+ failoverResponse.setMessage(msg);
+ mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), failoverResponse);
+ response.setStatus(HttpServletResponse.SC_OK);
+ } catch (Exception e) {
+ String errMsg = "Cannot perform failover of HS2 instance. err: " + e.getMessage();
+ LOG.error(errMsg, e);
+ failoverResponse.setSuccess(false);
+ failoverResponse.setMessage(errMsg);
+ mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), failoverResponse);
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ }
+ }
}
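The servlet can also be exercised directly over HTTP; a hedged sketch using basic auth as in the PAM tests
(credentials, host, and port are illustrative; 10002 is the usual web UI default):

    # Leadership status; GET is admin-gated only when requires.admin is enabled.
    curl -u user1:1 http://hs2-host:10002/leader
    # Trigger a failover on the leader; DELETE must pass the strict admin check.
    curl -u user1:1 -X DELETE http://hs2-host:10002/leader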
diff --git a/service/src/java/org/apache/hive/service/servlet/HS2Peers.java b/service/src/java/org/apache/hive/service/servlet/HS2Peers.java
index a51bbeb..bde6d6b 100644
--- a/service/src/java/org/apache/hive/service/servlet/HS2Peers.java
+++ b/service/src/java/org/apache/hive/service/servlet/HS2Peers.java
@@ -26,17 +26,22 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
+import org.apache.hive.http.HttpServer;
import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
import org.apache.hive.service.server.HiveServer2Instance;
-import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Returns all HS2 instances in Active-Passive standby mode.
*/
public class HS2Peers extends HttpServlet {
+ private static final Logger LOG = LoggerFactory.getLogger(HS2Peers.class);
public static class HS2Instances {
private Collection<HiveServer2Instance> hiveServer2Instances;
@@ -55,20 +60,32 @@ public HS2Instances(final Collection<HiveServer2Instance> hiveServer2Instances)
public void setHiveServer2Instances(final Collection<HiveServer2Instance> hiveServer2Instances) {
this.hiveServer2Instances = hiveServer2Instances;
}
+
+ @JsonIgnore
+ public String toJson() throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
+ return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(this);
+ }
}
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+ // admin check -
+ // allows access when hadoop.security.instrumentation.requires.admin is set to false
+ // when hadoop.security.instrumentation.requires.admin is set to true, checks that hadoop.security.authorization
+ // is true and that the logged-in user (via PAM or SPNEGO + kerberos) is in the hive.users.in.admin.role list
+ final ServletContext context = getServletContext();
+ if (!HttpServer.isInstrumentationAccessAllowed(context, request, response)) {
+ LOG.warn("Unauthorized to perform GET action. remoteUser: {}", request.getRemoteUser());
+ return;
+ }
+
ServletContext ctx = getServletContext();
HiveConf hiveConf = (HiveConf) ctx.getAttribute("hiveconf");
- ObjectMapper mapper = new ObjectMapper();
- mapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
- // serialize json based on field annotations only
- mapper.setVisibilityChecker(mapper.getSerializationConfig().getDefaultVisibilityChecker()
- .withSetterVisibility(JsonAutoDetect.Visibility.NONE));
HS2ActivePassiveHARegistry hs2Registry = HS2ActivePassiveHARegistryClient.getClient(hiveConf);
HS2Instances instances = new HS2Instances(hs2Registry.getAll());
- mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), instances);
+ response.getWriter().write(instances.toJson());
response.setStatus(HttpServletResponse.SC_OK);
response.flushBuffer();
}
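The peers endpoint serves the same JSON that --listHAPeers prints, so it can be checked the same way
(host and port again illustrative):

    # Pretty-printed JSON of all registered HS2 instances, via HS2Instances.toJson().
    curl -u user1:1 http://hs2-host:10002/peers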
diff --git a/service/src/test/org/apache/hive/service/server/TestHS2HttpServerPam.java b/service/src/test/org/apache/hive/service/server/TestHS2HttpServerPam.java
index d1b3ce0..04f66b4 100644
--- a/service/src/test/org/apache/hive/service/server/TestHS2HttpServerPam.java
+++ b/service/src/test/org/apache/hive/service/server/TestHS2HttpServerPam.java
@@ -155,7 +155,7 @@ public void testIncorrectPassword() throws Exception {
public static class TestPamAuthenticator extends PamAuthenticator {
private static final Map<String, String> users = new HashMap<>();
- TestPamAuthenticator(HiveConf conf) throws AuthenticationException {
+ public TestPamAuthenticator(HiveConf conf) throws AuthenticationException {
super(conf);
}