diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 99c26ce..69b1d5e 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3188,6 +3188,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal new TimeValidator(TimeUnit.SECONDS), "How long to delay before cleaning up query files in LLAP (in seconds, for debugging).", "llap.file.cleanup.delay-seconds"), + LLAP_DAEMON_NUM_UMBILICAL_CONNECTIONS_PER_QUERY("hive.llap.daemon.num.umbilical.connections.per.query", + -1, + "Number of parallel connections from a single daemon to the query co-ordinator on the\n" + + "umbilical protocol"), LLAP_DAEMON_SERVICE_HOSTS("hive.llap.daemon.service.hosts", null, "Explicitly specified hosts to use for LLAP scheduling. Useful for testing. By default,\n" + "YARN registry is used.", "llap.daemon.service.hosts"), diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index 088f07c..2a374df 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -26,7 +26,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import com.google.common.base.Preconditions; @@ -57,6 +56,8 @@ private final FileSystem localFs; private String[] localDirs; private final LlapNodeId amNodeId; + private final String appTokenIdentifier; + private final UgiPool ugiPool; // Map of states for different vertices. private final Set knownFragments = @@ -66,14 +67,16 @@ private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker(); private final String tokenUserName, appId; - private final AtomicReference umbilicalUgi; public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString, - String dagName, String hiveQueryIdString, - int dagIdentifier, String user, - ConcurrentMap sourceStateMap, - String[] localDirsBase, FileSystem localFs, String tokenUserName, - String tokenAppId, final LlapNodeId amNodeId) { + String dagName, String hiveQueryIdString, + int dagIdentifier, String user, + ConcurrentMap sourceStateMap, + String[] localDirsBase, FileSystem localFs, String tokenUserName, + String tokenAppId, final LlapNodeId amNodeId, + String tokenIdentifier, + Token appToken, + int ugiInstancesPerQuery) { this.queryIdentifier = queryIdentifier; this.appIdString = appIdString; this.dagIdString = dagIdString; @@ -86,10 +89,15 @@ public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dag this.localFs = localFs; this.tokenUserName = tokenUserName; this.appId = tokenAppId; - this.umbilicalUgi = new AtomicReference<>(); this.amNodeId = amNodeId; + this.appTokenIdentifier = tokenIdentifier; + InetSocketAddress address = + NetUtils.createSocketAddrForHost(amNodeId.getHostname(), amNodeId.getPort()); + SecurityUtil.setTokenService(appToken, address); + this.ugiPool = new UgiPool(ugiInstancesPerQuery, appTokenIdentifier, appToken); } + public QueryIdentifier getQueryIdentifier() { return queryIdentifier; } @@ -314,23 +322,8 @@ public String getTokenAppId() { return appId; } - public void setupUmbilicalUgi(String umbilicalUser, Token appToken, String amHost, int amPort) { - synchronized (umbilicalUgi) { - if (umbilicalUgi.get() == null) { - UserGroupInformation taskOwner = - UserGroupInformation.createRemoteUser(umbilicalUser); - final InetSocketAddress address = - NetUtils.createSocketAddrForHost(amHost, amPort); - SecurityUtil.setTokenService(appToken, address); - taskOwner.addToken(appToken); - umbilicalUgi.set(taskOwner); - } - } - } public UserGroupInformation getUmbilicalUgi() { - synchronized (umbilicalUgi) { - return umbilicalUgi.get(); - } + return ugiPool.getUgiInstance(); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index 7e646c5..06d9cc0 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -75,6 +75,7 @@ private final String clusterId; private final long defaultDeleteDelaySeconds; private final boolean routeBasedLoggingEnabled; + private final int ugiPoolSizePerQuery; // TODO At the moment there's no way of knowing whether a query is running or not. // A race is possible between dagComplete and registerFragment - where the registerFragment @@ -121,6 +122,12 @@ public QueryTracker(Configuration conf, String[] localDirsBase, String clusterId this.executorService = Executors.newScheduledThreadPool(numCleanerThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryFileCleaner %d").build()); + int numExecutors = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS); + int ugiPoolSizePerQuery = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_UMBILICAL_CONNECTIONS_PER_QUERY); + Preconditions.checkArgument(ugiPoolSizePerQuery == -1 || ugiPoolSizePerQuery > 0, + "ugiPoolSize must be -1, or > 0"); + this.ugiPoolSizePerQuery = ugiPoolSizePerQuery == -1 ? numExecutors : ugiPoolSizePerQuery; + String logger = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_LOGGER); if (logger != null && (logger.equalsIgnoreCase(LogHelpers.LLAP_LOGGER_NAME_QUERY_ROUTING))) { routeBasedLoggingEnabled = true; @@ -128,8 +135,8 @@ public QueryTracker(Configuration conf, String[] localDirsBase, String clusterId routeBasedLoggingEnabled = false; } LOG.info( - "QueryTracker setup with numCleanerThreads={}, defaultCleanupDelay(s)={}, routeBasedLogging={}", - numCleanerThreads, defaultDeleteDelaySeconds, routeBasedLoggingEnabled); + "QueryTracker setup with numCleanerThreads={}, defaultCleanupDelay(s)={}, routeBasedLogging={}, ugiPoolSizePerQuery={}", + numCleanerThreads, defaultDeleteDelaySeconds, routeBasedLoggingEnabled, ugiPoolSizePerQuery); } /** @@ -169,13 +176,12 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId new QueryInfo(queryIdentifier, appIdString, dagIdString, dagName, hiveQueryIdString, dagIdentifier, user, getSourceCompletionMap(queryIdentifier), localDirsBase, localFs, - tokenInfo.userName, tokenInfo.appId, amNodeId); + tokenInfo.userName, tokenInfo.appId, amNodeId, vertex.getTokenIdentifier(), appToken, + ugiPoolSizePerQuery); QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo); if (old != null) { queryInfo = old; } else { - // Ensure the UGI is setup once. - queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amNodeId.getHostname(), amNodeId.getPort()); isExistingQueryInfo = false; } } @@ -184,6 +190,7 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId LlapTokenChecker.checkPermissions(tokenInfo, queryInfo.getTokenUserName(), queryInfo.getTokenAppId(), queryInfo.getQueryIdentifier()); } + // Ensure the Address etc is set up at least once. queryIdentifierToHiveQueryId.putIfAbsent(queryIdentifier, hiveQueryIdString); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/UgiPool.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/UgiPool.java index e69de29..8e61ab2 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/UgiPool.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/UgiPool.java @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon.impl; + +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import com.google.common.collect.Lists; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.tez.common.security.JobTokenIdentifier; + +public class UgiPool { + + private final int maxInstances; + private final List instances; + private final String tokenIdentifier; + private final Token token; + + private int createdCount = 0; + private int nextInstance = 0; + + private final ReentrantLock lock = new ReentrantLock(); + + + public UgiPool(int maxInstances, String tokenIdentifier, Token token) { + this.maxInstances = maxInstances; + this.instances = Lists.newArrayListWithCapacity(maxInstances); + this.tokenIdentifier = tokenIdentifier; + this.token = token; + } + + public UserGroupInformation getUgiInstance() { + lock.lock(); + try { + if (createdCount >= maxInstances) { + return instances.get(nextInstance++ % maxInstances); + } else { + createdCount++; + UserGroupInformation ugi = createUgi(); + instances.add(ugi); + return ugi; + } + } finally { + lock.unlock(); + } + } + + private UserGroupInformation createUgi() { + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(tokenIdentifier); + ugi.addToken(token); + return ugi; + } +} \ No newline at end of file diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 6287ae8..02a4a60 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -23,6 +23,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; @@ -93,7 +94,7 @@ public static QueryInfo createQueryInfo() { new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string", "fake_dag_name", "fakeHiveQueryId", 1, "fakeUser", new ConcurrentHashMap(), - new String[0], null, "fakeUser", null, null); + new String[0], null, "fakeUser", null, LlapNodeId.getInstance("localhost", 0), null, null, -1); return queryInfo; } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestUgiPool.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestUgiPool.java index e69de29..a9ed79e 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestUgiPool.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestUgiPool.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon.impl; + +import static org.mockito.Mockito.mock; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.junit.Assert; +import org.junit.Test; + +public class TestUgiPool { + + @Test(timeout = 5000) + public void test() { + + Map ugis = new HashMap<>(); + + UgiPool ugiPool = new UgiPool(10, "fakeIdentifier", mock(Token.class)); + for (int i = 0; i < 5; i++) { + addToMap(ugis, ugiPool.getUgiInstance()); + } + Assert.assertEquals(5, ugis.size()); + + for (int i = 0; i < 5; i++) { + addToMap(ugis, ugiPool.getUgiInstance()); + } + Assert.assertEquals(10, ugis.size()); + + for (int i = 0; i < 5; i++) { + addToMap(ugis, ugiPool.getUgiInstance()); + } + Assert.assertEquals(10, ugis.size()); + + for (int i = 0; i < 5; i++) { + addToMap(ugis, ugiPool.getUgiInstance()); + } + Assert.assertEquals(10, ugis.size()); + + for (Map.Entry entry : ugis.entrySet()) { + Assert.assertEquals(2, entry.getValue().intValue()); + } + } + + private void addToMap(Map map, UserGroupInformation ugi) { + MutableInt count = map.get(ugi); + if (count == null) { + count = new MutableInt(0); + map.put(ugi, count); + } + count.increment(); + } +}