diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 0bff243..902efef 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -571,6 +571,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Table alias will be added to column names for queries of type \"select *\" or \n" + "if query explicitly uses table alias \"select r1.x..\"."), + HIVE_VIEW_ACLS("hive.view.acls", null, "view-acls for Hive job, which impact Tez DAG and ATS entry." + + "It can be a user/group, or a comma separated user/group list, or *"), + HIVE_MODIFY_ACLS("hive.modify.acls", null, "modify-acls for Hive job, which impact Tez DAG and ATS entry." + + "It can be a user/group, or a comma separated user/group list, or *"), + // Hadoop Configuration Properties // Properties with null values are ignored and exist only for the purpose of giving us // a symbolic name to reference in the Hive source code. Properties with non-null diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 69cbe0b..c8f53e5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -431,14 +431,23 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, return dag; } - public static void setAccessControlsForCurrentUser(DAG dag) { + public void setAccessControlsForCurrentUser(DAG dag) { // get current user String currentUser = SessionState.getUserFromAuthenticator(); + // set permissions for DAG + String viewACL = currentUser; + String modifyACL = currentUser; + if (conf.getVar(HiveConf.ConfVars.HIVE_VIEW_ACLS) != null) { + viewACL = conf.getVar(HiveConf.ConfVars.HIVE_VIEW_ACLS); + } + if (conf.getVar(HiveConf.ConfVars.HIVE_MODIFY_ACLS) != null) { + modifyACL = conf.getVar(HiveConf.ConfVars.HIVE_MODIFY_ACLS); + } if(LOG.isDebugEnabled()) { - LOG.debug("Setting Tez DAG access for " + currentUser); + LOG.debug("Setting Tez DAG access to (" + viewACL + "," + modifyACL + ")"); } - // set permissions for current user on DAG - DAGAccessControls ac = new DAGAccessControls(currentUser, currentUser); + + DAGAccessControls ac = new DAGAccessControls(viewACL, modifyACL); dag.setAccessControls(ac); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java index 55b922b..f2cd94a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java @@ -43,7 +43,10 @@ import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.parse.ExplainConfiguration; import org.apache.hadoop.hive.ql.plan.ExplainWork; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.client.api.TimelineClient; @@ -69,6 +72,9 @@ private static TimelineClient timelineClient; private enum EntityTypes { HIVE_QUERY_ID }; private enum EventTypes { QUERY_SUBMITTED, QUERY_COMPLETED }; + private static final String ATS_DOMAIN_PREFIX = "hive_"; + private static boolean defaultATSDomainCreated = false; + private static final String DEFAULT_ATS_DOMAIN = "hive_default_ats"; private enum OtherInfoTypes { QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, SESSION_ID, THREAD_NAME, VERSION, @@ -136,9 +142,57 @@ public ATSHook() { LOG.info("Created ATS Hook"); } + private String createTimelineDomain(HookContext hookContext) throws Exception { + String domainId = ATS_DOMAIN_PREFIX + hookContext.getSessionId(); + String readers, writers; + String requestuser = hookContext.getUserName(); + if (hookContext.getUserName() == null ){ + requestuser = hookContext.getUgi().getUserName() ; + } + if (hookContext.getConf().getVar(HiveConf.ConfVars.HIVE_VIEW_ACLS) != null) { + readers = hookContext.getConf().getVar(HiveConf.ConfVars.HIVE_VIEW_ACLS); + } else { + readers = requestuser; + } + if (hookContext.getConf().getVar(HiveConf.ConfVars.HIVE_MODIFY_ACLS) != null) { + writers = hookContext.getConf().getVar(HiveConf.ConfVars.HIVE_MODIFY_ACLS); + } else { + writers = requestuser; + } + createTimelineDomain(domainId, readers, writers); + return domainId; + } + + private void createTimelineDomain(String domainId, String readers, String writers) throws Exception { + TimelineDomain timelineDomain = new TimelineDomain(); + timelineDomain.setId(domainId); + timelineDomain.setReaders(readers); + timelineDomain.setWriters(writers); + timelineClient.putDomain(timelineDomain); + LOG.info("ATS domain created:" + domainId + "(" + readers + "," + writers + ")"); + } + @Override public void run(final HookContext hookContext) throws Exception { final long currentTime = System.currentTimeMillis(); + String myDomainId; + if (SessionState.get() != null) { + // Create session domain if not present + if (SessionState.get().getATSDomainId() == null) { + SessionState.get().setATSDomainId(createTimelineDomain(hookContext)); + } + myDomainId = SessionState.get().getATSDomainId(); + } else { + // SessionState is null, this is unlikely to happen, just in case + if (!defaultATSDomainCreated) { + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + createTimelineDomain(DEFAULT_ATS_DOMAIN, user, user); + defaultATSDomainCreated = true; + } + myDomainId = DEFAULT_ATS_DOMAIN; + } + + final String domainId = myDomainId; final HiveConf conf = new HiveConf(hookContext.getConf()); final QueryState queryState = hookContext.getQueryState(); final String queryId = queryState.getQueryId(); @@ -200,13 +254,13 @@ public void run() { user, requestuser, numMrJobs, numTezJobs, opId, hookContext.getIpAddress(), hiveInstanceAddress, hiveInstanceType, hookContext.getSessionId(), logID, hookContext.getThreadId(), executionMode, - tablesRead, tablesWritten, conf, llapId)); + tablesRead, tablesWritten, conf, llapId, domainId)); break; case POST_EXEC_HOOK: - fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, hookContext.getPerfLogger())); + fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, hookContext.getPerfLogger(), domainId)); break; case ON_FAILURE_HOOK: - fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, hookContext.getPerfLogger())); + fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, hookContext.getPerfLogger(), domainId)); break; default: //ignore @@ -260,7 +314,8 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla long startTime, String user, String requestuser, int numMrJobs, int numTezJobs, String opId, String clientIpAddress, String hiveInstanceAddress, String hiveInstanceType, String sessionID, String logID, String threadId, String executionMode, - List tablesRead, List tablesWritten, HiveConf conf, ApplicationId llapAppId) + List tablesRead, List tablesWritten, HiveConf conf, ApplicationId llapAppId, + String domainId) throws Exception { JSONObject queryObj = new JSONObject(new LinkedHashMap<>()); @@ -320,12 +375,13 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla if (llapAppId != null) { atsEntity.addOtherInfo(OtherInfoTypes.LLAP_APP_ID.name(), llapAppId.toString()); } + atsEntity.setDomainId(domainId); return atsEntity; } TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, String requestuser, boolean success, - String opId, PerfLogger perfLogger) throws Exception { + String opId, PerfLogger perfLogger, String domainId) throws Exception { LOG.info("Received post-hook notification for :" + queryId); TimelineEntity atsEntity = new TimelineEntity(); @@ -350,6 +406,7 @@ TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, S perfObj.put(key, perfLogger.getDuration(key)); } atsEntity.addOtherInfo(OtherInfoTypes.PERF.name(), perfObj.toString()); + atsEntity.setDomainId(domainId); return atsEntity; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 3e01e92..1eb71fb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -283,6 +283,8 @@ private List forwardedAddresses; + private String atsDomainId; + /** * Get the lineage state stored in this session. * @@ -1243,6 +1245,14 @@ static boolean unregisterJar(List jarsToUnregister) { } } + public String getATSDomainId() { + return atsDomainId; + } + + public void setATSDomainId(String domainId) { + this.atsDomainId = domainId; + } + /** * ResourceType. *