diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index f52ebee..adde03d 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -483,10 +483,16 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val, HS2_RETURN_ERROR(return_val, status.GetErrorMsg(), SQLSTATE_GENERAL_ERROR); } } + VLOG_QUERY << "XXX pausing for 5 seconds"; + int64_t now = ms_since_epoch(); + while (ms_since_epoch() < now + 5000) ; + + VLOG_QUERY << "XXX done pausing for 5 seconds"; exec_state->UpdateQueryState(QueryState::RUNNING); // Start thread to wait for results to become available. exec_state->WaitAsync(); + VLOG_QUERY << "XXX WaitAsync done"; return_val.__isset.operationHandle = true; return_val.operationHandle.__set_operationType(TOperationType::EXECUTE_STATEMENT); return_val.operationHandle.__set_hasResultSet(exec_state->returns_result_set()); diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index fe337a7..707a5a6 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -755,12 +755,14 @@ Status ImpalaServer::CancelInternal(const TUniqueId& query_id, const Status* cau Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id, bool ignore_if_absent) { + VLOG_QUERY << "XXX CloseSessionInternal start"; // Find the session_state and remove it from the map. shared_ptr session_state; { lock_guard l(session_state_map_lock_); SessionStateMap::iterator entry = session_state_map_.find(session_id); if (entry == session_state_map_.end()) { + VLOG_QUERY << "XXX CloseSessionInternal not found"; if (ignore_if_absent) { return Status::OK; } else { @@ -788,8 +790,10 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id, // Unregister all open queries from this session. Status status("Session closed", true); BOOST_FOREACH(const TUniqueId& query_id, inflight_queries) { + VLOG_QUERY << "XXX CloseSessionInternal query=" << PrintId(query_id); UnregisterQuery(query_id, &status); } + VLOG_QUERY << "XXX CloseSessionInternal done"; return Status::OK; } diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py index ac0952a..649efff 100755 --- a/tests/hs2/test_hs2.py +++ b/tests/hs2/test_hs2.py @@ -16,11 +16,15 @@ # Client tests for Impala's HiveServer2 interface import pytest +import threading +import time from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session, operation_id_to_query_id from cli_service import TCLIService from ImpalaService import ImpalaHiveServer2Service from ExecStats.ttypes import TExecState +import pprint + class TestHS2(HS2TestSuite): def test_open_session(self): """Check that a session can be opened""" @@ -51,6 +55,34 @@ class TestHS2(HS2TestSuite): TestHS2.check_response(self.hs2_client.CloseSession(close_session_req), TCLIService.TStatusCode.ERROR_STATUS) + def test_close_session_concurrently(self): + """Test that an open session can be closed while query exec in flight""" + print '- starting' + open_session_req = TCLIService.TOpenSessionReq() + resp = self.hs2_client.OpenSession(open_session_req) + TestHS2.check_response(resp) + self.session_handle2 = resp.sessionHandle + + def close_session_concurrently(): + time.sleep(0.01) + close_session_req = TCLIService.TCloseSessionReq() + close_session_req.sessionHandle = self.session_handle2 + TestHS2.check_response(self.hs2_client.CloseSession(close_session_req)) + self.session_handle2 = None + + thread = threading.Thread(target=close_session_concurrently); + + execute_statement_req = TCLIService.TExecuteStatementReq() + execute_statement_req.sessionHandle = resp.sessionHandle + execute_statement_req.statement = "select * from tpch.lineitem x, tpch.lineitem y where x.l_orderkey=y.l_orderkey" + + thread.start(); + time.sleep(0.01) + try: + self.hs2_client.ExecuteStatement(execute_statement_req) + finally: + thread.join(); + @needs_session def test_execute_select(self): """Test that a simple select statement works"""