diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 1d5e504..e9814f2 100644 --- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Formatter; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -37,6 +38,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.hive.shims.Utils; import org.apache.hive.service.AbstractService; import org.apache.hive.service.ServiceException; import org.apache.hive.service.ServiceUtils; @@ -93,6 +95,43 @@ protected TServerEventHandler serverEventHandler; protected ThreadLocal currentServerContext; + private enum ThriftCliFunctions { + GetDelegationToken, + CancelDelegationToken, + RenewDelegationToken, + OpenSession, + CloseSession, + GetInfo, + ExecuteStatement, + GetTypeInfo, + GetCatalogs, + GetSchemas, + GetTables, + GetTableTypes, + GetColumns, + GetFunctions, + GetOperationStatus, + CancelOperation, + CloseOperation, + GetResultSetMetadata, + FetchResults + } + + public static final String AUDIT_FORMAT = + "ugi=%s\t" + // ugi + "ip=%s\t" + // remote IP + "cmd=%s"; // command + public static final Log auditLog = + LogFactory.getLog(ThriftCLIService.class.getName() + ".audit"); + + private static final ThreadLocal auditFormatter = + new ThreadLocal() { + @Override + protected Formatter initialValue() { + return new Formatter(new StringBuilder(AUDIT_FORMAT.length() * 4)); + } + }; + static class ThriftCLIServerContext implements ServerContext { private SessionHandle sessionHandle = null; @@ -242,6 +281,33 @@ public InetAddress getServerIPAddress() { return serverIPAddress; } + private void logAuditEvent(String cmd) throws HiveSQLException { + if (cmd == null) { + return; + } + + final Formatter fmt = auditFormatter.get(); + ((StringBuilder) fmt.out()).setLength(0); + + String address = getIpAddress(); + if (address == null) { + address = "unknown-ip-addr"; + } + auditLog.info(fmt.format(AUDIT_FORMAT, getUserName(), address, cmd).toString()); + } + + public void startFunction(ThriftCliFunctions function, String extraLogInfo) + throws HiveSQLException { + logAuditEvent(function + extraLogInfo); + } + + public void startFunction(ThriftCliFunctions function) throws HiveSQLException { + startFunction(function, ""); + } + + public void endFunction(ThriftCliFunctions function) { + } + @Override public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req) throws TException { @@ -251,6 +317,7 @@ public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req) resp.setStatus(unsecureTokenErrorStatus()); } else { try { + startFunction(ThriftCliFunctions.GetDelegationToken); String token = cliService.getDelegationToken( new SessionHandle(req.getSessionHandle()), hiveAuthFactory, req.getOwner(), req.getRenewer()); @@ -261,6 +328,8 @@ public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req) TStatus tokenErrorStatus = HiveSQLException.toTStatus(e); tokenErrorStatus.setSqlState("42000"); resp.setStatus(tokenErrorStatus); + } finally { + endFunction(ThriftCliFunctions.GetDelegationToken); } } return resp; @@ -275,12 +344,15 @@ public TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenRe resp.setStatus(unsecureTokenErrorStatus()); } else { try { + startFunction(ThriftCliFunctions.CancelDelegationToken); cliService.cancelDelegationToken(new SessionHandle(req.getSessionHandle()), hiveAuthFactory, req.getDelegationToken()); resp.setStatus(OK_STATUS); } catch (HiveSQLException e) { LOG.error("Error canceling delegation token", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.CancelDelegationToken); } } return resp; @@ -294,12 +366,15 @@ public TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq r resp.setStatus(unsecureTokenErrorStatus()); } else { try { + startFunction(ThriftCliFunctions.RenewDelegationToken); cliService.renewDelegationToken(new SessionHandle(req.getSessionHandle()), hiveAuthFactory, req.getDelegationToken()); resp.setStatus(OK_STATUS); } catch (HiveSQLException e) { LOG.error("Error obtaining renewing token", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.RenewDelegationToken); } } return resp; @@ -317,6 +392,7 @@ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { LOG.info("Client protocol version: " + req.getClient_protocol()); TOpenSessionResp resp = new TOpenSessionResp(); try { + startFunction(ThriftCliFunctions.OpenSession); SessionHandle sessionHandle = getSessionHandle(req, resp); resp.setSessionHandle(sessionHandle.toTSessionHandle()); // TODO: set real configuration map @@ -331,6 +407,8 @@ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { } catch (Exception e) { LOG.warn("Error opening session: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.OpenSession); } return resp; } @@ -364,10 +442,22 @@ private String getIpAddress() { * that the connecting user is trying to proxy for. * This includes a check whether the connecting user is allowed to proxy for the end user. * @param req - * @return + * @return userName * @throws HiveSQLException */ private String getUserName(TOpenSessionReq req) throws HiveSQLException, IOException { + String userName = getUserName(); + if (userName == null) { + userName = req.getUsername(); + } + + userName = getShortName(userName); + String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress()); + LOG.debug("Client's username: " + effectiveClientUser); + return effectiveClientUser; + } + + private String getUserName() throws HiveSQLException { String userName = null; // Kerberos if (isKerberosAuthMode()) { @@ -383,14 +473,8 @@ private String getUserName(TOpenSessionReq req) throws HiveSQLException, IOExcep ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) { userName = SessionManager.getUserName(); } - if (userName == null) { - userName = req.getUsername(); - } - userName = getShortName(userName); - String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress()); - LOG.debug("Client's username: " + effectiveClientUser); - return effectiveClientUser; + return userName; } private String getShortName(String userName) throws IOException { @@ -465,6 +549,7 @@ private TProtocolVersion getMinVersion(TProtocolVersion... versions) { public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { TCloseSessionResp resp = new TCloseSessionResp(); try { + startFunction(ThriftCliFunctions.CloseSession); SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle()); cliService.closeSession(sessionHandle); LOG.info("Closed a session, current sessions: " + sessionCount.decrementAndGet()); @@ -477,6 +562,8 @@ public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { } catch (Exception e) { LOG.warn("Error closing session: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.CloseSession); } return resp; } @@ -485,6 +572,7 @@ public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { public TGetInfoResp GetInfo(TGetInfoReq req) throws TException { TGetInfoResp resp = new TGetInfoResp(); try { + startFunction(ThriftCliFunctions.GetInfo); GetInfoValue getInfoValue = cliService.getInfo(new SessionHandle(req.getSessionHandle()), GetInfoType.getGetInfoType(req.getInfoType())); @@ -493,6 +581,8 @@ public TGetInfoResp GetInfo(TGetInfoReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting info: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetInfo); } return resp; } @@ -503,6 +593,8 @@ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws T try { SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle()); String statement = req.getStatement(); + startFunction(ThriftCliFunctions.ExecuteStatement, + "\tstmt={" + statement.replaceAll("[\\r\\n\\t]", "") + "}"); Map confOverlay = req.getConfOverlay(); Boolean runAsync = req.isRunAsync(); OperationHandle operationHandle = runAsync ? @@ -513,6 +605,8 @@ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws T } catch (Exception e) { LOG.warn("Error executing statement: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.ExecuteStatement); } return resp; } @@ -521,12 +615,15 @@ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws T public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException { TGetTypeInfoResp resp = new TGetTypeInfoResp(); try { + startFunction(ThriftCliFunctions.GetTypeInfo); OperationHandle operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle())); resp.setOperationHandle(operationHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting type info: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetTypeInfo); } return resp; } @@ -535,12 +632,15 @@ public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException { public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException { TGetCatalogsResp resp = new TGetCatalogsResp(); try { + startFunction(ThriftCliFunctions.GetCatalogs); OperationHandle opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle())); resp.setOperationHandle(opHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting catalogs: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetCatalogs); } return resp; } @@ -549,6 +649,8 @@ public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException { public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException { TGetSchemasResp resp = new TGetSchemasResp(); try { + startFunction(ThriftCliFunctions.GetSchemas, "\tcatalog=" + req.getCatalogName() + + "\tschema=" + req.getSchemaName()); OperationHandle opHandle = cliService.getSchemas( new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName()); resp.setOperationHandle(opHandle.toTOperationHandle()); @@ -556,6 +658,8 @@ public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting schemas: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetSchemas); } return resp; } @@ -564,6 +668,8 @@ public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException { public TGetTablesResp GetTables(TGetTablesReq req) throws TException { TGetTablesResp resp = new TGetTablesResp(); try { + startFunction(ThriftCliFunctions.GetTables, "\tcatalog=" + req.getCatalogName() + + "\tschema=" + req.getSchemaName() + "\ttable=" + req.getTableName()); OperationHandle opHandle = cliService .getTables(new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName(), req.getTableName(), req.getTableTypes()); @@ -572,6 +678,8 @@ public TGetTablesResp GetTables(TGetTablesReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting tables: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetTables); } return resp; } @@ -580,12 +688,15 @@ public TGetTablesResp GetTables(TGetTablesReq req) throws TException { public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException { TGetTableTypesResp resp = new TGetTableTypesResp(); try { + startFunction(ThriftCliFunctions.GetTableTypes); OperationHandle opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle())); resp.setOperationHandle(opHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting table types: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetTableTypes); } return resp; } @@ -594,6 +705,9 @@ public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException { TGetColumnsResp resp = new TGetColumnsResp(); try { + startFunction(ThriftCliFunctions.GetColumns, "\tcatalog=" + req.getCatalogName() + + "\tschema=" + req.getSchemaName() + "\ttable=" + req.getTableName() + + "\tcolumn=" + req.getColumnName()); OperationHandle opHandle = cliService.getColumns( new SessionHandle(req.getSessionHandle()), req.getCatalogName(), @@ -605,6 +719,8 @@ public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting columns: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetColumns); } return resp; } @@ -613,6 +729,8 @@ public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException { public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException { TGetFunctionsResp resp = new TGetFunctionsResp(); try { + startFunction(ThriftCliFunctions.GetFunctions, "\tcatalog=" + req.getCatalogName() + + "\tschema=" + req.getSchemaName() + "\tfunction=" + req.getFunctionName()); OperationHandle opHandle = cliService.getFunctions( new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName(), req.getFunctionName()); @@ -621,6 +739,8 @@ public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException { } catch (Exception e) { LOG.warn("Error getting functions: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetFunctions); } return resp; } @@ -629,6 +749,7 @@ public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException { public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException { TGetOperationStatusResp resp = new TGetOperationStatusResp(); try { + startFunction(ThriftCliFunctions.GetOperationStatus); OperationStatus operationStatus = cliService.getOperationStatus( new OperationHandle(req.getOperationHandle())); resp.setOperationState(operationStatus.getState().toTOperationState()); @@ -642,6 +763,8 @@ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) th } catch (Exception e) { LOG.warn("Error getting operation status: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetOperationStatus); } return resp; } @@ -650,11 +773,14 @@ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) th public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TException { TCancelOperationResp resp = new TCancelOperationResp(); try { + startFunction(ThriftCliFunctions.CancelOperation); cliService.cancelOperation(new OperationHandle(req.getOperationHandle())); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error cancelling operation: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.CancelOperation); } return resp; } @@ -663,11 +789,14 @@ public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TExc public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws TException { TCloseOperationResp resp = new TCloseOperationResp(); try { + startFunction(ThriftCliFunctions.CloseOperation); cliService.closeOperation(new OperationHandle(req.getOperationHandle())); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error closing operation: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.CloseOperation); } return resp; } @@ -677,12 +806,15 @@ public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq r throws TException { TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp(); try { + startFunction(ThriftCliFunctions.GetResultSetMetadata); TableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle())); resp.setSchema(schema.toTTableSchema()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting result set metadata: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.GetResultSetMetadata); } return resp; } @@ -691,6 +823,7 @@ public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq r public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { TFetchResultsResp resp = new TFetchResultsResp(); try { + startFunction(ThriftCliFunctions.FetchResults); RowSet rowSet = cliService.fetchResults( new OperationHandle(req.getOperationHandle()), FetchOrientation.getFetchOrientation(req.getOrientation()), @@ -702,6 +835,8 @@ public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { } catch (Exception e) { LOG.warn("Error fetching results: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + endFunction(ThriftCliFunctions.FetchResults); } return resp; } diff --git service/src/test/org/apache/hive/service/server/TestHiveServer2AuditLogs.java service/src/test/org/apache/hive/service/server/TestHiveServer2AuditLogs.java new file mode 100644 index 0000000..96c6ca6 --- /dev/null +++ service/src/test/org/apache/hive/service/server/TestHiveServer2AuditLogs.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.server; + +import java.io.StringWriter; +import java.util.Collections; + +import junit.framework.TestCase; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.service.cli.SessionHandle; +import org.apache.hive.service.cli.thrift.EmbeddedThriftBinaryCLIService; +import org.apache.hive.service.cli.thrift.ThriftCLIService; +import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; +import org.apache.log4j.WriterAppender; +import org.junit.Assert; + +public class TestHiveServer2AuditLogs extends TestCase { + + private EmbeddedThriftBinaryCLIService service; + private ThriftCLIServiceClient client; + + public void setUp() throws Exception { + service = new EmbeddedThriftBinaryCLIService(); + service.init(new HiveConf()); + client = new ThriftCLIServiceClient(service); + } + + public void tearDown() { + service.stop(); + } + + public void testHS2AuditLogging () throws Exception { + String sessionUserName = "user1"; + + Logger logger = Logger.getLogger(ThriftCLIService.class.getName() + ".audit"); + Level origLevel = logger.getLevel(); + logger.setLevel(Level.INFO); + + // create an appender to capture the logs in a string + StringWriter writer = new StringWriter(); + WriterAppender appender = new WriterAppender(new PatternLayout(), writer); + + try { + logger.addAppender(appender); + + SessionHandle sessionHandle = client.openSession(sessionUserName, "foobar", + Collections.emptyMap()); + client.closeSession(sessionHandle); + + String logStr = writer.toString(); + String expectedString = "OpenSession"; + Assert.assertTrue(logStr + " should contain <" + expectedString, + logStr.contains("cmd=OpenSession")); + Assert.assertTrue(logStr + " should contain <" + expectedString, + logStr.contains("cmd=CloseSession")); + + } finally { + logger.setLevel(origLevel); + logger.removeAppender(appender); + } + } +} \ No newline at end of file