diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java index 13ccd93..71e7426 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.QueryState; @@ -43,12 +44,16 @@ import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.parse.ExplainConfiguration; import org.apache.hadoop.hive.ql.plan.ExplainWork; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hive.common.util.ShutdownHookManager; +import org.apache.tez.dag.api.TezConfiguration; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +74,9 @@ private static TimelineClient timelineClient; private enum EntityTypes { HIVE_QUERY_ID }; private enum EventTypes { QUERY_SUBMITTED, QUERY_COMPLETED }; + private static final String ATS_DOMAIN_PREFIX = "hive_"; + private static boolean defaultATSDomainCreated = false; + private static final String DEFAULT_ATS_DOMAIN = "hive_default_ats"; private enum OtherInfoTypes { QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, SESSION_ID, THREAD_NAME, VERSION, @@ -136,9 +144,79 @@ public ATSHook() { LOG.info("Created ATS Hook"); } + private void createTimelineDomain(String domainId, String readers, String writers) throws Exception { + TimelineDomain timelineDomain = new TimelineDomain(); + timelineDomain.setId(domainId); + timelineDomain.setReaders(readers); + timelineDomain.setWriters(writers); + timelineClient.putDomain(timelineDomain); + LOG.info("ATS domain created:" + domainId + "(" + readers + "," + writers + ")"); + } + + private String createOrGetDomain(final HookContext hookContext) throws Exception { + final String domainId; + String domainReaders = null; + String domainWriters = null; + boolean create = false; + if (SessionState.get() != null) { + if (SessionState.get().getATSDomainId() == null) { + domainId = ATS_DOMAIN_PREFIX + hookContext.getSessionId(); + // Create session domain if not present + if (SessionState.get().getATSDomainId() == null) { + String requestuser = hookContext.getUserName(); + if (hookContext.getUserName() == null ){ + requestuser = hookContext.getUgi().getShortUserName() ; + } + boolean addHs2User = + HiveConf.getBoolVar(hookContext.getConf(), ConfVars.HIVETEZHS2USERACCESS); + + UserGroupInformation loginUserUgi = UserGroupInformation.getLoginUser(); + String loginUser = + loginUserUgi == null ? null : loginUserUgi.getShortUserName(); + + // In Tez, TEZ_AM_VIEW_ACLS/TEZ_AM_MODIFY_ACLS is used as the base for Tez ATS ACLS, + // so if exists, honor it. So we get the same ACLS for Tez ATS entries and + // Hive entries + domainReaders = Utilities.getAclStringWithHiveModification(hookContext.getConf(), + TezConfiguration.TEZ_AM_VIEW_ACLS, addHs2User, requestuser, loginUser); + + domainWriters = Utilities.getAclStringWithHiveModification(hookContext.getConf(), + TezConfiguration.TEZ_AM_MODIFY_ACLS, addHs2User, requestuser, loginUser); + SessionState.get().setATSDomainId(domainId); + create = true; + } + } else { + domainId = SessionState.get().getATSDomainId(); + } + } else { + // SessionState is null, this is unlikely to happen, just in case + if (!defaultATSDomainCreated) { + domainReaders = domainWriters = UserGroupInformation.getCurrentUser().getShortUserName(); + defaultATSDomainCreated = true; + create = true; + } + domainId = DEFAULT_ATS_DOMAIN; + } + if (create) { + final String readers = domainReaders; + final String writers = domainWriters; + executor.submit(new Runnable() { + @Override + public void run() { + try { + createTimelineDomain(domainId, readers, writers); + } catch (Exception e) { + LOG.warn("Failed to create ATS domain " + domainId, e); + } + } + }); + } + return domainId; + } @Override public void run(final HookContext hookContext) throws Exception { final long currentTime = System.currentTimeMillis(); + final HiveConf conf = new HiveConf(hookContext.getConf()); final QueryState queryState = hookContext.getQueryState(); final String queryId = queryState.getQueryId(); @@ -151,6 +229,7 @@ public void run(final HookContext hookContext) throws Exception { try { setupAtsExecutor(conf); + final String domainId = createOrGetDomain(hookContext); executor.submit(new Runnable() { @Override public void run() { @@ -205,13 +284,13 @@ public void run() { user, requestuser, numMrJobs, numTezJobs, opId, hookContext.getIpAddress(), hiveInstanceAddress, hiveInstanceType, hookContext.getSessionId(), logID, hookContext.getThreadId(), executionMode, - tablesRead, tablesWritten, conf, llapId)); + tablesRead, tablesWritten, conf, llapId, domainId)); break; case POST_EXEC_HOOK: - fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, durations)); + fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, durations, domainId)); break; case ON_FAILURE_HOOK: - fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, durations)); + fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, durations, domainId)); break; default: //ignore @@ -265,7 +344,8 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla long startTime, String user, String requestuser, int numMrJobs, int numTezJobs, String opId, String clientIpAddress, String hiveInstanceAddress, String hiveInstanceType, String sessionID, String logID, String threadId, String executionMode, - List tablesRead, List tablesWritten, HiveConf conf, ApplicationId llapAppId) + List tablesRead, List tablesWritten, HiveConf conf, ApplicationId llapAppId, + String domainId) throws Exception { JSONObject queryObj = new JSONObject(new LinkedHashMap<>()); @@ -325,12 +405,13 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla if (llapAppId != null) { atsEntity.addOtherInfo(OtherInfoTypes.LLAP_APP_ID.name(), llapAppId.toString()); } + atsEntity.setDomainId(domainId); return atsEntity; } TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, String requestuser, boolean success, - String opId, Map durations) throws Exception { + String opId, Map durations, String domainId) throws Exception { LOG.info("Received post-hook notification for :" + queryId); TimelineEntity atsEntity = new TimelineEntity(); @@ -355,6 +436,7 @@ TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, S perfObj.put(entry.getKey(), entry.getValue()); } atsEntity.addOtherInfo(OtherInfoTypes.PERF.name(), perfObj.toString()); + atsEntity.setDomainId(domainId); return atsEntity; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index ba2c9c3..1e9774f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -283,6 +283,8 @@ private List forwardedAddresses; + private String atsDomainId; + /** * Get the lineage state stored in this session. * @@ -1243,6 +1245,14 @@ static boolean unregisterJar(List jarsToUnregister) { } } + public String getATSDomainId() { + return atsDomainId; + } + + public void setATSDomainId(String domainId) { + this.atsDomainId = domainId; + } + /** * ResourceType. *