diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 3345b5f..ef62333 100644 --- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Formatter; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -70,6 +71,43 @@ protected int maxWorkerThreads; protected long workerKeepAliveTime; + private enum ThriftCliFunctions { + GetDelegationToken, + CancelDelegationToken, + RenewDelegationToken, + OpenSession, + CloseSession, + GetInfo, + ExecuteStatement, + GetTypeInfo, + GetCatalogs, + GetSchemas, + GetTables, + GetTableTypes, + GetColumns, + GetFunctions, + GetOperationStatus, + CancelOperation, + CloseOperation, + GetResultSetMetadata, + FetchResults + } + + public static final String AUDIT_FORMAT = + "ugi=%s\t" + // ugi + "ip=%s\t" + // remote IP + "cmd=%s"; // command + public static final Log auditLog = + LogFactory.getLog(ThriftCLIService.class.getName() + ".audit"); + + private static final ThreadLocal auditFormatter = + new ThreadLocal() { + @Override + protected Formatter initialValue() { + return new Formatter(new StringBuilder(AUDIT_FORMAT.length() * 4)); + } + }; + public ThriftCLIService(CLIService cliService, String serviceName) { super(serviceName); this.cliService = cliService; @@ -158,6 +196,33 @@ public InetAddress getServerIPAddress() { return serverIPAddress; } + private void logAuditEvent(String cmd) throws HiveSQLException { + if (cmd == null) { + return; + } + + final Formatter fmt = auditFormatter.get(); + ((StringBuilder) fmt.out()).setLength(0); + + String address = getIpAddress(); + if (address == null) { + address = "unknown-ip-addr"; + } + auditLog.info(fmt.format(AUDIT_FORMAT, getUserName(), address, cmd).toString()); + } + + public void startFunction(ThriftCliFunctions function, String extraLogInfo) + throws HiveSQLException { + logAuditEvent(function + extraLogInfo); + } + + public void startFunction(ThriftCliFunctions function) throws HiveSQLException { + startFunction(function, ""); + } + + public void endFunction(ThriftCliFunctions function) { + } + @Override public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req) throws TException { @@ -167,6 +232,7 @@ public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req) resp.setStatus(unsecureTokenErrorStatus()); } else { try { + startFunction(ThriftCliFunctions.GetDelegationToken); String token = cliService.getDelegationToken( new SessionHandle(req.getSessionHandle()), hiveAuthFactory, req.getOwner(), req.getRenewer()); @@ -177,6 +243,8 @@ public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req) TStatus tokenErrorStatus = HiveSQLException.toTStatus(e); tokenErrorStatus.setSqlState("42000"); resp.setStatus(tokenErrorStatus); + } finally { + endFunction(ThriftCliFunctions.GetDelegationToken); } } return resp; @@ -191,12 +259,15 @@ public TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenRe resp.setStatus(unsecureTokenErrorStatus()); } else { try { + startFunction(ThriftCliFunctions.CancelDelegationToken); cliService.cancelDelegationToken(new SessionHandle(req.getSessionHandle()), hiveAuthFactory, req.getDelegationToken()); resp.setStatus(OK_STATUS); } catch (HiveSQLException e) { LOG.error("Error canceling delegation token", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.CancelDelegationToken); } } return resp; @@ -210,12 +281,15 @@ public TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq r resp.setStatus(unsecureTokenErrorStatus()); } else { try { + startFunction(ThriftCliFunctions.RenewDelegationToken); cliService.renewDelegationToken(new SessionHandle(req.getSessionHandle()), hiveAuthFactory, req.getDelegationToken()); resp.setStatus(OK_STATUS); } catch (HiveSQLException e) { LOG.error("Error obtaining renewing token", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.RenewDelegationToken); } } return resp; @@ -233,6 +307,7 @@ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { LOG.info("Client protocol version: " + req.getClient_protocol()); TOpenSessionResp resp = new TOpenSessionResp(); try { + startFunction(ThriftCliFunctions.OpenSession); SessionHandle sessionHandle = getSessionHandle(req, resp); resp.setSessionHandle(sessionHandle.toTSessionHandle()); // TODO: set real configuration map @@ -241,6 +316,8 @@ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { } catch (Exception e) { LOG.warn("Error opening session: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.OpenSession); } return resp; } @@ -274,10 +351,20 @@ private String getIpAddress() { * that the connecting user is trying to proxy for. * This includes a check whether the connecting user is allowed to proxy for the end user. * @param req - * @return + * @return userName * @throws HiveSQLException */ private String getUserName(TOpenSessionReq req) throws HiveSQLException { + String userName = getUserName(); + if (userName == null) { + userName = req.getUsername(); + } + String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress()); + LOG.debug("Client's username: " + effectiveClientUser); + return effectiveClientUser; + } + + private String getUserName() throws HiveSQLException { String userName = null; // Kerberos if (isKerberosAuthMode()) { @@ -293,14 +380,7 @@ private String getUserName(TOpenSessionReq req) throws HiveSQLException { ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) { userName = SessionManager.getUserName(); } - if (userName == null) { - userName = req.getUsername(); - } - - userName = getShortName(userName); - String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress()); - LOG.debug("Client's username: " + effectiveClientUser); - return effectiveClientUser; + return userName; } private String getShortName(String userName) { @@ -378,12 +458,15 @@ private TProtocolVersion getMinVersion(TProtocolVersion... versions) { public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { TCloseSessionResp resp = new TCloseSessionResp(); try { + startFunction(ThriftCliFunctions.CloseSession); SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle()); cliService.closeSession(sessionHandle); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error closing session: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.CloseSession); } return resp; } @@ -392,6 +475,7 @@ public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { public TGetInfoResp GetInfo(TGetInfoReq req) throws TException { TGetInfoResp resp = new TGetInfoResp(); try { + startFunction(ThriftCliFunctions.GetInfo); GetInfoValue getInfoValue = cliService.getInfo(new SessionHandle(req.getSessionHandle()), GetInfoType.getGetInfoType(req.getInfoType())); @@ -400,6 +484,8 @@ public TGetInfoResp GetInfo(TGetInfoReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting info: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetInfo); } return resp; } @@ -410,6 +496,8 @@ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws T try { SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle()); String statement = req.getStatement(); + startFunction(ThriftCliFunctions.ExecuteStatement, + "\tstmt={" + statement.replaceAll("[\\r\\n\\t]", "") + "}"); Map confOverlay = req.getConfOverlay(); Boolean runAsync = req.isRunAsync(); OperationHandle operationHandle = runAsync ? @@ -420,6 +508,8 @@ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws T } catch (Exception e) { LOG.warn("Error executing statement: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.ExecuteStatement); } return resp; } @@ -428,12 +518,15 @@ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws T public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException { TGetTypeInfoResp resp = new TGetTypeInfoResp(); try { + startFunction(ThriftCliFunctions.GetTypeInfo); OperationHandle operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle())); resp.setOperationHandle(operationHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting type info: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetTypeInfo); } return resp; } @@ -442,12 +535,15 @@ public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException { public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException { TGetCatalogsResp resp = new TGetCatalogsResp(); try { + startFunction(ThriftCliFunctions.GetCatalogs); OperationHandle opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle())); resp.setOperationHandle(opHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting catalogs: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetCatalogs); } return resp; } @@ -456,6 +552,8 @@ public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException { public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException { TGetSchemasResp resp = new TGetSchemasResp(); try { + startFunction(ThriftCliFunctions.GetSchemas, "\tcatalog=" + req.getCatalogName() + + "\tschema=" + req.getSchemaName()); OperationHandle opHandle = cliService.getSchemas( new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName()); resp.setOperationHandle(opHandle.toTOperationHandle()); @@ -463,6 +561,8 @@ public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting schemas: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetSchemas); } return resp; } @@ -471,6 +571,8 @@ public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException { public TGetTablesResp GetTables(TGetTablesReq req) throws TException { TGetTablesResp resp = new TGetTablesResp(); try { + startFunction(ThriftCliFunctions.GetTables, "\tcatalog=" + req.getCatalogName() + + "\tschema=" + req.getSchemaName() + "\ttable=" + req.getTableName()); OperationHandle opHandle = cliService .getTables(new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName(), req.getTableName(), req.getTableTypes()); @@ -479,6 +581,8 @@ public TGetTablesResp GetTables(TGetTablesReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting tables: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetTables); } return resp; } @@ -487,12 +591,15 @@ public TGetTablesResp GetTables(TGetTablesReq req) throws TException { public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException { TGetTableTypesResp resp = new TGetTableTypesResp(); try { + startFunction(ThriftCliFunctions.GetTableTypes); OperationHandle opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle())); resp.setOperationHandle(opHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting table types: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetTableTypes); } return resp; } @@ -501,6 +608,9 @@ public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException { TGetColumnsResp resp = new TGetColumnsResp(); try { + startFunction(ThriftCliFunctions.GetColumns, "\tcatalog=" + req.getCatalogName() + + "\tschema=" + req.getSchemaName() + "\ttable=" + req.getTableName() + + "\tcolumn=" + req.getColumnName()); OperationHandle opHandle = cliService.getColumns( new SessionHandle(req.getSessionHandle()), req.getCatalogName(), @@ -512,6 +622,8 @@ public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting columns: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetColumns); } return resp; } @@ -520,6 +632,8 @@ public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException { public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException { TGetFunctionsResp resp = new TGetFunctionsResp(); try { + startFunction(ThriftCliFunctions.GetFunctions, "\tcatalog=" + req.getCatalogName() + + "\tschema=" + req.getSchemaName() + "\tfunction=" + req.getFunctionName()); OperationHandle opHandle = cliService.getFunctions( new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName(), req.getFunctionName()); @@ -528,6 +642,8 @@ public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting functions: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetFunctions); } return resp; } @@ -536,6 +652,7 @@ public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException { public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException { TGetOperationStatusResp resp = new TGetOperationStatusResp(); try { + startFunction(ThriftCliFunctions.GetOperationStatus); OperationStatus operationStatus = cliService.getOperationStatus( new OperationHandle(req.getOperationHandle())); resp.setOperationState(operationStatus.getState().toTOperationState()); @@ -549,6 +666,8 @@ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) th } catch (Exception e) { LOG.warn("Error getting operation status: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetOperationStatus); } return resp; } @@ -557,11 +676,14 @@ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) th public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TException { TCancelOperationResp resp = new TCancelOperationResp(); try { + startFunction(ThriftCliFunctions.CancelOperation); cliService.cancelOperation(new OperationHandle(req.getOperationHandle())); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error cancelling operation: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.CancelOperation); } return resp; } @@ -570,11 +692,14 @@ public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TExc public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws TException { TCloseOperationResp resp = new TCloseOperationResp(); try { + startFunction(ThriftCliFunctions.CloseOperation); cliService.closeOperation(new OperationHandle(req.getOperationHandle())); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error closing operation: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.CloseOperation); } return resp; } @@ -584,12 +709,15 @@ public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq r throws TException { TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp(); try { + startFunction(ThriftCliFunctions.GetResultSetMetadata); TableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle())); resp.setSchema(schema.toTTableSchema()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting result set metadata: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetResultSetMetadata); } return resp; } @@ -598,6 +726,7 @@ public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq r public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { TFetchResultsResp resp = new TFetchResultsResp(); try { + startFunction(ThriftCliFunctions.FetchResults); RowSet rowSet = cliService.fetchResults( new OperationHandle(req.getOperationHandle()), FetchOrientation.getFetchOrientation(req.getOrientation()), @@ -609,6 +738,8 @@ public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { } catch (Exception e) { LOG.warn("Error fetching results: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.FetchResults); } return resp; }