diff --git a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java index a81c600236e6e0f53a484806e559303279244616..14191e56d97fc6703be221aa41e7d76af26dcca0 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java @@ -71,9 +71,11 @@ public static class CLIServiceClientWrapper extends CLIServiceClient { private final ICLIService cliService; + private TTransport tTransport; - public CLIServiceClientWrapper(ICLIService icliService) { + public CLIServiceClientWrapper(ICLIService icliService, TTransport tTransport) { cliService = icliService; + this.tTransport = tTransport; } @Override @@ -201,6 +203,10 @@ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientatio FetchType fetchType) throws HiveSQLException { return cliService.fetchResults(opHandle, orientation, maxRows, fetchType); } + + public void closeTransport() { + tTransport.close(); + } } protected RetryingThriftCLIServiceClient(HiveConf conf) { @@ -210,24 +216,23 @@ protected RetryingThriftCLIServiceClient(HiveConf conf) { TimeUnit.SECONDS); } - public static CLIServiceClient newRetryingCLIServiceClient(HiveConf conf) throws HiveSQLException { + public static CLIServiceClientWrapper newRetryingCLIServiceClient(HiveConf conf) throws HiveSQLException { RetryingThriftCLIServiceClient retryClient = new RetryingThriftCLIServiceClient(conf); - retryClient.connectWithRetry(conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT)); + TTransport tTransport = retryClient + .connectWithRetry(conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT)); ICLIService cliService = (ICLIService) Proxy.newProxyInstance(RetryingThriftCLIServiceClient.class.getClassLoader(), CLIServiceClient.class.getInterfaces(), retryClient); - return new CLIServiceClientWrapper(cliService); + return new CLIServiceClientWrapper(cliService, tTransport); } - protected void connectWithRetry(int retries) throws HiveSQLException { + protected TTransport connectWithRetry(int retries) throws HiveSQLException { + TTransportException exception = null; for (int i = 0 ; i < retries; i++) { try { - connect(conf); - break; + return connect(conf); } catch (TTransportException e) { - if (i + 1 == retries) { - throw new HiveSQLException("Unable to connect after " + retries + " retries", e); - } + exception = e; LOG.warn("Connection attempt " + i, e); } try { @@ -236,6 +241,7 @@ protected void connectWithRetry(int retries) throws HiveSQLException { LOG.warn("Interrupted", e); } } + throw new HiveSQLException("Unable to connect after " + retries + " retries", exception); } protected synchronized TTransport connect(HiveConf conf) throws HiveSQLException, TTransportException { diff --git a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java index 3798053a4a2e2b26b97b5accf96d73b58c89fb6e..3bd82e614a50c0b5419a926ff00a95dafd4b0ebb 100644 --- a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java +++ b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java @@ -51,14 +51,14 @@ protected RetryingThriftCLIServiceClientTest(HiveConf conf) { super(conf); } - public static CLIServiceClient newRetryingCLIServiceClient(HiveConf conf) throws HiveSQLException { + public static CLIServiceClientWrapper newRetryingCLIServiceClient(HiveConf conf) throws HiveSQLException { handlerInst = new RetryingThriftCLIServiceClientTest(conf); - handlerInst.connectWithRetry(conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT)); - + TTransport tTransport + = handlerInst.connectWithRetry(conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT)); ICLIService cliService = (ICLIService) Proxy.newProxyInstance(RetryingThriftCLIServiceClientTest.class.getClassLoader(), CLIServiceClient.class.getInterfaces(), handlerInst); - return new CLIServiceClientWrapper(cliService); + return new CLIServiceClientWrapper(cliService, tTransport); } @Override @@ -108,8 +108,8 @@ public void testRetryBehaviour() throws Exception { // Reset port setting hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 15000); // Create client - CLIServiceClient cliServiceClient = - RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf); + RetryingThriftCLIServiceClient.CLIServiceClientWrapper cliServiceClient + = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf); System.out.println("## Created client"); // kill server @@ -127,7 +127,8 @@ public void testRetryBehaviour() throws Exception { assertTrue(exc.getCause() instanceof TException); assertEquals(1, RetryingThriftCLIServiceClientTest.handlerInst.callCount); assertEquals(3, RetryingThriftCLIServiceClientTest.handlerInst.connectCount); + } finally { + cliServiceClient.closeTransport(); } - } }