diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java index 26acbd7..72b2a8c 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java @@ -19,12 +19,17 @@ package org.apache.hive.jdbc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -34,6 +39,7 @@ import org.apache.curator.test.TestingServer; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.registry.impl.ZkRegistryBase; @@ -54,8 +60,10 @@ private MiniHS2 miniHS2_2 = null; private static TestingServer zkServer; private Connection hs2Conn = null; + private static String zkHANamespace = "hs2ActivePassiveHATest"; private HiveConf hiveConf1; private HiveConf hiveConf2; + private static Path kvDataFilePath; @BeforeClass public static void beforeTest() throws Exception { @@ -85,6 +93,8 @@ public void setUp() throws Exception { // Set up zookeeper dynamic service discovery configs setHAConfigs(hiveConf2); miniHS2_2 = new MiniHS2.Builder().withConf(hiveConf2).cleanupLocalDirOnStartup(false).build(); + final String dataFileDir = hiveConf1.get("test.data.files").replace('\\', '/').replace("c:", ""); + kvDataFilePath = new Path(dataFileDir, "kv1.txt"); } @After @@ -103,26 +113,24 @@ public void tearDown() throws Exception { private static void setHAConfigs(Configuration conf) { conf.setBoolean(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY.varname, true); conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, zkServer.getConnectString()); - final String zkRootNamespace = "hs2test"; - conf.set(ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE.varname, zkRootNamespace); conf.setBoolean(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE.varname, true); + conf.set(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE.varname, zkHANamespace); conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT.varname, 2, TimeUnit.SECONDS); conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME.varname, 100, TimeUnit.MILLISECONDS); conf.setInt(ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES.varname, 1); } @Test(timeout = 60000) - public void testActivePassive() throws Exception { - Map confOverlay = new HashMap<>(); - hiveConf1.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString()); - miniHS2_1.start(confOverlay); - while(!miniHS2_1.isStarted()) { + public void testActivePassiveHA() throws Exception { + String instanceId1 = UUID.randomUUID().toString(); + miniHS2_1.start(getConfOverlay(instanceId1)); + while (!miniHS2_1.isStarted()) { Thread.sleep(100); } - hiveConf2.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString()); - miniHS2_2.start(confOverlay); - while(!miniHS2_2.isStarted()) { + String instanceId2 = UUID.randomUUID().toString(); + miniHS2_2.start(getConfOverlay(instanceId2)); + while (!miniHS2_2.isStarted()) { Thread.sleep(100); } @@ -141,7 +149,7 @@ public void testActivePassive() throws Exception { int port1 = Integer.parseInt(hiveConf1.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname)); assertEquals(2, hs2Peers.getHiveServer2Instances().size()); for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) { - if (hsi.getRpcPort() == port1) { + if (hsi.getRpcPort() == port1 && hsi.getWorkerIdentity().equals(instanceId1)) { assertEquals(true, hsi.isLeader()); } else { assertEquals(false, hsi.isLeader()); @@ -167,7 +175,7 @@ public void testActivePassive() throws Exception { miniHS2_1.stop(); - while(!miniHS2_2.isStarted()) { + while (!miniHS2_2.isStarted()) { Thread.sleep(100); } assertEquals(true, miniHS2_2.isLeader()); @@ -200,7 +208,7 @@ public void testActivePassive() throws Exception { int port2 = Integer.parseInt(hiveConf2.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname)); assertEquals(1, hs2Peers.getHiveServer2Instances().size()); for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) { - if (hsi.getRpcPort() == port2) { + if (hsi.getRpcPort() == port2 && hsi.getWorkerIdentity().equals(instanceId2)) { assertEquals(true, hsi.isLeader()); } else { assertEquals(false, hsi.isLeader()); @@ -208,10 +216,10 @@ public void testActivePassive() throws Exception { } // start 1st server again - hiveConf1.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString()); - miniHS2_1.start(confOverlay); + instanceId1 = UUID.randomUUID().toString(); + miniHS2_1.start(getConfOverlay(instanceId1)); - while(!miniHS2_1.isStarted()) { + while (!miniHS2_1.isStarted()) { Thread.sleep(100); } assertEquals(false, miniHS2_1.isLeader()); @@ -244,7 +252,7 @@ public void testActivePassive() throws Exception { port2 = Integer.parseInt(hiveConf2.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname)); assertEquals(2, hs2Peers.getHiveServer2Instances().size()); for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) { - if (hsi.getRpcPort() == port2) { + if (hsi.getRpcPort() == port2 && hsi.getWorkerIdentity().equals(instanceId2)) { assertEquals(true, hsi.isLeader()); } else { assertEquals(false, hsi.isLeader()); @@ -252,6 +260,91 @@ public void testActivePassive() throws Exception { } } + @Test(timeout = 60000) + public void testConnectionActivePassiveHAServiceDiscovery() throws Exception { + String instanceId1 = UUID.randomUUID().toString(); + miniHS2_1.start(getConfOverlay(instanceId1)); + while (!miniHS2_1.isStarted()) { + Thread.sleep(100); + } + String instanceId2 = UUID.randomUUID().toString(); + Map confOverlay = getConfOverlay(instanceId2); + confOverlay.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, "http"); + confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest"); + miniHS2_2.start(confOverlay); + while (!miniHS2_2.isStarted()) { + Thread.sleep(100); + } + + assertEquals(true, miniHS2_1.isLeader()); + String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + assertEquals("true", sendGet(url)); + + assertEquals(false, miniHS2_2.isLeader()); + url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + assertEquals("false", sendGet(url)); + + // miniHS2_1 will be leader + String zkConnectString = zkServer.getConnectString(); + String zkJdbcUrl = miniHS2_1.getJdbcURL(); + // getAllUrls will parse zkJdbcUrl and will plugin the active HS2's host:port + String parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString(); + final String serviceDiscoveryMode = "zooKeeperHA"; + String hs2_1_directUrl = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort() + + "/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";"; + assertTrue(zkJdbcUrl.contains(zkConnectString)); + assertEquals(hs2_1_directUrl, parsedUrl); + openConnectionAndRunQuery(zkJdbcUrl); + + // miniHS2_2 will become leader + miniHS2_1.stop(); + parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString(); + String hs2_2_directUrl = "jdbc:hive2://" + miniHS2_2.getHost() + ":" + miniHS2_2.getHttpPort() + + "/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";"; + assertTrue(zkJdbcUrl.contains(zkConnectString)); + assertEquals(hs2_2_directUrl, parsedUrl); + openConnectionAndRunQuery(zkJdbcUrl); + + // miniHS2_2 will continue to be leader + instanceId1 = UUID.randomUUID().toString(); + miniHS2_1.start(getConfOverlay(instanceId1)); + parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString(); + assertTrue(zkJdbcUrl.contains(zkConnectString)); + assertEquals(hs2_2_directUrl, parsedUrl); + openConnectionAndRunQuery(zkJdbcUrl); + + // miniHS2_1 will become leader + miniHS2_2.stop(); + parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString(); + hs2_1_directUrl = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort() + + "/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";"; + assertTrue(zkJdbcUrl.contains(zkConnectString)); + assertEquals(hs2_1_directUrl, parsedUrl); + openConnectionAndRunQuery(zkJdbcUrl); + } + + private Connection getConnection(String jdbcURL, String user) throws SQLException { + return DriverManager.getConnection(jdbcURL, user, "bar"); + } + + private void openConnectionAndRunQuery(String jdbcUrl) throws Exception { + hs2Conn = getConnection(jdbcUrl, System.getProperty("user.name")); + String tableName = "testTab1"; + Statement stmt = hs2Conn.createStatement(); + // create table + stmt.execute("DROP TABLE IF EXISTS " + tableName); + stmt.execute("CREATE TABLE " + tableName + + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'"); + // load data + stmt.execute("load data local inpath '" + kvDataFilePath.toString() + "' into table " + + tableName); + ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName); + assertTrue(res.next()); + assertEquals("val_238", res.getString(2)); + res.close(); + stmt.close(); + } + private String sendGet(String url) throws Exception { URL obj = new URL(url); HttpURLConnection con = (HttpURLConnection) obj.openConnection(); @@ -265,4 +358,11 @@ private String sendGet(String url) throws Exception { in.close(); return response.toString(); } + + private Map getConfOverlay(final String instanceId) { + Map confOverlay = new HashMap<>(); + confOverlay.put("hive.server2.zookeeper.publish.configs", "true"); + confOverlay.put(ZkRegistryBase.UNIQUE_IDENTIFIER, instanceId); + return confOverlay; + } } \ No newline at end of file diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index 997726c..e1c2dd0 100644 --- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.shims.HadoopShims.MiniDFSShim; import org.apache.hadoop.hive.shims.HadoopShims.MiniMrShim; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hive.jdbc.Utils; import org.apache.hive.service.Service; import org.apache.hive.service.cli.CLIServiceClient; import org.apache.hive.service.cli.SessionHandle; @@ -490,10 +491,14 @@ public String getJdbcURL(String dbName, String sessionConfExt, String hiveConfEx } String baseJdbcURL; if (isDynamicServiceDiscovery()) { - sessionConfExt = - "serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=" - + getServerConf().getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE) + ";" - + sessionConfExt; + String namespace = getServerConf().getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); + String serviceDiscoveryMode = Utils.JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER; + if (HiveConf.getBoolVar(getServerConf(), ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE)) { + namespace = getServerConf().getVar(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE); + serviceDiscoveryMode = Utils.JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA; + } + sessionConfExt = "serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + + namespace + ";" + sessionConfExt; baseJdbcURL = getZKBaseJdbcURL(); } else { baseJdbcURL = getBaseJdbcURL(); diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index c3f9c63..30b6daf 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -149,7 +149,10 @@ */ public static List getAllUrls(String zookeeperBasedHS2Url) throws Exception { JdbcConnectionParams params = Utils.parseURL(zookeeperBasedHS2Url, new Properties()); - if (params.getZooKeeperEnsemble() == null) { + // if zk is disabled or if HA service discovery is enabled we return the already populated params. + // in HA mode, params is already populated with Active server host info. + if (params.getZooKeeperEnsemble() == null || + ZooKeeperHiveClientHelper.isZkHADynamicDiscoveryMode(params.getSessionVars())) { return Collections.singletonList(params); } return ZooKeeperHiveClientHelper.getDirectParamsList(params); @@ -230,7 +233,7 @@ public HiveConnection(String uri, Properties info) throws SQLException { LOG.warn("Failed to connect to " + connParams.getHost() + ":" + connParams.getPort()); String errMsg = null; String warnMsg = "Could not open client transport with JDBC Uri: " + jdbcUriString + ": "; - if (isZkDynamicDiscoveryMode()) { + if (ZooKeeperHiveClientHelper.isZkDynamicDiscoveryMode(sessConfMap)) { errMsg = "Could not open client transport for any of the Server URI's in ZooKeeper: "; // Try next available server in zookeeper, or retry all the servers again if retry is enabled while(!Utils.updateConnParamsFromZooKeeper(connParams) && ++numRetries < maxRetries) { @@ -764,14 +767,8 @@ private boolean isHttpTransportMode() { return false; } - private boolean isZkDynamicDiscoveryMode() { - return (sessConfMap.get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE) != null) - && (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER.equalsIgnoreCase(sessConfMap - .get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE))); - } - private void logZkDiscoveryMessage(String message) { - if (isZkDynamicDiscoveryMode()) { + if (ZooKeeperHiveClientHelper.isZkDynamicDiscoveryMode(sessConfMap)) { LOG.info(message); } } diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java index 9a12977..6d7787d 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java +++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java @@ -108,11 +108,13 @@ // Don't use dynamic service discovery static final String SERVICE_DISCOVERY_MODE_NONE = "none"; // Use ZooKeeper for indirection while using dynamic service discovery - static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper"; + public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper"; + public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA = "zooKeeperHA"; static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace"; // Default namespace value on ZooKeeper. // This value is used if the param "zooKeeperNamespace" is not specified in the JDBC Uri. static final String ZOOKEEPER_DEFAULT_NAMESPACE = "hiveserver2"; + static final String ZOOKEEPER_ACTIVE_PASSIVE_HA_DEFAULT_NAMESPACE = "hs2ActivePassiveHA"; static final String COOKIE_AUTH = "cookieAuth"; static final String COOKIE_AUTH_FALSE = "false"; static final String COOKIE_NAME = "cookieName"; @@ -537,11 +539,7 @@ private static String getAuthorities(String uri, JdbcConnectionParams connParams private static void configureConnParams(JdbcConnectionParams connParams) throws JdbcUriParseException, ZooKeeperHiveClientException { - String serviceDiscoveryMode = - connParams.getSessionVars().get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE); - if ((serviceDiscoveryMode != null) - && (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER - .equalsIgnoreCase(serviceDiscoveryMode))) { + if (ZooKeeperHiveClientHelper.isZkDynamicDiscoveryMode(connParams.getSessionVars())) { // Set ZooKeeper ensemble in connParams for later use connParams.setZooKeeperEnsemble(joinStringArray(connParams.getAuthorityList(), ",")); // Configure using ZooKeeper diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java index bc91f0c..0468f7b 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java +++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java @@ -21,39 +21,69 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.jdbc.Utils.JdbcConnectionParams; -import org.apache.zookeeper.Watcher; +import org.apache.hive.service.server.HS2ActivePassiveHARegistry; +import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient; +import org.apache.hive.service.server.HiveServer2Instance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Joiner; + class ZooKeeperHiveClientHelper { static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHiveClientHelper.class.getName()); // Pattern for key1=value1;key2=value2 private static final Pattern kvPattern = Pattern.compile("([^=;]*)=([^;]*)[;]?"); - /** - * A no-op watcher class - */ - static class DummyWatcher implements Watcher { - @Override - public void process(org.apache.zookeeper.WatchedEvent event) { - } - } private static String getZooKeeperNamespace(JdbcConnectionParams connParams) { String zooKeeperNamespace = connParams.getSessionVars().get(JdbcConnectionParams.ZOOKEEPER_NAMESPACE); if ((zooKeeperNamespace == null) || (zooKeeperNamespace.isEmpty())) { - zooKeeperNamespace = JdbcConnectionParams.ZOOKEEPER_DEFAULT_NAMESPACE; + // if active passive HA enabled, use default HA namespace + if (isZkHADynamicDiscoveryMode(connParams.getSessionVars())) { + zooKeeperNamespace = JdbcConnectionParams.ZOOKEEPER_ACTIVE_PASSIVE_HA_DEFAULT_NAMESPACE; + } else { + zooKeeperNamespace = JdbcConnectionParams.ZOOKEEPER_DEFAULT_NAMESPACE; + } } return zooKeeperNamespace; } + /** + * Returns true is only if HA service discovery mode is enabled + * + * @param sessionConf - session configuration + * @return true if serviceDiscoveryMode=zooKeeperHA is specified in JDBC URI + */ + public static boolean isZkHADynamicDiscoveryMode(Map sessionConf) { + final String discoveryMode = sessionConf.get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE); + return (discoveryMode != null) && + JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA.equalsIgnoreCase(discoveryMode); + } + + /** + * Returns true is any service discovery mode is enabled (HA or non-HA) + * + * @param sessionConf - session configuration + * @return true if serviceDiscoveryMode is specified in JDBC URI + */ + public static boolean isZkDynamicDiscoveryMode(Map sessionConf) { + final String discoveryMode = sessionConf.get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE); + return (discoveryMode != null) + && (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER.equalsIgnoreCase(discoveryMode) || + JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA.equalsIgnoreCase(discoveryMode)); + } + private static CuratorFramework getZkClient(JdbcConnectionParams connParams) throws Exception { String zooKeeperEnsemble = connParams.getZooKeeperEnsemble(); CuratorFramework zooKeeperClient = @@ -103,20 +133,71 @@ private static void updateParamsWithZKServerNode(JdbcConnectionParams connParams } static void configureConnParams(JdbcConnectionParams connParams) throws ZooKeeperHiveClientException { - CuratorFramework zooKeeperClient = null; + if (isZkHADynamicDiscoveryMode(connParams.getSessionVars())) { + configureConnParamsHA(connParams); + } else { + CuratorFramework zooKeeperClient = null; + try { + zooKeeperClient = getZkClient(connParams); + List serverHosts = getServerHosts(connParams, zooKeeperClient); + // Now pick a server node randomly + String serverNode = serverHosts.get(new Random().nextInt(serverHosts.size())); + updateParamsWithZKServerNode(connParams, zooKeeperClient, serverNode); + } catch (Exception e) { + throw new ZooKeeperHiveClientException("Unable to read HiveServer2 configs from ZooKeeper", e); + } finally { + // Close the client connection with ZooKeeper + if (zooKeeperClient != null) { + zooKeeperClient.close(); + } + } + } + } + + private static void configureConnParamsHA(JdbcConnectionParams connParams) throws ZooKeeperHiveClientException { try { - zooKeeperClient = getZkClient(connParams); - List serverHosts = getServerHosts(connParams, zooKeeperClient); - // Now pick a server node randomly - String serverNode = serverHosts.get(new Random().nextInt(serverHosts.size())); - updateParamsWithZKServerNode(connParams, zooKeeperClient, serverNode); + Configuration registryConf = new Configuration(); + registryConf.set(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, connParams.getZooKeeperEnsemble()); + registryConf.set(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE.varname, + getZooKeeperNamespace(connParams)); + HS2ActivePassiveHARegistry haRegistryClient = HS2ActivePassiveHARegistryClient.getClient(registryConf); + boolean foundLeader = false; + String maxRetriesConf = connParams.getSessionVars().get(JdbcConnectionParams.RETRIES); + final int maxRetries = StringUtils.isEmpty(maxRetriesConf) ? 5 : Integer.parseInt(maxRetriesConf); + int retries = 0; + int sleepMs = 1000; + while (!foundLeader && retries < maxRetries) { + for (HiveServer2Instance hiveServer2Instance : haRegistryClient.getAll()) { + if (hiveServer2Instance.isLeader()) { + foundLeader = true; + connParams.setHost(hiveServer2Instance.getHost()); + connParams.setPort(hiveServer2Instance.getRpcPort()); + final String mode = hiveServer2Instance.getTransportMode().equals("http") ? "http:/" + hiveServer2Instance + .getHttpEndpoint() : hiveServer2Instance.getTransportMode(); + LOG.info("Found HS2 Active Host: {} Port: {} Identity: {} Mode: {}", hiveServer2Instance.getHost(), + hiveServer2Instance.getRpcPort(), hiveServer2Instance.getWorkerIdentity(), mode); + // configurations are always published to ServiceRecord. Read/apply configs to JDBC connection params + String serverConfStr = Joiner.on(';').withKeyValueSeparator("=").join(hiveServer2Instance.getProperties()); + if (LOG.isDebugEnabled()) { + LOG.debug("Configurations applied to JDBC connection params. {}", hiveServer2Instance.getProperties()); + } + applyConfs(serverConfStr, connParams); + break; + } + } + if (!foundLeader) { + LOG.warn("Unable to connect to HS2 Active Host (No Leader Found!). Retrying after {} ms. retries: {}", + sleepMs, retries); + Thread.sleep(sleepMs); + retries++; + } + } + if (!foundLeader) { + throw new ZooKeeperHiveClientException("Unable to connect to HiveServer2 Active host (No leader found!) after" + + " " + maxRetries + " retries."); + } } catch (Exception e) { throw new ZooKeeperHiveClientException("Unable to read HiveServer2 configs from ZooKeeper", e); - } finally { - // Close the client connection with ZooKeeper - if (zooKeeperClient != null) { - zooKeeperClient.close(); - } } } diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java index e7227a8..680d9af 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java @@ -220,6 +220,9 @@ private CuratorFramework getZookeeperClient(Configuration conf, String namespace ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS); int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); + LOG.info("Creating curator client with connectString: {} sessionTimeoutMs: {} connectionTimeoutMs: {}" + + " namespace: {} exponentialBackoff - sleepTime: {} maxRetries: {}", zkEnsemble, sessionTimeout, + connectionTimeout, namespace, baseSleepTime, maxRetries); // Create a CuratorFramework instance to be used as the ZooKeeper client // Use the zooKeeperAclProvider to create appropriate ACLs return CuratorFrameworkFactory.builder() @@ -270,8 +273,12 @@ private static String getQuorumServers(Configuration conf) { protected abstract String getZkPathUser(Configuration conf); protected final String registerServiceRecord(ServiceRecord srv) throws IOException { + return registerServiceRecord(srv, UNIQUE_ID.toString()); + } + + protected final String registerServiceRecord(ServiceRecord srv, final String uniqueId) throws IOException { // restart sensitive instance id - srv.set(UNIQUE_IDENTIFIER, UNIQUE_ID.toString()); + srv.set(UNIQUE_IDENTIFIER, uniqueId); // Create a znode under the rootNamespace parent for this instance of the server try { @@ -308,7 +315,7 @@ protected final String registerServiceRecord(ServiceRecord srv) throws IOExcepti CloseableUtils.closeQuietly(znode); throw (e instanceof IOException) ? (IOException)e : new IOException(e); } - return UNIQUE_ID.toString(); + return uniqueId; } protected final void updateServiceRecord(ServiceRecord srv) throws IOException { diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java index 6514d98..819ce19 100644 --- a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java +++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java @@ -62,6 +62,7 @@ private LeaderLatch leaderLatch; private ServiceRecord srv; private boolean isClient; + private final String uniqueId; // There are 2 paths under which the instances get registered // 1) Standard path used by ZkRegistryBase where all instances register themselves (also stores metadata) @@ -89,8 +90,13 @@ private HS2ActivePassiveHARegistry(final String instanceName, final String zkNam super(instanceName, conf, null, zkNamespacePrefix, null, INSTANCE_PREFIX, INSTANCE_GROUP, saslContextName, krbPrincipal, krbKeytab, null); this.isClient = isClient; - leaderLatch = new LeaderLatch(zooKeeperClient, leaderLatchPath, UNIQUE_ID.toString(), - LeaderLatch.CloseMode.NOTIFY_LEADER); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST) && + conf.get(ZkRegistryBase.UNIQUE_IDENTIFIER) != null) { + this.uniqueId = conf.get(ZkRegistryBase.UNIQUE_IDENTIFIER); + } else { + this.uniqueId = UNIQUE_ID.toString(); + } + leaderLatch = new LeaderLatch(zooKeeperClient, leaderLatchPath, uniqueId, LeaderLatch.CloseMode.NOTIFY_LEADER); } @Override @@ -110,6 +116,7 @@ public void start() throws IOException { LOG.info("Registered HS2 with ZK. service record: {}", srv); } else { populateCache(); + LOG.info("Populating instances cache for client"); } } @@ -121,7 +128,7 @@ protected void unregisterInternal() { @Override public String register() throws IOException { updateEndpoint(srv, PASSIVE_ENDPOINT); - return registerServiceRecord(srv); + return registerServiceRecord(srv, uniqueId); } @Override @@ -285,7 +292,7 @@ void registerLeaderLatchListener(final LeaderLatchListener latchListener, final conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname)); // Hostname:port confsToPublish.put(INSTANCE_URI_CONFIG, conf.get(INSTANCE_URI_CONFIG)); - confsToPublish.put(UNIQUE_IDENTIFIER, UNIQUE_ID.toString()); + confsToPublish.put(UNIQUE_IDENTIFIER, uniqueId); // Transport mode confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, conf.get(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname)); diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java index 512d4e8..f87b610 100644 --- a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java +++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java @@ -46,8 +46,9 @@ public static synchronized HS2ActivePassiveHARegistry getClient(Configuration co registry = HS2ActivePassiveHARegistry.create(conf, true); registry.start(); hs2Registries.put(nsKey, registry); + LOG.info("Added registry client to cache with namespace: {}", nsKey); } else { - LOG.debug("Returning cached registry client for nsKey: {}", nsKey); + LOG.info("Returning cached registry client for namespace: {}", nsKey); } return registry; } diff --git a/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java index 921a23e..33529ed 100644 --- a/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java +++ b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java @@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory; /** - * Returns 200 if this HS2 instance is leader. + * Returns "true" if this HS2 instance is leader else "false". */ public class HS2LeadershipStatus extends HttpServlet { private static final Logger LOG = LoggerFactory.getLogger(HS2LeadershipStatus.class);