diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index 352744f..4ee4681 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -149,16 +149,6 @@ public HiveConnection(String uri, Properties info) throws SQLException { fetchSize = Integer.parseInt(sessConfMap.get(JdbcConnectionParams.FETCH_SIZE)); } - if (isEmbeddedMode) { - EmbeddedThriftBinaryCLIService embeddedClient = new EmbeddedThriftBinaryCLIService(); - embeddedClient.init(null); - client = embeddedClient; - } else { - // open the client transport - openTransport(); - // set up the client - client = new TCLIService.Client(new TBinaryProtocol(transport)); - } // add supported protocols supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1); supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2); @@ -169,65 +159,75 @@ public HiveConnection(String uri, Properties info) throws SQLException { supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7); supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8); - // open client session - openSession(); + if (isEmbeddedMode) { + EmbeddedThriftBinaryCLIService embeddedClient = new EmbeddedThriftBinaryCLIService(); + embeddedClient.init(null); + client = embeddedClient; + openSession(); + } else { + int numRetries = 0; + int maxRetries = 1; + try { + maxRetries = Integer.parseInt(sessConfMap.get(JdbcConnectionParams.RETRIES)); + } catch(NumberFormatException e) { + } + do { + try { + // open the client transport + openTransport(); + // set up the client + client = new TCLIService.Client(new TBinaryProtocol(transport)); + // open client session + openSession(); + break; + } catch(Exception e) { + // We'll retry till we exhaust all HiveServer2 nodes from ZooKeeper + if (isZkDynamicDiscoveryMode()) { + LOG.info("Failed to connect to " + connParams.getHost() + ":" + connParams.getPort()); + try { + // Update jdbcUriString, host & port variables in connParams + // Throw an exception if all HiveServer2 nodes have been exhausted, + // or if we're unable to connect to ZooKeeper. + Utils.updateConnParamsFromZooKeeper(connParams); + } catch (ZooKeeperHiveClientException ze) { + throw new SQLException( + "Could not open client transport for any of the Server URI's in ZooKeeper: " + + ze.getMessage(), " 08S01", ze); + } + // Update with new values + jdbcUriString = connParams.getJdbcUriString(); + host = connParams.getHost(); + port = connParams.getPort(); + } else { + LOG.info("Transport Used for JDBC connection: " + + sessConfMap.get(JdbcConnectionParams.TRANSPORT_MODE)); + + // Retry maxRetries times + String errMsg = "Could not open client transport with JDBC Uri: " + + jdbcUriString + ": " + e.getMessage(); + if (++numRetries >= maxRetries) { + throw new SQLException(errMsg, " 08S01", e); + } else { + LOG.warn(errMsg + " Retrying " + numRetries + " of " + maxRetries); + } + } + } + } while(true); + } // Wrap the client with a thread-safe proxy to serialize the RPC calls client = newSynchronizedClient(client); } - private void openTransport() throws SQLException { - int numRetries = 0; - int maxRetries = 1; - try { - maxRetries = Integer.parseInt(sessConfMap.get(JdbcConnectionParams.RETRIES)); - } catch(NumberFormatException e) { - } - - while (true) { - try { - assumeSubject = - JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap - .get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE)); - transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport(); - if (!transport.isOpen()) { - transport.open(); - logZkDiscoveryMessage("Connected to " + connParams.getHost() + ":" + connParams.getPort()); - } - break; - } catch (TTransportException e) { - // We'll retry till we exhaust all HiveServer2 nodes from ZooKeeper - if (isZkDynamicDiscoveryMode()) { - LOG.info("Failed to connect to " + connParams.getHost() + ":" + connParams.getPort()); - try { - // Update jdbcUriString, host & port variables in connParams - // Throw an exception if all HiveServer2 nodes have been exhausted, - // or if we're unable to connect to ZooKeeper. - Utils.updateConnParamsFromZooKeeper(connParams); - } catch (ZooKeeperHiveClientException ze) { - throw new SQLException( - "Could not open client transport for any of the Server URI's in ZooKeeper: " - + ze.getMessage(), " 08S01", ze); - } - // Update with new values - jdbcUriString = connParams.getJdbcUriString(); - host = connParams.getHost(); - port = connParams.getPort(); - } else { - LOG.info("Transport Used for JDBC connection: " + - sessConfMap.get(JdbcConnectionParams.TRANSPORT_MODE)); - - // Retry maxRetries times - String errMsg = "Could not open client transport with JDBC Uri: " + - jdbcUriString + ": " + e.getMessage(); - if (++numRetries >= maxRetries) { - throw new SQLException(errMsg, " 08S01", e); - } else { - LOG.warn(errMsg + " Retrying " + numRetries + " of " + maxRetries); - } - } + private void openTransport() throws SQLException, TTransportException { + assumeSubject = + JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap + .get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE)); + transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport(); + if (!transport.isOpen()) { + transport.open(); + logZkDiscoveryMessage("Connected to " + connParams.getHost() + ":" + connParams.getPort()); } - } } public String getConnectedUrl() { @@ -254,26 +254,7 @@ private TTransport createHttpTransport() throws SQLException, TTransportExceptio boolean useSsl = isSslConnection(); // Create an http client from the configs httpClient = getHttpClient(useSsl); - try { - transport = new THttpClient(getServerHttpUrl(useSsl), httpClient); - // We'll call an open/close here to send a test HTTP message to the server. Any - // TTransportException caused by trying to connect to a non-available peer are thrown here. - // Bubbling them up the call hierarchy so that a retry can happen in openTransport, - // if dynamic service discovery is configured. - TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport)); - TOpenSessionResp openResp = client.OpenSession(new TOpenSessionReq()); - if (openResp != null) { - client.CloseSession(new TCloseSessionReq(openResp.getSessionHandle())); - } - } - catch (TException e) { - LOG.info("JDBC Connection Parameters used : useSSL = " + useSsl + " , httpPath = " + - sessConfMap.get(JdbcConnectionParams.HTTP_PATH) + " Authentication type = " + - sessConfMap.get(JdbcConnectionParams.AUTH_TYPE)); - String msg = "Could not create http connection to " + - jdbcUriString + ". " + e.getMessage(); - throw new TTransportException(msg, e); - } + transport = new THttpClient(getServerHttpUrl(useSsl), httpClient); return transport; }