diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index ef39573..a27ee6b 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -135,6 +135,7 @@ public HiveConnection(String uri, Properties info) throws SQLException { supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2); supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V3); supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V4); + supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V5); // open client session openSession(); @@ -146,14 +147,14 @@ private void openTransport() throws SQLException { transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport(); - TProtocol protocol = new TBinaryProtocol(transport); - client = new TCLIService.Client(protocol); - try { - transport.open(); - } catch (TTransportException e) { - throw new SQLException("Could not open connection to " - + jdbcURI + ": " + e.getMessage(), " 08S01", e); - } + TProtocol protocol = new TBinaryProtocol(transport); + client = new TCLIService.Client(protocol); + try { + transport.open(); + } catch (TTransportException e) { + throw new SQLException("Could not open connection to " + + jdbcURI + ": " + e.getMessage(), " 08S01", e); + } } private TTransport createHttpTransport() throws SQLException { diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index fce19bf..aa6654d 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -231,6 +231,7 @@ public boolean execute(String sql) throws SQLException { case UKNOWN_STATE: throw new SQLException("Unknown query", "HY000"); case INITIALIZED_STATE: + case PENDING_STATE: case RUNNING_STATE: break; } diff --git a/service/if/TCLIService.thrift b/service/if/TCLIService.thrift index 1f49445..415f2e0 100644 --- a/service/if/TCLIService.thrift +++ b/service/if/TCLIService.thrift @@ -48,6 +48,9 @@ enum TProtocolVersion { // V4 add support for decimial datatype HIVE_CLI_SERVICE_PROTOCOL_V4 + + // V5 adds error details when GetOperationStatus returns in error state + HIVE_CLI_SERVICE_PROTOCOL_V5 } enum TTypeId { @@ -386,12 +389,11 @@ enum TOperationState { // The operation is in an unrecognized state UKNOWN_STATE, - + // The operation is in an pending state PENDING_STATE, } - // A string identifier. This is interpreted literally. typedef string TIdentifier @@ -482,11 +484,11 @@ struct TOperationHandle { // OpenSession() // // Open a session (connection) on the server against -// which operations may be executed. +// which operations may be executed. struct TOpenSessionReq { // The version of the HiveServer2 protocol that the client is using. - 1: required TProtocolVersion client_protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V3 - + 1: required TProtocolVersion client_protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V5 + // Username and password for authentication. // Depending on the authentication scheme being used, // this information may instead be provided by a lower @@ -504,7 +506,7 @@ struct TOpenSessionResp { 1: required TStatus status // The protocol version that the server is using. - 2: required TProtocolVersion serverProtocolVersion = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V4 + 2: required TProtocolVersion serverProtocolVersion = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V5 // Session Handle 3: optional TSessionHandle sessionHandle @@ -901,6 +903,16 @@ struct TGetOperationStatusReq { struct TGetOperationStatusResp { 1: required TStatus status 2: optional TOperationState operationState + + // If operationState is ERROR_STATE, then the following fields may be set + // sqlState as defined in the ISO/IEF CLI specification + 3: optional string sqlState + + // Internal error code + 4: optional i32 errorCode + + // Error message + 5: optional string errorMessage } diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java index 8c85386..c56d942 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIService.java +++ b/service/src/java/org/apache/hive/service/cli/CLIService.java @@ -123,9 +123,9 @@ public SessionHandle openSession(String username, String password, Map configuration, - String delegationToken) throws HiveSQLException { + String delegationToken) throws HiveSQLException { SessionHandle sessionHandle = sessionManager.openSession(username, password, configuration, - true, delegationToken); + true, delegationToken); LOG.info(sessionHandle + ": openSession()"); return sessionHandle; } @@ -158,7 +158,7 @@ public GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType getInfoType @Override public OperationHandle executeStatement(SessionHandle sessionHandle, String statement, Map confOverlay) - throws HiveSQLException { + throws HiveSQLException { OperationHandle opHandle = sessionManager.getSession(sessionHandle) .executeStatement(statement, confOverlay); LOG.info(sessionHandle + ": executeStatement()"); @@ -207,7 +207,7 @@ public OperationHandle getCatalogs(SessionHandle sessionHandle) @Override public OperationHandle getSchemas(SessionHandle sessionHandle, String catalogName, String schemaName) - throws HiveSQLException { + throws HiveSQLException { OperationHandle opHandle = sessionManager.getSession(sessionHandle) .getSchemas(catalogName, schemaName); LOG.info(sessionHandle + ": getSchemas()"); @@ -220,7 +220,7 @@ public OperationHandle getSchemas(SessionHandle sessionHandle, @Override public OperationHandle getTables(SessionHandle sessionHandle, String catalogName, String schemaName, String tableName, List tableTypes) - throws HiveSQLException { + throws HiveSQLException { OperationHandle opHandle = sessionManager .getSession(sessionHandle).getTables(catalogName, schemaName, tableName, tableTypes); LOG.info(sessionHandle + ": getTables()"); @@ -244,7 +244,7 @@ public OperationHandle getTableTypes(SessionHandle sessionHandle) @Override public OperationHandle getColumns(SessionHandle sessionHandle, String catalogName, String schemaName, String tableName, String columnName) - throws HiveSQLException { + throws HiveSQLException { OperationHandle opHandle = sessionManager.getSession(sessionHandle) .getColumns(catalogName, schemaName, tableName, columnName); LOG.info(sessionHandle + ": getColumns()"); @@ -257,7 +257,7 @@ public OperationHandle getColumns(SessionHandle sessionHandle, @Override public OperationHandle getFunctions(SessionHandle sessionHandle, String catalogName, String schemaName, String functionName) - throws HiveSQLException { + throws HiveSQLException { OperationHandle opHandle = sessionManager.getSession(sessionHandle) .getFunctions(catalogName, schemaName, functionName); LOG.info(sessionHandle + ": getFunctions()"); @@ -268,11 +268,11 @@ public OperationHandle getFunctions(SessionHandle sessionHandle, * @see org.apache.hive.service.cli.ICLIService#getOperationStatus(org.apache.hive.service.cli.OperationHandle) */ @Override - public OperationState getOperationStatus(OperationHandle opHandle) + public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException { - OperationState opState = sessionManager.getOperationManager().getOperationState(opHandle); + OperationStatus opStatus = sessionManager.getOperationManager().getOperationStatus(opHandle); LOG.info(opHandle + ": getOperationStatus()"); - return opState; + return opStatus; } /* (non-Javadoc) @@ -282,7 +282,7 @@ public OperationState getOperationStatus(OperationHandle opHandle) public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { sessionManager.getOperationManager().getOperation(opHandle). - getParentSession().cancelOperation(opHandle); + getParentSession().cancelOperation(opHandle); LOG.info(opHandle + ": cancelOperation()"); } @@ -293,7 +293,7 @@ public void cancelOperation(OperationHandle opHandle) public void closeOperation(OperationHandle opHandle) throws HiveSQLException { sessionManager.getOperationManager().getOperation(opHandle). - getParentSession().closeOperation(opHandle); + getParentSession().closeOperation(opHandle); LOG.info(opHandle + ": closeOperation"); } @@ -339,7 +339,7 @@ public synchronized String getDelegationTokenFromMetaStore(String owner) if (!hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL) || !hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { throw new UnsupportedOperationException( - "delegation token is can only be obtained for a secure remote metastore"); + "delegation token is can only be obtained for a secure remote metastore"); } try { diff --git a/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java index 14ef54f..b9d1489 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java +++ b/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java @@ -19,8 +19,6 @@ package org.apache.hive.service.cli; import java.util.Collections; -import java.util.List; -import java.util.Map; /** @@ -29,127 +27,12 @@ */ public abstract class CLIServiceClient implements ICLIService { - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#openSession(java.lang.String, java.lang.String, java.util.Map) - */ - @Override - public abstract SessionHandle openSession(String username, String password, - Map configuration) throws HiveSQLException; - - public SessionHandle openSession(String username, String password) throws HiveSQLException { return openSession(username, password, Collections.emptyMap()); } /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#closeSession(org.apache.hive.service.cli.SessionHandle) - */ - @Override - public abstract void closeSession(SessionHandle sessionHandle) throws HiveSQLException; - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#getInfo(org.apache.hive.service.cli.SessionHandle, java.util.List) - */ - @Override - public abstract GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType getInfoType) - throws HiveSQLException; - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle, - * java.lang.String, java.util.Map) - */ - @Override - public abstract OperationHandle executeStatement(SessionHandle sessionHandle, String statement, - Map confOverlay) throws HiveSQLException; - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, - * java.lang.String, java.util.Map) - */ - @Override - public abstract OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, - Map confOverlay) throws HiveSQLException; - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#getTypeInfo(org.apache.hive.service.cli.SessionHandle) - */ - @Override - public abstract OperationHandle getTypeInfo(SessionHandle sessionHandle) throws HiveSQLException; - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#getCatalogs(org.apache.hive.service.cli.SessionHandle) - */ - @Override - public abstract OperationHandle getCatalogs(SessionHandle sessionHandle) throws HiveSQLException; - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#getSchemas(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.lang.String) - */ - @Override - public abstract OperationHandle getSchemas(SessionHandle sessionHandle, String catalogName, - String schemaName) throws HiveSQLException; - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#getTables(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.lang.String, java.lang.String, java.util.List) - */ - @Override - public abstract OperationHandle getTables(SessionHandle sessionHandle, String catalogName, - String schemaName, String tableName, List tableTypes) throws HiveSQLException; - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#getTableTypes(org.apache.hive.service.cli.SessionHandle) - */ - @Override - public abstract OperationHandle getTableTypes(SessionHandle sessionHandle) throws HiveSQLException; - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#getColumns(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.lang.String, java.lang.String, java.lang.String) - */ - @Override - public abstract OperationHandle getColumns(SessionHandle sessionHandle, String catalogName, - String schemaName, String tableName, String columnName) throws HiveSQLException; - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#getFunctions(org.apache.hive.service.cli.SessionHandle, java.lang.String) - */ - @Override - public abstract OperationHandle getFunctions(SessionHandle sessionHandle, - String catalogName, String schemaName, String functionName) - throws HiveSQLException; - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#getOperationStatus(org.apache.hive.service.cli.OperationHandle) - */ - @Override - public abstract OperationState getOperationStatus(OperationHandle opHandle) throws HiveSQLException; - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#cancelOperation(org.apache.hive.service.cli.OperationHandle) - */ - @Override - public abstract void cancelOperation(OperationHandle opHandle) throws HiveSQLException; - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#closeOperation(org.apache.hive.service.cli.OperationHandle) - */ - @Override - public abstract void closeOperation(OperationHandle opHandle) throws HiveSQLException; - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#getResultSetMetadata(org.apache.hive.service.cli.OperationHandle) - */ - @Override - public abstract TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException; - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long) - */ - @Override - public abstract RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) - throws HiveSQLException; - - /* (non-Javadoc) * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle) */ @Override diff --git a/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java index 9dca874..f13a416 100644 --- a/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java +++ b/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java @@ -27,165 +27,165 @@ * */ public class EmbeddedCLIServiceClient extends CLIServiceClient { - private final ICLIService cliService; - - public EmbeddedCLIServiceClient(ICLIService cliService) { - this.cliService = cliService; - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#openSession(java.lang.String, java.lang.String, java.util.Map) - */ - @Override - public SessionHandle openSession(String username, String password, - Map configuration) throws HiveSQLException { - return cliService.openSession(username, password, configuration); - } - - @Override - public SessionHandle openSessionWithImpersonation(String username, String password, - Map configuration, String delegationToken) throws HiveSQLException { - throw new HiveSQLException("Impersonated session is not supported in the embedded mode"); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#closeSession(org.apache.hive.service.cli.SessionHandle) - */ - @Override - public void closeSession(SessionHandle sessionHandle) throws HiveSQLException { - cliService.closeSession(sessionHandle); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#getInfo(org.apache.hive.service.cli.SessionHandle, java.util.List) - */ - @Override - public GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType getInfoType) - throws HiveSQLException { - return cliService.getInfo(sessionHandle, getInfoType); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#executeStatement(org.apache.hive.service.cli.SessionHandle, - * java.lang.String, java.util.Map) - */ - @Override - public OperationHandle executeStatement(SessionHandle sessionHandle, String statement, - Map confOverlay) throws HiveSQLException { - return cliService.executeStatement(sessionHandle, statement, confOverlay); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, - * java.lang.String, java.util.Map) - */ - @Override - public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, - Map confOverlay) throws HiveSQLException { - return cliService.executeStatementAsync(sessionHandle, statement, confOverlay); - } - - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#getTypeInfo(org.apache.hive.service.cli.SessionHandle) - */ - @Override - public OperationHandle getTypeInfo(SessionHandle sessionHandle) throws HiveSQLException { - return cliService.getTypeInfo(sessionHandle); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#getCatalogs(org.apache.hive.service.cli.SessionHandle) - */ - @Override - public OperationHandle getCatalogs(SessionHandle sessionHandle) throws HiveSQLException { - return cliService.getCatalogs(sessionHandle); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#getSchemas(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.lang.String) - */ - @Override - public OperationHandle getSchemas(SessionHandle sessionHandle, String catalogName, - String schemaName) throws HiveSQLException { - return cliService.getSchemas(sessionHandle, catalogName, schemaName); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#getTables(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.lang.String, java.lang.String, java.util.List) - */ - @Override - public OperationHandle getTables(SessionHandle sessionHandle, String catalogName, - String schemaName, String tableName, List tableTypes) throws HiveSQLException { - return cliService.getTables(sessionHandle, catalogName, schemaName, tableName, tableTypes); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#getTableTypes(org.apache.hive.service.cli.SessionHandle) - */ - @Override - public OperationHandle getTableTypes(SessionHandle sessionHandle) throws HiveSQLException { - return cliService.getTableTypes(sessionHandle); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#getColumns(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.lang.String, java.lang.String, java.lang.String) - */ - @Override - public OperationHandle getColumns(SessionHandle sessionHandle, String catalogName, - String schemaName, String tableName, String columnName) throws HiveSQLException { - return cliService.getColumns(sessionHandle, catalogName, schemaName, tableName, columnName); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#getFunctions(org.apache.hive.service.cli.SessionHandle, java.lang.String) - */ - @Override - public OperationHandle getFunctions(SessionHandle sessionHandle, - String catalogName, String schemaName, String functionName) - throws HiveSQLException { - return cliService.getFunctions(sessionHandle, catalogName, schemaName, functionName); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#getOperationStatus(org.apache.hive.service.cli.OperationHandle) - */ - @Override - public OperationState getOperationStatus(OperationHandle opHandle) throws HiveSQLException { - return cliService.getOperationStatus(opHandle); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#cancelOperation(org.apache.hive.service.cli.OperationHandle) - */ - @Override - public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { - cliService.cancelOperation(opHandle); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#closeOperation(org.apache.hive.service.cli.OperationHandle) - */ - @Override - public void closeOperation(OperationHandle opHandle) throws HiveSQLException { - cliService.closeOperation(opHandle); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#getResultSetMetadata(org.apache.hive.service.cli.OperationHandle) - */ - @Override - public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException { - return cliService.getResultSetMetadata(opHandle); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long) - */ - @Override - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) - throws HiveSQLException { - return cliService.fetchResults(opHandle, orientation, maxRows); - } + private final ICLIService cliService; + + public EmbeddedCLIServiceClient(ICLIService cliService) { + this.cliService = cliService; + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#openSession(java.lang.String, java.lang.String, java.util.Map) + */ + @Override + public SessionHandle openSession(String username, String password, + Map configuration) throws HiveSQLException { + return cliService.openSession(username, password, configuration); + } + + @Override + public SessionHandle openSessionWithImpersonation(String username, String password, + Map configuration, String delegationToken) throws HiveSQLException { + throw new HiveSQLException("Impersonated session is not supported in the embedded mode"); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#closeSession(org.apache.hive.service.cli.SessionHandle) + */ + @Override + public void closeSession(SessionHandle sessionHandle) throws HiveSQLException { + cliService.closeSession(sessionHandle); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#getInfo(org.apache.hive.service.cli.SessionHandle, java.util.List) + */ + @Override + public GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType getInfoType) + throws HiveSQLException { + return cliService.getInfo(sessionHandle, getInfoType); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#executeStatement(org.apache.hive.service.cli.SessionHandle, + * java.lang.String, java.util.Map) + */ + @Override + public OperationHandle executeStatement(SessionHandle sessionHandle, String statement, + Map confOverlay) throws HiveSQLException { + return cliService.executeStatement(sessionHandle, statement, confOverlay); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, + * java.lang.String, java.util.Map) + */ + @Override + public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, + Map confOverlay) throws HiveSQLException { + return cliService.executeStatementAsync(sessionHandle, statement, confOverlay); + } + + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#getTypeInfo(org.apache.hive.service.cli.SessionHandle) + */ + @Override + public OperationHandle getTypeInfo(SessionHandle sessionHandle) throws HiveSQLException { + return cliService.getTypeInfo(sessionHandle); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#getCatalogs(org.apache.hive.service.cli.SessionHandle) + */ + @Override + public OperationHandle getCatalogs(SessionHandle sessionHandle) throws HiveSQLException { + return cliService.getCatalogs(sessionHandle); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#getSchemas(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.lang.String) + */ + @Override + public OperationHandle getSchemas(SessionHandle sessionHandle, String catalogName, + String schemaName) throws HiveSQLException { + return cliService.getSchemas(sessionHandle, catalogName, schemaName); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#getTables(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.lang.String, java.lang.String, java.util.List) + */ + @Override + public OperationHandle getTables(SessionHandle sessionHandle, String catalogName, + String schemaName, String tableName, List tableTypes) throws HiveSQLException { + return cliService.getTables(sessionHandle, catalogName, schemaName, tableName, tableTypes); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#getTableTypes(org.apache.hive.service.cli.SessionHandle) + */ + @Override + public OperationHandle getTableTypes(SessionHandle sessionHandle) throws HiveSQLException { + return cliService.getTableTypes(sessionHandle); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#getColumns(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.lang.String, java.lang.String, java.lang.String) + */ + @Override + public OperationHandle getColumns(SessionHandle sessionHandle, String catalogName, + String schemaName, String tableName, String columnName) throws HiveSQLException { + return cliService.getColumns(sessionHandle, catalogName, schemaName, tableName, columnName); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#getFunctions(org.apache.hive.service.cli.SessionHandle, java.lang.String) + */ + @Override + public OperationHandle getFunctions(SessionHandle sessionHandle, + String catalogName, String schemaName, String functionName) + throws HiveSQLException { + return cliService.getFunctions(sessionHandle, catalogName, schemaName, functionName); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#getOperationStatus(org.apache.hive.service.cli.OperationHandle) + */ + @Override + public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException { + return cliService.getOperationStatus(opHandle); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#cancelOperation(org.apache.hive.service.cli.OperationHandle) + */ + @Override + public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { + cliService.cancelOperation(opHandle); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#closeOperation(org.apache.hive.service.cli.OperationHandle) + */ + @Override + public void closeOperation(OperationHandle opHandle) throws HiveSQLException { + cliService.closeOperation(opHandle); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#getResultSetMetadata(org.apache.hive.service.cli.OperationHandle) + */ + @Override + public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException { + return cliService.getResultSetMetadata(opHandle); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long) + */ + @Override + public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) + throws HiveSQLException { + return cliService.fetchResults(opHandle, orientation, maxRows); + } } diff --git a/service/src/java/org/apache/hive/service/cli/HiveSQLException.java b/service/src/java/org/apache/hive/service/cli/HiveSQLException.java index 74e8b94..bfe35ae 100644 --- a/service/src/java/org/apache/hive/service/cli/HiveSQLException.java +++ b/service/src/java/org/apache/hive/service/cli/HiveSQLException.java @@ -29,96 +29,96 @@ */ public class HiveSQLException extends SQLException { - /** - * - */ - private static final long serialVersionUID = -6095254671958748094L; - - /** - * - */ - public HiveSQLException() { - super(); - } - - /** - * @param reason - */ - public HiveSQLException(String reason) { - super(reason); - } - - /** - * @param cause - */ - public HiveSQLException(Throwable cause) { - super(cause); - } - - /** - * @param reason - * @param sqlState - */ - public HiveSQLException(String reason, String sqlState) { - super(reason, sqlState); - } - - /** - * @param reason - * @param cause - */ - public HiveSQLException(String reason, Throwable cause) { - super(reason, cause); - } - - /** - * @param reason - * @param sqlState - * @param vendorCode - */ - public HiveSQLException(String reason, String sqlState, int vendorCode) { - super(reason, sqlState, vendorCode); - } - - /** - * @param reason - * @param sqlState - * @param cause - */ - public HiveSQLException(String reason, String sqlState, Throwable cause) { - super(reason, sqlState, cause); - } - - /** - * @param reason - * @param sqlState - * @param vendorCode - * @param cause - */ - public HiveSQLException(String reason, String sqlState, int vendorCode, Throwable cause) { - super(reason, sqlState, vendorCode, cause); - } - - public HiveSQLException(TStatus status) { - // TODO: set correct vendorCode field - super(status.getErrorMessage(), status.getSqlState(), 1); - } - - public TStatus toTStatus() { - // TODO: convert sqlState, etc. - TStatus tStatus = new TStatus(TStatusCode.ERROR_STATUS); - tStatus.setSqlState(getSQLState()); - tStatus.setErrorCode(getErrorCode()); - tStatus.setErrorMessage(getMessage()); - return tStatus; - } - - public static TStatus toTStatus(Exception e) { - if (e instanceof HiveSQLException) { - return ((HiveSQLException)e).toTStatus(); - } - TStatus tStatus = new TStatus(TStatusCode.ERROR_STATUS); - return tStatus; - } + /** + * + */ + private static final long serialVersionUID = -6095254671958748094L; + + /** + * + */ + public HiveSQLException() { + super(); + } + + /** + * @param reason + */ + public HiveSQLException(String reason) { + super(reason); + } + + /** + * @param cause + */ + public HiveSQLException(Throwable cause) { + super(cause); + } + + /** + * @param reason + * @param sqlState + */ + public HiveSQLException(String reason, String sqlState) { + super(reason, sqlState); + } + + /** + * @param reason + * @param cause + */ + public HiveSQLException(String reason, Throwable cause) { + super(reason, cause); + } + + /** + * @param reason + * @param sqlState + * @param vendorCode + */ + public HiveSQLException(String reason, String sqlState, int vendorCode) { + super(reason, sqlState, vendorCode); + } + + /** + * @param reason + * @param sqlState + * @param cause + */ + public HiveSQLException(String reason, String sqlState, Throwable cause) { + super(reason, sqlState, cause); + } + + /** + * @param reason + * @param sqlState + * @param vendorCode + * @param cause + */ + public HiveSQLException(String reason, String sqlState, int vendorCode, Throwable cause) { + super(reason, sqlState, vendorCode, cause); + } + + public HiveSQLException(TStatus status) { + // TODO: set correct vendorCode field + super(status.getErrorMessage(), status.getSqlState(), 1); + } + + public TStatus toTStatus() { + // TODO: convert sqlState, etc. + TStatus tStatus = new TStatus(TStatusCode.ERROR_STATUS); + tStatus.setSqlState(getSQLState()); + tStatus.setErrorCode(getErrorCode()); + tStatus.setErrorMessage(getMessage()); + return tStatus; + } + + public static TStatus toTStatus(Exception e) { + if (e instanceof HiveSQLException) { + return ((HiveSQLException)e).toTStatus(); + } + TStatus tStatus = new TStatus(TStatusCode.ERROR_STATUS); + return tStatus; + } } diff --git a/service/src/java/org/apache/hive/service/cli/ICLIService.java b/service/src/java/org/apache/hive/service/cli/ICLIService.java index f647ce6..621d689 100644 --- a/service/src/java/org/apache/hive/service/cli/ICLIService.java +++ b/service/src/java/org/apache/hive/service/cli/ICLIService.java @@ -27,11 +27,11 @@ public abstract SessionHandle openSession(String username, String password, Map configuration) - throws HiveSQLException; + throws HiveSQLException; public abstract SessionHandle openSessionWithImpersonation(String username, String password, Map configuration, String delegationToken) - throws HiveSQLException; + throws HiveSQLException; public abstract void closeSession(SessionHandle sessionHandle) throws HiveSQLException; @@ -41,11 +41,11 @@ public abstract GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType in public abstract OperationHandle executeStatement(SessionHandle sessionHandle, String statement, Map confOverlay) - throws HiveSQLException; + throws HiveSQLException; public abstract OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, Map confOverlay) - throws HiveSQLException; + throws HiveSQLException; public abstract OperationHandle getTypeInfo(SessionHandle sessionHandle) throws HiveSQLException; @@ -55,24 +55,24 @@ public abstract OperationHandle getCatalogs(SessionHandle sessionHandle) public abstract OperationHandle getSchemas(SessionHandle sessionHandle, String catalogName, String schemaName) - throws HiveSQLException; + throws HiveSQLException; public abstract OperationHandle getTables(SessionHandle sessionHandle, String catalogName, String schemaName, String tableName, List tableTypes) - throws HiveSQLException; + throws HiveSQLException; public abstract OperationHandle getTableTypes(SessionHandle sessionHandle) throws HiveSQLException; public abstract OperationHandle getColumns(SessionHandle sessionHandle, String catalogName, String schemaName, String tableName, String columnName) - throws HiveSQLException; + throws HiveSQLException; public abstract OperationHandle getFunctions(SessionHandle sessionHandle, String catalogName, String schemaName, String functionName) - throws HiveSQLException; + throws HiveSQLException; - public abstract OperationState getOperationStatus(OperationHandle opHandle) + public abstract OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException; public abstract void cancelOperation(OperationHandle opHandle) @@ -86,7 +86,7 @@ public abstract TableSchema getResultSetMetadata(OperationHandle opHandle) public abstract RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) - throws HiveSQLException; + throws HiveSQLException; public abstract RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException; diff --git a/service/src/java/org/apache/hive/service/cli/OperationState.java b/service/src/java/org/apache/hive/service/cli/OperationState.java index 1ec6bd1..b8446e1 100644 --- a/service/src/java/org/apache/hive/service/cli/OperationState.java +++ b/service/src/java/org/apache/hive/service/cli/OperationState.java @@ -25,80 +25,80 @@ * */ public enum OperationState { - INITIALIZED(TOperationState.INITIALIZED_STATE), - RUNNING(TOperationState.RUNNING_STATE), - FINISHED(TOperationState.FINISHED_STATE), - CANCELED(TOperationState.CANCELED_STATE), - CLOSED(TOperationState.CLOSED_STATE), - ERROR(TOperationState.ERROR_STATE), - UNKNOWN(TOperationState.UKNOWN_STATE), - PENDING(TOperationState.PENDING_STATE); + INITIALIZED(TOperationState.INITIALIZED_STATE), + RUNNING(TOperationState.RUNNING_STATE), + FINISHED(TOperationState.FINISHED_STATE), + CANCELED(TOperationState.CANCELED_STATE), + CLOSED(TOperationState.CLOSED_STATE), + ERROR(TOperationState.ERROR_STATE), + UNKNOWN(TOperationState.UKNOWN_STATE), + PENDING(TOperationState.PENDING_STATE); - private final TOperationState tOperationState; + private final TOperationState tOperationState; - OperationState(TOperationState tOperationState) { - this.tOperationState = tOperationState; - } + OperationState(TOperationState tOperationState) { + this.tOperationState = tOperationState; + } - public static OperationState getOperationState(TOperationState tOperationState) { - // TODO: replace this with a Map? - for (OperationState opState : values()) { - if (tOperationState.equals(opState.tOperationState)) { - return opState; - } - } - return OperationState.UNKNOWN; - } + public static OperationState getOperationState(TOperationState tOperationState) { + // TODO: replace this with a Map? + for (OperationState opState : values()) { + if (tOperationState.equals(opState.tOperationState)) { + return opState; + } + } + return OperationState.UNKNOWN; + } - public static void validateTransition(OperationState oldState, OperationState newState) - throws HiveSQLException { - switch (oldState) { - case INITIALIZED: - switch (newState) { - case PENDING: - case RUNNING: - case CLOSED: - return; - } - break; - case PENDING: - switch (newState) { - case RUNNING: - case FINISHED: - case CANCELED: - case ERROR: - case CLOSED: - return; - } - break; - case RUNNING: - switch (newState) { - case FINISHED: - case CANCELED: - case ERROR: - case CLOSED: - return; - } - break; - case FINISHED: - case CANCELED: - case ERROR: - if (OperationState.CLOSED.equals(newState)) { - return; - } - default: - // fall-through - } - throw new HiveSQLException("Illegal Operation state transition"); - } + public static void validateTransition(OperationState oldState, OperationState newState) + throws HiveSQLException { + switch (oldState) { + case INITIALIZED: + switch (newState) { + case PENDING: + case RUNNING: + case CLOSED: + return; + } + break; + case PENDING: + switch (newState) { + case RUNNING: + case FINISHED: + case CANCELED: + case ERROR: + case CLOSED: + return; + } + break; + case RUNNING: + switch (newState) { + case FINISHED: + case CANCELED: + case ERROR: + case CLOSED: + return; + } + break; + case FINISHED: + case CANCELED: + case ERROR: + if (OperationState.CLOSED.equals(newState)) { + return; + } + default: + // fall-through + } + throw new HiveSQLException("Illegal Operation state transition"); + } - public void validateTransition(OperationState newState) - throws HiveSQLException { - validateTransition(this, newState); - } + public void validateTransition(OperationState newState) + throws HiveSQLException { + validateTransition(this, newState); + } - public TOperationState toTOperationState() { - return tOperationState; - } + public TOperationState toTOperationState() { + return tOperationState; + } } diff --git a/service/src/java/org/apache/hive/service/cli/OperationStatus.java b/service/src/java/org/apache/hive/service/cli/OperationStatus.java new file mode 100644 index 0000000..b100e54 --- /dev/null +++ b/service/src/java/org/apache/hive/service/cli/OperationStatus.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli; + +/** + * OperationStatus. + * + */ +public class OperationStatus { + + private final OperationState state; + private final HiveSQLException operationException; + + public OperationStatus(OperationState state, HiveSQLException operationException) { + this.state = state; + this.operationException = operationException; + } + + public OperationState getState() { + return state; + } + + public HiveSQLException getOperationException() { + return operationException; + } + +} \ No newline at end of file diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index 6f4b8dc..ab93ec1 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -24,6 +24,7 @@ import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.OperationStatus; import org.apache.hive.service.cli.OperationType; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.TableSchema; @@ -39,6 +40,7 @@ public static final Log LOG = LogFactory.getLog(Operation.class.getName()); public static final long DEFAULT_FETCH_MAX_ROWS = 100; protected boolean hasResultSet; + protected volatile HiveSQLException operationException; protected Operation(HiveSession parentSession, OperationType opType) { super(); @@ -66,8 +68,8 @@ public OperationType getType() { return opHandle.getOperationType(); } - public OperationState getState() { - return state; + public OperationStatus getStatus() { + return new OperationStatus(state, operationException); } public boolean hasResultSet() { @@ -85,6 +87,10 @@ protected final OperationState setState(OperationState newState) throws HiveSQLE return this.state; } + protected void setOperationException(HiveSQLException operationException) { + this.operationException = operationException; + } + protected final void assertState(OperationState state) throws HiveSQLException { if (this.state != state) { throw new HiveSQLException("Expected state " + state + ", but found " + this.state); @@ -92,19 +98,19 @@ protected final void assertState(OperationState state) throws HiveSQLException { } public boolean isRunning() { - return OperationState.RUNNING.equals(getState()); + return OperationState.RUNNING.equals(state); } public boolean isFinished() { - return OperationState.FINISHED.equals(getState()); + return OperationState.FINISHED.equals(state); } public boolean isCanceled() { - return OperationState.CANCELED.equals(getState()); + return OperationState.CANCELED.equals(state); } public boolean isFailed() { - return OperationState.ERROR.equals(getState()); + return OperationState.ERROR.equals(state); } public abstract void run() throws HiveSQLException; diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index bcdb67f..345617c 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -27,7 +27,7 @@ import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.OperationStatus; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; @@ -67,7 +67,7 @@ public synchronized void stop() { public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, String statement, Map confOverlay, boolean runAsync) - throws HiveSQLException { + throws HiveSQLException { ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync); addOperation(executeStatementOperation); @@ -140,8 +140,8 @@ private synchronized Operation removeOperation(OperationHandle opHandle) { return handleToOperation.remove(opHandle); } - public OperationState getOperationState(OperationHandle opHandle) throws HiveSQLException { - return getOperation(opHandle).getState(); + public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException { + return getOperation(opHandle).getStatus(); } public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { @@ -167,7 +167,7 @@ public RowSet getOperationNextRowSet(OperationHandle opHandle) throws HiveSQLExc public RowSet getOperationNextRowSet(OperationHandle opHandle, FetchOrientation orientation, long maxRows) - throws HiveSQLException { + throws HiveSQLException { return getOperation(opHandle).getNextRowSet(orientation, maxRows); } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 4ee1b74..8d09427 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -51,6 +51,7 @@ import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.OperationStatus; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; @@ -67,7 +68,7 @@ private Schema mResultSchema = null; private SerDe serde = null; private final boolean runAsync; - private Future backgroundHandle; + private volatile Future backgroundHandle; public SQLOperation(HiveSession parentSession, String statement, Map confOverlay, boolean runInBackground) { @@ -76,20 +77,11 @@ public SQLOperation(HiveSession parentSession, String statement, Map handleToSession = new HashMap(); - private OperationManager operationManager = new OperationManager(); - private static final Object sessionMapLock = new Object(); - private ExecutorService backgroundOperationPool; - - public SessionManager() { - super("SessionManager"); - } - - @Override - public synchronized void init(HiveConf hiveConf) { - this.hiveConf = hiveConf; - operationManager = new OperationManager(); - int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); - LOG.info("HiveServer2: Async execution pool size" + backgroundPoolSize); - backgroundOperationPool = Executors.newFixedThreadPool(backgroundPoolSize); - addService(operationManager); - super.init(hiveConf); - } - - @Override - public synchronized void start() { - super.start(); - // TODO - } - - @Override - public synchronized void stop() { - // TODO - super.stop(); - if (backgroundOperationPool != null) { - backgroundOperationPool.shutdown(); - long timeout = hiveConf.getLongVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT); - try { - backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS); - } catch (InterruptedException exc) { - LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout + - " seconds has been exceeded. RUNNING background operations will be shut down", exc); - } - } - } - - - public SessionHandle openSession(String username, String password, Map sessionConf) - throws HiveSQLException { - return openSession(username, password, sessionConf, false, null); - } - - public SessionHandle openSession(String username, String password, Map sessionConf, - boolean withImpersonation, String delegationToken) throws HiveSQLException { - if (username == null) { - username = threadLocalUserName.get(); - } - HiveSession session; - if (withImpersonation) { - HiveSessionImplwithUGI hiveSessionUgi = new HiveSessionImplwithUGI(username, password, - sessionConf, delegationToken); - session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi()); - hiveSessionUgi.setProxySession(session); - } else { - session = new HiveSessionImpl(username, password, sessionConf); - } - session.setSessionManager(this); - session.setOperationManager(operationManager); - synchronized(sessionMapLock) { - handleToSession.put(session.getSessionHandle(), session); - } - try { - executeSessionHooks(session); - } catch (Exception e) { - throw new HiveSQLException("Failed to execute session hooks", e); - } - return session.getSessionHandle(); - } - - public void closeSession(SessionHandle sessionHandle) throws HiveSQLException { - HiveSession session; - synchronized(sessionMapLock) { - session = handleToSession.remove(sessionHandle); - } - if (session == null) { - throw new HiveSQLException("Session does not exist!"); - } - session.close(); - } - - - public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException { - HiveSession session; - synchronized(sessionMapLock) { - session = handleToSession.get(sessionHandle); - } - if (session == null) { - throw new HiveSQLException("Invalid SessionHandle: " + sessionHandle); - } - return session; - } - - public OperationManager getOperationManager() { - return operationManager; - } - - private static ThreadLocal threadLocalIpAddress = new ThreadLocal() { - @Override - protected synchronized String initialValue() { - return null; - } - }; - - public static void setIpAddress(String ipAddress) { - threadLocalIpAddress.set(ipAddress); - } - - private void clearIpAddress() { - threadLocalIpAddress.remove(); - } - - private static ThreadLocal threadLocalUserName = new ThreadLocal(){ - @Override - protected synchronized String initialValue() { - return null; - } - }; - - public static void setUserName(String userName) { - threadLocalUserName.set(userName); - } - - private void clearUserName() { - threadLocalUserName.remove(); - } - - // execute session hooks - private void executeSessionHooks(HiveSession session) throws Exception { - List sessionHooks = HookUtils.getHooks(hiveConf, - HiveConf.ConfVars.HIVE_SERVER2_SESSION_HOOK, HiveSessionHook.class); - for (HiveSessionHook sessionHook : sessionHooks) { - sessionHook.run(new HiveSessionHookContextImpl(session)); - } - } - - public Future submitBackgroundOperation(Runnable r) { - return backgroundOperationPool.submit(r); - } + private static final Log LOG = LogFactory.getLog(CompositeService.class); + private HiveConf hiveConf; + private final Map handleToSession = new HashMap(); + private OperationManager operationManager = new OperationManager(); + private static final Object sessionMapLock = new Object(); + private ExecutorService backgroundOperationPool; + + public SessionManager() { + super("SessionManager"); + } + + @Override + public synchronized void init(HiveConf hiveConf) { + this.hiveConf = hiveConf; + operationManager = new OperationManager(); + int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); + LOG.info("HiveServer2: Async execution pool size" + backgroundPoolSize); + backgroundOperationPool = Executors.newFixedThreadPool(backgroundPoolSize); + addService(operationManager); + super.init(hiveConf); + } + + @Override + public synchronized void start() { + super.start(); + // TODO + } + + @Override + public synchronized void stop() { + // TODO + super.stop(); + if (backgroundOperationPool != null) { + backgroundOperationPool.shutdown(); + long timeout = hiveConf.getLongVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT); + try { + backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS); + } catch (InterruptedException exc) { + LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout + + " seconds has been exceeded. RUNNING background operations will be shut down", exc); + } + } + } + + + public SessionHandle openSession(String username, String password, Map sessionConf) + throws HiveSQLException { + return openSession(username, password, sessionConf, false, null); + } + + public SessionHandle openSession(String username, String password, Map sessionConf, + boolean withImpersonation, String delegationToken) throws HiveSQLException { + if (username == null) { + username = threadLocalUserName.get(); + } + HiveSession session; + if (withImpersonation) { + HiveSessionImplwithUGI hiveSessionUgi = new HiveSessionImplwithUGI(username, password, + sessionConf, delegationToken); + session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi()); + hiveSessionUgi.setProxySession(session); + } else { + session = new HiveSessionImpl(username, password, sessionConf); + } + session.setSessionManager(this); + session.setOperationManager(operationManager); + synchronized(sessionMapLock) { + handleToSession.put(session.getSessionHandle(), session); + } + try { + executeSessionHooks(session); + } catch (Exception e) { + throw new HiveSQLException("Failed to execute session hooks", e); + } + return session.getSessionHandle(); + } + + public void closeSession(SessionHandle sessionHandle) throws HiveSQLException { + HiveSession session; + synchronized(sessionMapLock) { + session = handleToSession.remove(sessionHandle); + } + if (session == null) { + throw new HiveSQLException("Session does not exist!"); + } + session.close(); + } + + + public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException { + HiveSession session; + synchronized(sessionMapLock) { + session = handleToSession.get(sessionHandle); + } + if (session == null) { + throw new HiveSQLException("Invalid SessionHandle: " + sessionHandle); + } + return session; + } + + public OperationManager getOperationManager() { + return operationManager; + } + + private static ThreadLocal threadLocalIpAddress = new ThreadLocal() { + @Override + protected synchronized String initialValue() { + return null; + } + }; + + public static void setIpAddress(String ipAddress) { + threadLocalIpAddress.set(ipAddress); + } + + private void clearIpAddress() { + threadLocalIpAddress.remove(); + } + + private static ThreadLocal threadLocalUserName = new ThreadLocal(){ + @Override + protected synchronized String initialValue() { + return null; + } + }; + + public static void setUserName(String userName) { + threadLocalUserName.set(userName); + } + + private void clearUserName() { + threadLocalUserName.remove(); + } + + // execute session hooks + private void executeSessionHooks(HiveSession session) throws Exception { + List sessionHooks = HookUtils.getHooks(hiveConf, + HiveConf.ConfVars.HIVE_SERVER2_SESSION_HOOK, HiveSessionHook.class); + for (HiveSessionHook sessionHook : sessionHooks) { + sessionHook.run(new HiveSessionHookContextImpl(session)); + } + } + + public Future submitBackgroundOperation(Runnable r) { + return backgroundOperationPool.submit(r); + } } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 9df110e..2d32ffe 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -37,7 +37,7 @@ import org.apache.hive.service.cli.GetInfoValue; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.OperationStatus; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.TableSchema; @@ -114,6 +114,7 @@ public synchronized void stop() { @Override public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { + LOG.info("Client protocol version: " + req.getClient_protocol()); TOpenSessionResp resp = new TOpenSessionResp(); try { SessionHandle sessionHandle = getSessionHandle(req); @@ -210,8 +211,8 @@ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws T resp.setOperationHandle(operationHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { - LOG.warn("Error executing statement: ", e); - resp.setStatus(HiveSQLException.toTStatus(e)); + LOG.warn("Error executing statement: ", e); + resp.setStatus(HiveSQLException.toTStatus(e)); } return resp; } @@ -328,8 +329,15 @@ public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException { public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException { TGetOperationStatusResp resp = new TGetOperationStatusResp(); try { - OperationState operationState = cliService.getOperationStatus(new OperationHandle(req.getOperationHandle())); - resp.setOperationState(operationState.toTOperationState()); + OperationStatus operationStatus = cliService.getOperationStatus( + new OperationHandle(req.getOperationHandle())); + resp.setOperationState(operationStatus.getState().toTOperationState()); + HiveSQLException opException = operationStatus.getOperationException(); + if (opException != null) { + resp.setSqlState(opException.getSQLState()); + resp.setErrorCode(opException.getErrorCode()); + resp.setErrorMessage(opException.getMessage()); + } resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting operation status: ", e); diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java index 9bb2a0f..edad8ea 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java @@ -28,6 +28,7 @@ import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.OperationStatus; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.TableSchema; @@ -295,12 +296,18 @@ public OperationHandle getFunctions(SessionHandle sessionHandle, * @see org.apache.hive.service.cli.ICLIService#getOperationStatus(org.apache.hive.service.cli.OperationHandle) */ @Override - public OperationState getOperationStatus(OperationHandle opHandle) throws HiveSQLException { + public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException { try { TGetOperationStatusReq req = new TGetOperationStatusReq(opHandle.toTOperationHandle()); TGetOperationStatusResp resp = cliService.GetOperationStatus(req); + // Checks the status of the RPC call, throws an exception in case of error checkStatus(resp.getStatus()); - return OperationState.getOperationState(resp.getOperationState()); + OperationState opState = OperationState.getOperationState(resp.getOperationState()); + HiveSQLException opException = null; + if (opState == OperationState.ERROR) { + opException = new HiveSQLException(resp.getErrorMessage(), resp.getSqlState(), resp.getErrorCode()); + } + return new OperationStatus(opState, opException); } catch (HiveSQLException e) { throw e; } catch (Exception e) { diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index cd9d99a..7e6810f 100644 --- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.util.Collections; @@ -27,6 +28,7 @@ import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -55,8 +57,8 @@ public void tearDown() throws Exception { @Test public void openSessionTest() throws Exception { - SessionHandle sessionHandle = client - .openSession("tom", "password", Collections.emptyMap()); + SessionHandle sessionHandle = client.openSession( + "tom", "password", Collections.emptyMap()); assertNotNull(sessionHandle); client.closeSession(sessionHandle); @@ -67,8 +69,9 @@ public void openSessionTest() throws Exception { @Test public void getFunctionsTest() throws Exception { - SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap()); + SessionHandle sessionHandle = client.openSession("tom", "password"); assertNotNull(sessionHandle); + OperationHandle opHandle = client.getFunctions(sessionHandle, null, null, "*"); TableSchema schema = client.getResultSetMetadata(opHandle); @@ -96,13 +99,15 @@ public void getFunctionsTest() throws Exception { assertEquals("SPECIFIC_NAME", columnDesc.getName()); assertEquals(Type.STRING_TYPE, columnDesc.getType()); + // Cleanup client.closeOperation(opHandle); client.closeSession(sessionHandle); } @Test public void getInfoTest() throws Exception { - SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap()); + SessionHandle sessionHandle = client.openSession( + "tom", "password", Collections.emptyMap()); assertNotNull(sessionHandle); GetInfoValue value = client.getInfo(sessionHandle, GetInfoType.CLI_DBMS_NAME); @@ -120,30 +125,39 @@ public void getInfoTest() throws Exception { @Test public void testExecuteStatement() throws Exception { HashMap confOverlay = new HashMap(); - SessionHandle sessionHandle = client.openSession("tom", "password", - new HashMap()); + SessionHandle sessionHandle = client.openSession( + "tom", "password", new HashMap()); assertNotNull(sessionHandle); - // Change lock manager, otherwise unit-test doesn't go through - String queryString = "SET hive.lock.manager=" + - "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; - client.executeStatement(sessionHandle, queryString, confOverlay); + OperationHandle opHandle; + + String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + + " = false"; + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); - // Drop the table if it exists queryString = "DROP TABLE IF EXISTS TEST_EXEC"; - client.executeStatement(sessionHandle, queryString, confOverlay); + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); // Create a test table queryString = "CREATE TABLE TEST_EXEC(ID STRING)"; - client.executeStatement(sessionHandle, queryString, confOverlay); + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); // Blocking execute queryString = "SELECT ID FROM TEST_EXEC"; - OperationHandle ophandle = client.executeStatement(sessionHandle, queryString, confOverlay); - + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); // Expect query to be completed now assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(ophandle)); + OperationState.FINISHED, client.getOperationStatus(opHandle).getState()); + client.closeOperation(opHandle); + + // Cleanup + queryString = "DROP TABLE IF EXISTS TEST_EXEC"; + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); + client.closeSession(sessionHandle); } @Test @@ -151,29 +165,34 @@ public void testExecuteStatementAsync() throws Exception { HashMap confOverlay = new HashMap(); SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap()); - // Timeout for the poll in case of asynchronous execute - long pollTimeout = System.currentTimeMillis() + 100000; assertNotNull(sessionHandle); + + // Timeout for the poll in case of asynchronous execute + long pollTimeout = System.currentTimeMillis() + 10000; OperationState state = null; - OperationHandle ophandle; + OperationHandle opHandle; + OperationStatus opStatus = null; - // Change lock manager, otherwise unit-test doesn't go through - String queryString = "SET hive.lock.manager=" + - "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; - client.executeStatement(sessionHandle, queryString, confOverlay); + String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + + " = false"; + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); // Drop the table if it exists queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC"; - client.executeStatement(sessionHandle, queryString, confOverlay); + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); // Create a test table queryString = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)"; - client.executeStatement(sessionHandle, queryString, confOverlay); + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); // Test async execution response when query is malformed - String wrongQueryString = "SELECT NAME FROM TEST_EXEC"; - ophandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay); - + // This query will throw an error with errorCode: 10004 + // Refer org.apache.hadoop.hive.ql.ErrorMsg for details + String wrongQueryString = "SELECT NON_EXISTANT_COLUMN FROM TEST_EXEC_ASYNC"; + opHandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay); int count = 0; while (true) { // Break if polling times out @@ -181,24 +200,31 @@ public void testExecuteStatementAsync() throws Exception { System.out.println("Polling timed out"); break; } - state = client.getOperationStatus(ophandle); - System.out.println("Polling: " + ophandle + " count=" + (++count) + opStatus = client.getOperationStatus(opHandle); + state = opStatus.getState(); + System.out.println("Polling: " + opHandle + " count=" + (++count) + " state=" + state); - if (OperationState.CANCELED == state || state == OperationState.CLOSED + if (state == OperationState.CANCELED || state == OperationState.CLOSED || state == OperationState.FINISHED || state == OperationState.ERROR) { break; } Thread.sleep(1000); } - assertEquals("Query should return an error state", - OperationState.ERROR, client.getOperationStatus(ophandle)); + assertEquals("Operation should be in error state", OperationState.ERROR, state); + // sqlState, errorCode and errorMsg should be set to appropriate values + // Refer org.apache.hadoop.hive.ql.ErrorMsg for details + assertEquals(opStatus.getOperationException().getSQLState(), "42000"); + assertEquals(opStatus.getOperationException().getErrorCode(), 10004); + // The expected error message should be a substring of the returned error message + String errorMsg = opStatus.getOperationException().getMessage().toLowerCase(); + String expectedErrorMsg = ErrorMsg.getErrorMsg(10004).getMsg().toLowerCase(); + assertTrue("Incorrect error message", errorMsg.contains(expectedErrorMsg)); + client.closeOperation(opHandle); // Test async execution when query is well formed queryString = "SELECT ID FROM TEST_EXEC_ASYNC"; - ophandle = - client.executeStatementAsync(sessionHandle, queryString, confOverlay); - + opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); count = 0; while (true) { // Break if polling times out @@ -206,26 +232,33 @@ public void testExecuteStatementAsync() throws Exception { System.out.println("Polling timed out"); break; } - state = client.getOperationStatus(ophandle); - System.out.println("Polling: " + ophandle + " count=" + (++count) + opStatus = client.getOperationStatus(opHandle); + state = opStatus.getState(); + System.out.println("Polling: " + opHandle + " count=" + (++count) + " state=" + state); - if (OperationState.CANCELED == state || state == OperationState.CLOSED + if (state == OperationState.CANCELED || state == OperationState.CLOSED || state == OperationState.FINISHED || state == OperationState.ERROR) { break; } Thread.sleep(1000); } - assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(ophandle)); + assertEquals("Query should be finished", OperationState.FINISHED, state); + client.closeOperation(opHandle); // Cancellation test - ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); - System.out.println("cancelling " + ophandle); - client.cancelOperation(ophandle); - state = client.getOperationStatus(ophandle); - System.out.println(ophandle + " after cancelling, state= " + state); + opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); + System.out.println("cancelling " + opHandle); + client.cancelOperation(opHandle); + state = client.getOperationStatus(opHandle).getState(); + System.out.println(opHandle + " after cancelling, state= " + state); assertEquals("Query should be cancelled", OperationState.CANCELED, state); + + // Cleanup + queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC"; + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); + client.closeSession(sessionHandle); } /** @@ -261,7 +294,7 @@ public void testConfOverlay() throws Exception { assertNotNull(opHandle); // query should pass and create the table assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(opHandle)); + OperationState.FINISHED, client.getOperationStatus(opHandle).getState()); client.closeOperation(opHandle); // select from the new table should pass @@ -270,10 +303,10 @@ public void testConfOverlay() throws Exception { assertNotNull(opHandle); // query should pass and create the table assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(opHandle)); + OperationState.FINISHED, client.getOperationStatus(opHandle).getState()); client.closeOperation(opHandle); - // the settings in confoverly should not be part of session config + // the settings in conf overlay should not be part of session config // another query referring that property with the conf overlay should fail selectTab = "SELECT * FROM ${hiveconf:" + tabNameVar + "}"; try { @@ -287,8 +320,6 @@ public void testConfOverlay() throws Exception { dropTable = "DROP TABLE IF EXISTS " + tabName; opHandle = client.executeStatement(sessionHandle, dropTable, null); client.closeOperation(opHandle); - - client.closeSession(sessionHandle); } } diff --git a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java index ff7166d..4bc2f96 100644 --- a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java @@ -31,12 +31,12 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hive.service.Service; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.PlainSaslHelper; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.OperationState; import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.session.HiveSession; import org.apache.hive.service.cli.session.SessionManager; @@ -67,8 +67,6 @@ protected static String anonymousUser = "anonymous"; protected static String anonymousPasswd = "anonymous"; - - /** * @throws java.lang.Exception */ @@ -162,6 +160,10 @@ public void testGetFunctions() throws Exception { client.CloseSession(closeReq); } + /** + * Test synchronous query execution + * @throws Exception + */ @Test public void testExecuteStatement() throws Exception { // Create a new request object @@ -172,19 +174,19 @@ public void testExecuteStatement() throws Exception { // Change lock manager to embedded mode String queryString = "SET hive.lock.manager=" + "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; - executeQuerySync(queryString, sessHandle); + executeQuery(queryString, sessHandle, false); // Drop the table if it exists queryString = "DROP TABLE IF EXISTS TEST_EXEC_THRIFT"; - executeQuerySync(queryString, sessHandle); + executeQuery(queryString, sessHandle, false); // Create a test table queryString = "CREATE TABLE TEST_EXEC_THRIFT(ID STRING)"; - executeQuerySync(queryString, sessHandle); + executeQuery(queryString, sessHandle, false); - // Execute another query to test + // Execute another query queryString = "SELECT ID FROM TEST_EXEC_THRIFT"; - TExecuteStatementResp execResp = executeQuerySync(queryString, sessHandle); + TExecuteStatementResp execResp = executeQuery(queryString, sessHandle, false); TOperationHandle operationHandle = execResp.getOperationHandle(); assertNotNull(operationHandle); @@ -192,25 +194,135 @@ public void testExecuteStatement() throws Exception { opStatusReq.setOperationHandle(operationHandle); assertNotNull(opStatusReq); TGetOperationStatusResp opStatusResp = client.GetOperationStatus(opStatusReq); - + TOperationState state = opStatusResp.getOperationState(); // Expect query to be completed now - assertEquals("Query should be finished", - OperationState.FINISHED, OperationState.getOperationState(opStatusResp.getOperationState())); + assertEquals("Query should be finished", TOperationState.FINISHED_STATE, state); + // Cleanup queryString = "DROP TABLE TEST_EXEC_THRIFT"; - executeQuerySync(queryString, sessHandle); + executeQuery(queryString, sessHandle, false); + + // Close the session; ignore exception if any + TCloseSessionReq closeReq = new TCloseSessionReq(sessHandle); + client.CloseSession(closeReq); + } + + /** + * Test asynchronous query execution and error message reporting to the client + * @throws Exception + */ + @Test + public void testExecuteStatementAsync() throws Exception { + // Create a new request object + TOpenSessionReq openReq = new TOpenSessionReq(); + TSessionHandle sessHandle = client.OpenSession(openReq).getSessionHandle(); + assertNotNull(sessHandle); + + // Change lock manager to embedded mode + String queryString = "SET hive.lock.manager=" + + "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; + executeQuery(queryString, sessHandle, false); + + // Drop the table if it exists + queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC_THRIFT"; + executeQuery(queryString, sessHandle, false); + + // Create a test table + queryString = "CREATE TABLE TEST_EXEC_ASYNC_THRIFT(ID STRING)"; + executeQuery(queryString, sessHandle, false); + + // Execute another query + queryString = "SELECT ID FROM TEST_EXEC_ASYNC_THRIFT"; + System.out.println("Will attempt to execute: " + queryString); + TExecuteStatementResp execResp = executeQuery(queryString, sessHandle, true); + TOperationHandle operationHandle = execResp.getOperationHandle(); + assertNotNull(operationHandle); + + // Poll on the operation status till the query is completed + boolean isQueryRunning = true; + TGetOperationStatusReq opStatusReq; + TGetOperationStatusResp opStatusResp = null; + TOperationState state = null; + long pollTimeout = System.currentTimeMillis() + 100000; + + while(isQueryRunning) { + // Break if polling times out + if (System.currentTimeMillis() > pollTimeout) { + System.out.println("Polling timed out"); + break; + } + opStatusReq = new TGetOperationStatusReq(); + opStatusReq.setOperationHandle(operationHandle); + assertNotNull(opStatusReq); + opStatusResp = client.GetOperationStatus(opStatusReq); + state = opStatusResp.getOperationState(); + System.out.println("Current state: " + state); + + if (state == TOperationState.CANCELED_STATE || state == TOperationState.CLOSED_STATE + || state == TOperationState.FINISHED_STATE || state == TOperationState.ERROR_STATE) { + isQueryRunning = false; + } + Thread.sleep(1000); + } + + // Expect query to be successfully completed now + assertEquals("Query should be finished", + TOperationState.FINISHED_STATE, state); + + // Execute a malformed query + queryString = "SELECT NON_EXISTING_COLUMN FROM TEST_EXEC_ASYNC_THRIFT"; + System.out.println("Will attempt to execute: " + queryString); + execResp = executeQuery(queryString, sessHandle, true); + operationHandle = execResp.getOperationHandle(); + assertNotNull(operationHandle); + isQueryRunning = true; + while(isQueryRunning) { + // Break if polling times out + if (System.currentTimeMillis() > pollTimeout) { + System.out.println("Polling timed out"); + break; + } + opStatusReq = new TGetOperationStatusReq(); + opStatusReq.setOperationHandle(operationHandle); + assertNotNull(opStatusReq); + opStatusResp = client.GetOperationStatus(opStatusReq); + state = opStatusResp.getOperationState(); + System.out.println("Current state: " + state); + + if (state == TOperationState.CANCELED_STATE || state == TOperationState.CLOSED_STATE + || state == TOperationState.FINISHED_STATE || state == TOperationState.ERROR_STATE) { + isQueryRunning = false; + } + Thread.sleep(1000); + } + + // Expect query to return an error state + assertEquals("Operation should be in error state", TOperationState.ERROR_STATE, state); + + // sqlState, errorCode and errorMsg should be set to appropriate values + // Refer org.apache.hadoop.hive.ql.ErrorMsg for details + assertEquals(opStatusResp.getSqlState(), "42000"); + assertEquals(opStatusResp.getErrorCode(), 10004); + // The expected error message should be a substring of the returned error message + String errorMsg = opStatusResp.getErrorMessage().toLowerCase(); + String expectedErrorMsg = ErrorMsg.getErrorMsg(10004).getMsg().toLowerCase(); + assertTrue("Incorrect error message", errorMsg.contains(expectedErrorMsg)); + + // Cleanup + queryString = "DROP TABLE TEST_EXEC_ASYNC_THRIFT"; + executeQuery(queryString, sessHandle, false); // Close the session; ignore exception if any TCloseSessionReq closeReq = new TCloseSessionReq(sessHandle); client.CloseSession(closeReq); } - private TExecuteStatementResp executeQuerySync(String queryString, TSessionHandle sessHandle) + private TExecuteStatementResp executeQuery(String queryString, TSessionHandle sessHandle, boolean runAsync) throws Exception { TExecuteStatementReq execReq = new TExecuteStatementReq(); execReq.setSessionHandle(sessHandle); execReq.setStatement(queryString); - execReq.setRunAsync(false); + execReq.setRunAsync(runAsync); TExecuteStatementResp execResp = client.ExecuteStatement(execReq); assertNotNull(execResp); assertFalse(execResp.getStatus().getStatusCode() == TStatusCode.ERROR_STATUS);