diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 5a0f1c8..70cf1a4 100644 --- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Formatter; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -31,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.shims.Utils; import org.apache.hive.service.AbstractService; import org.apache.hive.service.ServiceException; import org.apache.hive.service.ServiceUtils; @@ -86,6 +88,43 @@ protected TServerEventHandler serverEventHandler; protected ThreadLocal currentServerContext; + private enum ThriftCliFunctions { + GetDelegationToken, + CancelDelegationToken, + RenewDelegationToken, + OpenSession, + CloseSession, + GetInfo, + ExecuteStatement, + GetTypeInfo, + GetCatalogs, + GetSchemas, + GetTables, + GetTableTypes, + GetColumns, + GetFunctions, + GetOperationStatus, + CancelOperation, + CloseOperation, + GetResultSetMetadata, + FetchResults + } + + public static final String AUDIT_FORMAT = + "ugi=%s\t" + // ugi + "ip=%s\t" + // remote IP + "cmd=%s"; // command + public static final Log auditLog = + LogFactory.getLog(ThriftCLIService.class.getName() + ".audit"); + + private static final ThreadLocal auditFormatter = + new ThreadLocal() { + @Override + protected Formatter initialValue() { + return new Formatter(new StringBuilder(AUDIT_FORMAT.length() * 4)); + } + }; + static class ThriftCLIServerContext implements ServerContext { private SessionHandle sessionHandle = null; @@ -219,6 +258,33 @@ public InetAddress getServerIPAddress() { return serverIPAddress; } + private void logAuditEvent(String cmd) throws HiveSQLException { + if (cmd == null) { + return; + } + + final Formatter fmt = auditFormatter.get(); + ((StringBuilder) fmt.out()).setLength(0); + + String address = getIpAddress(); + if (address == null) { + address = "unknown-ip-addr"; + } + auditLog.info(fmt.format(AUDIT_FORMAT, getUserName(), address, cmd).toString()); + } + + public void startFunction(ThriftCliFunctions function, String extraLogInfo) + throws HiveSQLException { + logAuditEvent(function + extraLogInfo); + } + + public void startFunction(ThriftCliFunctions function) throws HiveSQLException { + startFunction(function, ""); + } + + public void endFunction(ThriftCliFunctions function) { + } + @Override public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req) throws TException { @@ -228,6 +294,7 @@ public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req) resp.setStatus(unsecureTokenErrorStatus()); } else { try { + startFunction(ThriftCliFunctions.GetDelegationToken); String token = cliService.getDelegationToken( new SessionHandle(req.getSessionHandle()), hiveAuthFactory, req.getOwner(), req.getRenewer()); @@ -238,6 +305,8 @@ public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req) TStatus tokenErrorStatus = HiveSQLException.toTStatus(e); tokenErrorStatus.setSqlState("42000"); resp.setStatus(tokenErrorStatus); + } finally { + endFunction(ThriftCliFunctions.GetDelegationToken); } } return resp; @@ -252,12 +321,15 @@ public TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenRe resp.setStatus(unsecureTokenErrorStatus()); } else { try { + startFunction(ThriftCliFunctions.CancelDelegationToken); cliService.cancelDelegationToken(new SessionHandle(req.getSessionHandle()), hiveAuthFactory, req.getDelegationToken()); resp.setStatus(OK_STATUS); } catch (HiveSQLException e) { LOG.error("Error canceling delegation token", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.CancelDelegationToken); } } return resp; @@ -271,12 +343,15 @@ public TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq r resp.setStatus(unsecureTokenErrorStatus()); } else { try { + startFunction(ThriftCliFunctions.RenewDelegationToken); cliService.renewDelegationToken(new SessionHandle(req.getSessionHandle()), hiveAuthFactory, req.getDelegationToken()); resp.setStatus(OK_STATUS); } catch (HiveSQLException e) { LOG.error("Error obtaining renewing token", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.RenewDelegationToken); } } return resp; @@ -294,6 +369,7 @@ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { LOG.info("Client protocol version: " + req.getClient_protocol()); TOpenSessionResp resp = new TOpenSessionResp(); try { + startFunction(ThriftCliFunctions.OpenSession); SessionHandle sessionHandle = getSessionHandle(req, resp); resp.setSessionHandle(sessionHandle.toTSessionHandle()); // TODO: set real configuration map @@ -307,6 +383,8 @@ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { } catch (Exception e) { LOG.warn("Error opening session: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.OpenSession); } return resp; } @@ -340,10 +418,30 @@ private String getIpAddress() { * that the connecting user is trying to proxy for. * This includes a check whether the connecting user is allowed to proxy for the end user. * @param req - * @return + * @return userName * @throws HiveSQLException */ private String getUserName(TOpenSessionReq req) throws HiveSQLException { + String userName = getUserName(); + if (userName == null) { + userName = req.getUsername(); + } + + try { + if (userName == null) { + userName = Utils.getUGI().getShortUserName(); + } + } catch (Exception e) { + LOG.warn("Unable to determine user name", e); + userName = "unknown-user"; + } + + String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress()); + LOG.debug("Client's username: " + effectiveClientUser); + return effectiveClientUser; + } + + private String getUserName() throws HiveSQLException { String userName = null; // Kerberos if (isKerberosAuthMode()) { @@ -359,14 +457,8 @@ private String getUserName(TOpenSessionReq req) throws HiveSQLException { ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) { userName = SessionManager.getUserName(); } - if (userName == null) { - userName = req.getUsername(); - } - userName = getShortName(userName); - String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress()); - LOG.debug("Client's username: " + effectiveClientUser); - return effectiveClientUser; + return userName; } private String getShortName(String userName) { @@ -444,6 +536,7 @@ private TProtocolVersion getMinVersion(TProtocolVersion... versions) { public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { TCloseSessionResp resp = new TCloseSessionResp(); try { + startFunction(ThriftCliFunctions.CloseSession); SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle()); cliService.closeSession(sessionHandle); resp.setStatus(OK_STATUS); @@ -455,6 +548,8 @@ public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { } catch (Exception e) { LOG.warn("Error closing session: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.CloseSession); } return resp; } @@ -463,6 +558,7 @@ public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { public TGetInfoResp GetInfo(TGetInfoReq req) throws TException { TGetInfoResp resp = new TGetInfoResp(); try { + startFunction(ThriftCliFunctions.GetInfo); GetInfoValue getInfoValue = cliService.getInfo(new SessionHandle(req.getSessionHandle()), GetInfoType.getGetInfoType(req.getInfoType())); @@ -471,6 +567,8 @@ public TGetInfoResp GetInfo(TGetInfoReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting info: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetInfo); } return resp; } @@ -481,6 +579,8 @@ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws T try { SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle()); String statement = req.getStatement(); + startFunction(ThriftCliFunctions.ExecuteStatement, + "\tstmt={" + statement.replaceAll("[\\r\\n\\t]", "") + "}"); Map confOverlay = req.getConfOverlay(); Boolean runAsync = req.isRunAsync(); OperationHandle operationHandle = runAsync ? @@ -491,6 +591,8 @@ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws T } catch (Exception e) { LOG.warn("Error executing statement: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.ExecuteStatement); } return resp; } @@ -499,12 +601,15 @@ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws T public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException { TGetTypeInfoResp resp = new TGetTypeInfoResp(); try { + startFunction(ThriftCliFunctions.GetTypeInfo); OperationHandle operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle())); resp.setOperationHandle(operationHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting type info: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetTypeInfo); } return resp; } @@ -513,12 +618,15 @@ public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException { public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException { TGetCatalogsResp resp = new TGetCatalogsResp(); try { + startFunction(ThriftCliFunctions.GetCatalogs); OperationHandle opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle())); resp.setOperationHandle(opHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting catalogs: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetCatalogs); } return resp; } @@ -527,6 +635,8 @@ public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException { public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException { TGetSchemasResp resp = new TGetSchemasResp(); try { + startFunction(ThriftCliFunctions.GetSchemas, "\tcatalog=" + req.getCatalogName() + + "\tschema=" + req.getSchemaName()); OperationHandle opHandle = cliService.getSchemas( new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName()); resp.setOperationHandle(opHandle.toTOperationHandle()); @@ -534,6 +644,8 @@ public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting schemas: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetSchemas); } return resp; } @@ -542,6 +654,8 @@ public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException { public TGetTablesResp GetTables(TGetTablesReq req) throws TException { TGetTablesResp resp = new TGetTablesResp(); try { + startFunction(ThriftCliFunctions.GetTables, "\tcatalog=" + req.getCatalogName() + + "\tschema=" + req.getSchemaName() + "\ttable=" + req.getTableName()); OperationHandle opHandle = cliService .getTables(new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName(), req.getTableName(), req.getTableTypes()); @@ -550,6 +664,8 @@ public TGetTablesResp GetTables(TGetTablesReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting tables: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetTables); } return resp; } @@ -558,12 +674,15 @@ public TGetTablesResp GetTables(TGetTablesReq req) throws TException { public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException { TGetTableTypesResp resp = new TGetTableTypesResp(); try { + startFunction(ThriftCliFunctions.GetTableTypes); OperationHandle opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle())); resp.setOperationHandle(opHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting table types: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetTableTypes); } return resp; } @@ -572,6 +691,9 @@ public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException { TGetColumnsResp resp = new TGetColumnsResp(); try { + startFunction(ThriftCliFunctions.GetColumns, "\tcatalog=" + req.getCatalogName() + + "\tschema=" + req.getSchemaName() + "\ttable=" + req.getTableName() + + "\tcolumn=" + req.getColumnName()); OperationHandle opHandle = cliService.getColumns( new SessionHandle(req.getSessionHandle()), req.getCatalogName(), @@ -583,6 +705,8 @@ public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting columns: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetColumns); } return resp; } @@ -591,6 +715,8 @@ public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException { public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException { TGetFunctionsResp resp = new TGetFunctionsResp(); try { + startFunction(ThriftCliFunctions.GetFunctions, "\tcatalog=" + req.getCatalogName() + + "\tschema=" + req.getSchemaName() + "\tfunction=" + req.getFunctionName()); OperationHandle opHandle = cliService.getFunctions( new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName(), req.getFunctionName()); @@ -599,6 +725,8 @@ public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting functions: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetFunctions); } return resp; } @@ -607,6 +735,7 @@ public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException { public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException { TGetOperationStatusResp resp = new TGetOperationStatusResp(); try { + startFunction(ThriftCliFunctions.GetOperationStatus); OperationStatus operationStatus = cliService.getOperationStatus( new OperationHandle(req.getOperationHandle())); resp.setOperationState(operationStatus.getState().toTOperationState()); @@ -620,6 +749,8 @@ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) th } catch (Exception e) { LOG.warn("Error getting operation status: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetOperationStatus); } return resp; } @@ -628,11 +759,14 @@ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) th public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TException { TCancelOperationResp resp = new TCancelOperationResp(); try { + startFunction(ThriftCliFunctions.CancelOperation); cliService.cancelOperation(new OperationHandle(req.getOperationHandle())); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error cancelling operation: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.CancelOperation); } return resp; } @@ -641,11 +775,14 @@ public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TExc public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws TException { TCloseOperationResp resp = new TCloseOperationResp(); try { + startFunction(ThriftCliFunctions.CloseOperation); cliService.closeOperation(new OperationHandle(req.getOperationHandle())); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error closing operation: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.CloseOperation); } return resp; } @@ -655,12 +792,15 @@ public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq r throws TException { TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp(); try { + startFunction(ThriftCliFunctions.GetResultSetMetadata); TableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle())); resp.setSchema(schema.toTTableSchema()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting result set metadata: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetResultSetMetadata); } return resp; } @@ -669,6 +809,7 @@ public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq r public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { TFetchResultsResp resp = new TFetchResultsResp(); try { + startFunction(ThriftCliFunctions.FetchResults); RowSet rowSet = cliService.fetchResults( new OperationHandle(req.getOperationHandle()), FetchOrientation.getFetchOrientation(req.getOrientation()), @@ -680,6 +821,8 @@ public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { } catch (Exception e) { LOG.warn("Error fetching results: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.FetchResults); } return resp; }