diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 443c371..3554ddb 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Formatter; import java.util.HashMap; import java.util.Map; @@ -65,6 +66,28 @@ protected static HiveAuthFactory hiveAuthFactory; + public enum ThriftCliFunctions { + GetDelegationToken, + CancelDelegationToken, + RenewDelegationToken, + OpenSession, + CloseSession, + GetInfo, + GetTypeInfo, + GetCatalogs, + GetTableTypes, + GetOperationStatus, + CancelOperation, + CloseOperation, + GetResultSetMetadata, + FetchResults, + ExecuteStatement, + GetSchemas, + GetTables, + GetColumns, + GetFunctions + } + public ThriftCLIService(CLIService cliService, String serviceName) { super(serviceName); this.cliService = cliService; @@ -105,6 +128,46 @@ public synchronized void stop() { super.stop(); } + public static final String AUDIT_FORMAT = + "ugi=%s\t" + // ugi + "ip=%s\t" + // remote IP + "cmd=%s"; // command + public static final Log auditLog = LogFactory.getLog(ThriftCLIService.class.getName() + ".audit"); + + private static final ThreadLocal auditFormatter = + new ThreadLocal() { + @Override + protected Formatter initialValue() { + return new Formatter(new StringBuilder(AUDIT_FORMAT.length() * 4)); + } + }; + + private final void logAuditEvent(String cmd) { + if (cmd == null) { + return; + } + + final Formatter fmt = auditFormatter.get(); + ((StringBuilder) fmt.out()).setLength(0); + + String address = getIpAddress(); + if (address == null) { + address = "unknown-ip-addr"; + } + auditLog.info(fmt.format(AUDIT_FORMAT, getUserName(), address, cmd).toString()); + } + + public void startFunction(ThriftCliFunctions function, String extraLogInfo) { + logAuditEvent(function + extraLogInfo); + } + + public void startFunction(ThriftCliFunctions function) { + startFunction(function, ""); + } + + public void endFunction(ThriftCliFunctions function) { + } + @Override public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req) throws TException { @@ -180,6 +243,7 @@ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { LOG.info("Client protocol version: " + req.getClient_protocol()); TOpenSessionResp resp = new TOpenSessionResp(); try { + startFunction(ThriftCliFunctions.OpenSession); SessionHandle sessionHandle = getSessionHandle(req, resp); resp.setSessionHandle(sessionHandle.toTSessionHandle()); // TODO: set real configuration map @@ -188,6 +252,8 @@ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { } catch (Exception e) { LOG.warn("Error opening session: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.OpenSession); } return resp; } @@ -215,6 +281,17 @@ private String getIpAddress() { } private String getUserName(TOpenSessionReq req) throws HiveSQLException { + String userName = getUserName(); + if (userName == null) { + userName = req.getUsername(); + } + String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress()); + LOG.debug("Client's username: " + effectiveClientUser); + return effectiveClientUser; + } + + private String getUserName() { + String userName = null; // Kerberos if (isKerberosAuthMode()) { @@ -230,12 +307,7 @@ private String getUserName(TOpenSessionReq req) throws HiveSQLException { ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) { userName = SessionManager.getUserName(); } - if (userName == null) { - userName = req.getUsername(); - } - String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress()); - LOG.debug("Client's username: " + effectiveClientUser); - return effectiveClientUser; + return userName; } /** @@ -302,12 +374,15 @@ private TProtocolVersion getMinVersion(TProtocolVersion... versions) { public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { TCloseSessionResp resp = new TCloseSessionResp(); try { + startFunction(ThriftCliFunctions.CloseSession); SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle()); cliService.closeSession(sessionHandle); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error closing session: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.CloseSession); } return resp; } @@ -316,6 +391,7 @@ public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { public TGetInfoResp GetInfo(TGetInfoReq req) throws TException { TGetInfoResp resp = new TGetInfoResp(); try { + startFunction(ThriftCliFunctions.GetInfo); GetInfoValue getInfoValue = cliService.getInfo(new SessionHandle(req.getSessionHandle()), GetInfoType.getGetInfoType(req.getInfoType())); @@ -324,6 +400,8 @@ public TGetInfoResp GetInfo(TGetInfoReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting info: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetInfo); } return resp; } @@ -334,6 +412,7 @@ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws T try { SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle()); String statement = req.getStatement(); + startFunction(ThriftCliFunctions.ExecuteStatement, "\tstmt={" + statement.replaceAll("[\\r\\n\\t]", "") + "}"); Map confOverlay = req.getConfOverlay(); Boolean runAsync = req.isRunAsync(); OperationHandle operationHandle = runAsync ? @@ -344,6 +423,8 @@ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws T } catch (Exception e) { LOG.warn("Error executing statement: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.ExecuteStatement); } return resp; } @@ -352,12 +433,15 @@ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws T public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException { TGetTypeInfoResp resp = new TGetTypeInfoResp(); try { + startFunction(ThriftCliFunctions.GetTypeInfo); OperationHandle operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle())); resp.setOperationHandle(operationHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting type info: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetTypeInfo); } return resp; } @@ -366,12 +450,15 @@ public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException { public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException { TGetCatalogsResp resp = new TGetCatalogsResp(); try { + startFunction(ThriftCliFunctions.GetCatalogs); OperationHandle opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle())); resp.setOperationHandle(opHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting catalogs: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetCatalogs); } return resp; } @@ -380,6 +467,7 @@ public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException { public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException { TGetSchemasResp resp = new TGetSchemasResp(); try { + startFunction(ThriftCliFunctions.GetSchemas, "\tcatalog=" + req.getCatalogName() + "\tschema=" + req.getSchemaName()); OperationHandle opHandle = cliService.getSchemas( new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName()); resp.setOperationHandle(opHandle.toTOperationHandle()); @@ -387,6 +475,8 @@ public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting schemas: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetSchemas); } return resp; } @@ -395,6 +485,8 @@ public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException { public TGetTablesResp GetTables(TGetTablesReq req) throws TException { TGetTablesResp resp = new TGetTablesResp(); try { + startFunction(ThriftCliFunctions.GetTables, "\tcatalog=" + req.getCatalogName() + "\tschema=" + req.getSchemaName() + + "\ttable=" + req.getTableName()); OperationHandle opHandle = cliService .getTables(new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName(), req.getTableName(), req.getTableTypes()); @@ -403,6 +495,8 @@ public TGetTablesResp GetTables(TGetTablesReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting tables: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetTables); } return resp; } @@ -411,12 +505,15 @@ public TGetTablesResp GetTables(TGetTablesReq req) throws TException { public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException { TGetTableTypesResp resp = new TGetTableTypesResp(); try { + startFunction(ThriftCliFunctions.GetTableTypes); OperationHandle opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle())); resp.setOperationHandle(opHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting table types: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetTableTypes); } return resp; } @@ -425,6 +522,8 @@ public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException { TGetColumnsResp resp = new TGetColumnsResp(); try { + startFunction(ThriftCliFunctions.GetColumns, "\tcatalog=" + req.getCatalogName() + "\tschema=" + req.getSchemaName() + + "\ttable=" + req.getTableName() + "\tcolumn=" + req.getColumnName()); OperationHandle opHandle = cliService.getColumns( new SessionHandle(req.getSessionHandle()), req.getCatalogName(), @@ -436,6 +535,8 @@ public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting columns: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetColumns); } return resp; } @@ -444,6 +545,8 @@ public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException { public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException { TGetFunctionsResp resp = new TGetFunctionsResp(); try { + startFunction(ThriftCliFunctions.GetFunctions, "\tcatalog=" + req.getCatalogName() + + "\tschema=" + req.getSchemaName() + "\tfunction=" + req.getFunctionName()); OperationHandle opHandle = cliService.getFunctions( new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName(), req.getFunctionName()); @@ -452,6 +555,8 @@ public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting functions: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetFunctions); } return resp; } @@ -460,6 +565,7 @@ public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException { public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException { TGetOperationStatusResp resp = new TGetOperationStatusResp(); try { + startFunction(ThriftCliFunctions.GetOperationStatus); OperationStatus operationStatus = cliService.getOperationStatus( new OperationHandle(req.getOperationHandle())); resp.setOperationState(operationStatus.getState().toTOperationState()); @@ -473,6 +579,8 @@ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) th } catch (Exception e) { LOG.warn("Error getting operation status: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetOperationStatus); } return resp; } @@ -481,11 +589,14 @@ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) th public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TException { TCancelOperationResp resp = new TCancelOperationResp(); try { + startFunction(ThriftCliFunctions.CancelOperation); cliService.cancelOperation(new OperationHandle(req.getOperationHandle())); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error cancelling operation: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.CancelOperation); } return resp; } @@ -494,11 +605,14 @@ public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TExc public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws TException { TCloseOperationResp resp = new TCloseOperationResp(); try { + startFunction(ThriftCliFunctions.CloseOperation); cliService.closeOperation(new OperationHandle(req.getOperationHandle())); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error closing operation: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.CloseOperation); } return resp; } @@ -508,12 +622,15 @@ public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq r throws TException { TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp(); try { + startFunction(ThriftCliFunctions.GetResultSetMetadata); TableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle())); resp.setSchema(schema.toTTableSchema()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting result set metadata: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetResultSetMetadata); } return resp; } @@ -522,6 +639,7 @@ public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq r public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { TFetchResultsResp resp = new TFetchResultsResp(); try { + startFunction(ThriftCliFunctions.FetchResults); RowSet rowSet = cliService.fetchResults( new OperationHandle(req.getOperationHandle()), FetchOrientation.getFetchOrientation(req.getOrientation()), @@ -533,6 +651,8 @@ public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { } catch (Exception e) { LOG.warn("Error fetching results: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.FetchResults); } return resp; }