diff --git llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
index 0724cf5b1f..23b49b380f 100644
--- llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
+++ llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
@@ -15,16 +15,16 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
 
 import com.google.common.io.ByteStreams;
 
 import org.apache.tez.common.security.JobTokenIdentifier;
-
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
 import org.apache.hadoop.registry.client.types.AddressTypes;
 import org.apache.hadoop.registry.client.types.Endpoint;
@@ -53,6 +53,18 @@ public int getPluginPort() {
   public String getSessionId() {
     return getProperties().get(TezAmRegistryImpl.AM_SESSION_ID);
   }
+
+  /**
+   * Returns the guaranteed task count stored in this instance's registry properties.
+   *
+   * @return the parsed {@code am.guaranteed.count} property, or 0 if it is missing or empty
+   */
+  public int getGuaranteedCount() {
+    String str = getProperties().get(TezAmRegistryImpl.AM_GUARANTEED_COUNT);
+    // A missing or empty property means no count is available; parsing it would throw.
+    if (StringUtils.isEmpty(str)) return 0;
+    return Integer.parseInt(str);
+  }
 
   public String getPluginTokenJobId() {
     return getProperties().get(TezAmRegistryImpl.AM_PLUGIN_JOBID);
diff --git llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
index 417e571b8d..d0f1174eef 100644
--- llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
+++ llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
@@ -18,6 +18,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
+
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -35,13 +36,14 @@
   static final String IPC_TEZCLIENT = "tez-client";
   static final String IPC_PLUGIN = "llap-plugin";
   static final String AM_SESSION_ID = "am.session.id", AM_PLUGIN_TOKEN = "am.plugin.token",
-      AM_PLUGIN_JOBID = "am.plugin.jobid";
+      AM_PLUGIN_JOBID = "am.plugin.jobid", AM_GUARANTEED_COUNT = "am.guaranteed.count";
   private final static String NAMESPACE_PREFIX = "tez-am-";
   private final static String USER_SCOPE_PATH_PREFIX = "user-";
   private static final String WORKER_PREFIX = "worker-";
   private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient";
 
   private final String registryName;
+  private ServiceRecord srv;
 
   public static TezAmRegistryImpl create(Configuration conf, boolean b) {
     String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME);
@@ -70,8 +72,11 @@ public void populateCache(boolean doInvokeListeners) throws IOException {
   }
 
   public String register(int amPort, int pluginPort, String sessionId,
-      String serializedToken, String jobIdForToken) throws IOException {
-    ServiceRecord srv = new ServiceRecord();
+      String serializedToken, String jobIdForToken, int guaranteedCount) throws IOException {
+    if (srv != null) {
+      throw new UnsupportedOperationException("Already registered with " + srv);
+    }
+    srv = new ServiceRecord();
     Endpoint rpcEndpoint = RegistryTypeUtils.ipcEndpoint(
         IPC_TEZCLIENT, new InetSocketAddress(hostname, amPort));
     srv.addInternalEndpoint(rpcEndpoint);
@@ -85,12 +90,18 @@ public String register(int amPort, int pluginPort, String sessionId,
     boolean hasToken = serializedToken != null;
     srv.set(AM_PLUGIN_TOKEN, hasToken ? serializedToken : "");
     srv.set(AM_PLUGIN_JOBID, jobIdForToken != null ? jobIdForToken : "");
+    srv.set(AM_GUARANTEED_COUNT, Integer.toString(guaranteedCount));
     String uniqueId = registerServiceRecord(srv);
     LOG.info("Registered this AM: rpc: {}, plugin: {}, sessionId: {}, token: {}, znodePath: {}",
         rpcEndpoint, pluginEndpoint, sessionId, hasToken, getRegistrationZnodePath());
     return uniqueId;
   }
 
+  public void updateGuaranteed(int guaranteedCount) throws IOException {
+    srv.set(AM_GUARANTEED_COUNT, Integer.toString(guaranteedCount));
+    updateServiceRecord(srv);
+  }
+
   public TezAmInstance getInstance(String name) {
     Collection instances = getAllInternal();
     for(TezAmInstance instance : instances) {
diff --git llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
index 17269dddf7..acb9e1c115 100644
--- llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+++ llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
@@ -17,6 +17,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -34,6 +35,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
@@ -235,7 +237,7 @@ private String getQuorumServers(Configuration conf) {
   }
 
   protected abstract String getZkPathUser(Configuration conf);
-  
+
   protected final String registerServiceRecord(ServiceRecord srv) throws IOException {
     // restart sensitive instance id
     srv.set(UNIQUE_IDENTIFIER, uniq.toString());
@@ -246,7 +248,7 @@ protected final String registerServiceRecord(ServiceRecord srv) throws IOExcepti
       // even under connection or session interruption (will automatically handle retries)
       znode = new PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL,
           workersPath + "/" + workerNodePrefix, encoder.toBytes(srv));
-      
+
       // start the creation of znodes
       znode.start();
 
@@ -588,4 +590,15 @@ private int extractSeqNum(String nodeName) {
       throw e;
     }
   }
+
+  protected final void updateServiceRecord(ServiceRecord srv) throws IOException {
+    srv.set(UNIQUE_IDENTIFIER, uniq.toString());
+    // waitForInitialCreate must have already been called in registerServiceRecord.
+    try {
+      znode.setData(encoder.toBytes(srv));
+    } catch (Exception e) {
+      LOG.error("Unable to update the znode", e);
+      throw (e instanceof IOException) ? (IOException)e : new IOException(e);
+    }
+  }
 }
diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 66de3b805a..ce36ee3c42 100644
--- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -247,6 +247,16 @@ public void setError(TaskInfo ctx, Throwable t) {
 
   private int totalGuaranteed = 0, unusedGuaranteed = 0;
 
+  /**
+   * An internal version to make sure we don't race and overwrite a newer totalGuaranteed count in
+   * ZK with an older one, without requiring us to make ZK updates under the main writeLock.
+   * This is updated under writeLock, together with totalGuaranteed.
+   */
+  private long totalGuaranteedVersion = Long.MIN_VALUE;
+  private final Object registryUpdateLock = new Object(); // The lock for ZK updates.
+  /** The last totalGuaranteedVersion sent to ZK. Updated under registryUpdateLock. */
+  private long tgVersionSent = Long.MIN_VALUE;
+
   private LlapTaskCommunicator communicator;
   private final int amPort;
   private final String serializedToken, jobIdForToken;
@@ -494,6 +504,7 @@ private static String serializeToken(Token token) {
   @VisibleForTesting
   void updateGuaranteedCount(int newTotalGuaranteed) {
     List toUpdate = null;
+    long tgVersionForZk;
     writeLock.lock();
     try {
       // TODO: when this code is a little less hot, change most logs to debug.
@@ -504,6 +515,7 @@ void updateGuaranteedCount(int newTotalGuaranteed) {
       // The "procedural" approach requires that we track the ducks traveling on network,
       // concurrent terminations, etc. So, while more precise it's much more complex.
       int delta = newTotalGuaranteed - totalGuaranteed;
+      tgVersionForZk = ++totalGuaranteedVersion;
       WM_LOG.info("Received guaranteed tasks " + newTotalGuaranteed
           + "; the delta to adjust by is " + delta);
       if (delta == 0) return;
@@ -552,6 +564,7 @@ void updateGuaranteedCount(int newTotalGuaranteed) {
     } finally {
       writeLock.unlock();
     }
+    updateGuaranteedInRegistry(tgVersionForZk, newTotalGuaranteed);
     if (toUpdate == null) return;
     WM_LOG.info("Sending updates to " + toUpdate.size() + " tasks");
     for (TaskInfo ti : toUpdate) {
@@ -742,7 +755,7 @@ public Void call() throws Exception {
         amRegistry.start();
         int pluginPort = pluginEndpoint != null ? pluginEndpoint.getActualPort() : -1;
         amRegistry.register(amPort, pluginPort, HiveConf.getVar(conf, ConfVars.HIVESESSIONID),
-            serializedToken, jobIdForToken);
+            serializedToken, jobIdForToken, 0);
       }
     } finally {
       writeLock.unlock();
@@ -959,6 +972,7 @@ public void dagComplete() {
     if (metrics != null) {
       metrics.incrCompletedDagCount();
     }
+    long tgVersionForZk;
     writeLock.lock();
     try {
       dagRunning = false;
@@ -992,15 +1006,42 @@ public void dagComplete() {
       }
 
       totalGuaranteed = unusedGuaranteed = 0;
+      tgVersionForZk = ++totalGuaranteedVersion;
       LOG.info(
           "DAG reset. Current knownTaskCount={}, pendingTaskCount={}, runningTaskCount={}",
           knownTasks.size(), pendingCount, runningCount);
     } finally {
       writeLock.unlock();
     }
+    updateGuaranteedInRegistry(tgVersionForZk, 0);
     // TODO Cleanup pending tasks etc, so that the next dag is not affected.
   }
 
+  /**
+   * Publishes the guaranteed task count to the AM registry, skipping updates that are older
+   * than the last one successfully sent.
+   *
+   * @param tgVersionForZk the totalGuaranteedVersion value assigned to this update
+   * @param newTotalGuaranteed the guaranteed task count to publish
+   */
+  private void updateGuaranteedInRegistry(long tgVersionForZk, int newTotalGuaranteed) {
+    // NOTE(review): registration looks conditional on a registry being configured, so guard
+    // against having no registry to update; confirm against how amRegistry is initialized.
+    if (amRegistry == null) return;
+    synchronized (registryUpdateLock) {
+      // Make sure the updates are not sent to ZK out of order compared to how we apply them in AM.
+      if (tgVersionForZk <= tgVersionSent) return;
+      try {
+        amRegistry.updateGuaranteed(newTotalGuaranteed);
+        tgVersionSent = tgVersionForZk;
+      } catch (IOException ex) {
+        // Ignore for now. HS2 will probably try to send us the count we already have again.
+        // We are assuming here that if we can't talk to ZK we will eventually fail.
+        LOG.error("Failed to update guaranteed count in registry; ignoring", ex);
+      }
+    }
+  }
+
   @Override
   public void blacklistNode(NodeId nodeId) {
     LOG.info("BlacklistNode not supported");
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index 82b38d55b4..a52928cc7a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -143,14 +143,25 @@ public void updateSessionsAsync(Double totalMaxAlloc, List session
     }
   }
 
-  private void updateSessionAsync(final WmTezSession session, final int intAlloc) {
-    boolean needsUpdate = session.setSendingGuaranteed(intAlloc);
-    if (!needsUpdate) return;
+  @Override
+  public void updateSessionAsync(WmTezSession session) {
+    updateSessionAsync(session, null); // Resend existing value if necessary.
+  }
+
+  /**
+   * Sends the guaranteed task count to the session's AM unless the session reports that no
+   * update is needed.
+   *
+   * @param session the session to update
+   * @param intAlloc the new target allocation, or null to resend the session's existing target
+   */
+  private void updateSessionAsync(final WmTezSession session, final Integer intAlloc) {
+    Integer valueToSend = session.setSendingGuaranteed(intAlloc);
+    if (valueToSend == null) return;
     // Note: this assumes that the pattern where the same session object is reset with a different
     // Tez client is not used. It was used a lot in the past but appears to be gone from most
     // HS2 session pool paths, and this patch removes the last one (reopen).
     UpdateQueryRequestProto request = UpdateQueryRequestProto
-        .newBuilder().setGuaranteedTaskCount(intAlloc).build();
-    LOG.info("Updating {} with {} guaranteed tasks", session.getSessionId(), intAlloc);
+        .newBuilder().setGuaranteedTaskCount(valueToSend.intValue()).build();
+    LOG.info("Updating {} with {} guaranteed tasks", session.getSessionId(), valueToSend);
     amCommunicator.sendUpdateQuery(request, (AmPluginNode)session, new UpdateCallback(session));
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
index a44690283b..9885ce7221 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
@@ -27,7 +27,7 @@
   void start();
   void stop();
   /**
-   * Updates the session allocations asynchoronously.
+   * Updates the session allocations asynchronously.
    * @param totalMaxAlloc The total maximum fraction of the cluster to allocate. Used to
    *                      avoid various artifacts, esp. with small numbers and double weirdness.
    *                      Null means the total is unknown.
@@ -39,4 +39,9 @@
    * Sets a callback to be invoked on cluster changes relevant to resource allocation.
    */
   void setClusterChangedCallback(Runnable clusterChangedCallback);
+
+  /**
+   * Updates the session asynchronously with the existing allocation.
+   */
+  void updateSessionAsync(WmTezSession session);
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index 0962460b8f..89954cba67 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -334,10 +334,16 @@ public void onCreate(TezAmInstance si, int ephSeqVersion) {
     }
 
     @Override
-    public void onUpdate(TezAmInstance serviceInstance, int ephSeqVersion) {
-      // We currently never update the znode once registered.
-      // AM recovery will create a new node when it calls register.
-      LOG.warn("Received an unexpected update for instance={}. Ignoring", serviceInstance);
+    public void onUpdate(TezAmInstance si, int ephSeqVersion) {
+      String sessionId = si.getSessionId();
+      SessionType session = bySessionId.get(sessionId);
+      if (session != null) {
+        LOG.info("AM for " + sessionId + ", v." + ephSeqVersion + " has updated; updating ["
+            + session + "] with an endpoint at " + si.getPluginPort());
+        session.updateFromRegistry(si, ephSeqVersion);
+      } else {
+        LOG.warn("AM for an unknown " + sessionId + " has updated; ignoring");
+      }
     }
 
     @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index d4c3ab9952..1cf5493959 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -106,6 +106,13 @@ public WmTezSession(String sessionId, WorkloadManager parent,
 
   @Override
   void updateFromRegistry(TezAmInstance si, int ephSeqVersion) {
+    updateAmEndpointInfo(si, ephSeqVersion);
+    if (si != null) {
+      handleGuaranteedTasksChange(si.getGuaranteedCount());
+    }
+  }
+
+  public void updateAmEndpointInfo(TezAmInstance si, int ephSeqVersion) {
     AmPluginInfo info = si == null ? null : new AmPluginInfo(si.getHost(), si.getPluginPort(),
         si.getPluginToken(), si.getPluginTokenJobId());
     synchronized (amPluginInfoLock) {
@@ -131,6 +138,19 @@ void updateFromRegistry(TezAmInstance si, int ephSeqVersion) {
       }
     }
   }
+
+
+  private void handleGuaranteedTasksChange(int guaranteedCount) {
+    boolean doNotify = false;
+    synchronized (actualState) {
+      // A noop if we are in process of sending or if we have the correct value.
+      if (actualState.sending != -1 || actualState.sent == guaranteedCount) return;
+      actualState.sent = guaranteedCount;
+      doNotify = actualState.target != guaranteedCount;
+    }
+    if (!doNotify) return;
+    wmParent.notifyOfInconsistentAllocation(this);
+  }
 
   @Override
   public AmPluginInfo getAmPluginInfo(Ref version) {
@@ -161,17 +181,21 @@ public double getClusterFraction() {
     return this.clusterFraction;
   }
 
-  boolean setSendingGuaranteed(int intAlloc) {
-    assert intAlloc >= 0;
+  Integer setSendingGuaranteed(Integer intAlloc) {
+    assert intAlloc == null || intAlloc >= 0;
     synchronized (actualState) {
-      actualState.target = intAlloc;
-      if (actualState.sending != -1) return false; // The sender will take care of this.
-      if (actualState.sent == intAlloc) return false; // The value didn't change.
+      if (intAlloc != null) {
+        actualState.target = intAlloc;
+      } else {
+        intAlloc = actualState.target;
+      }
+      if (actualState.sending != -1) return null; // The sender will take care of this.
+      if (actualState.sent == intAlloc) return null; // The value didn't change.
       actualState.sending = intAlloc;
-      return true;
+      return intAlloc;
     }
   }
-  
+
   public String getAllocationState() {
     synchronized (actualState) {
       return "actual/target " + actualState.sent + "/" + actualState.target
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 00e2c20eeb..f0e620c684 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -685,7 +685,6 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw
       processPoolChangesOnMasterThread(poolName, hasRequeues, syncWork);
     }
 
-
     // 12. Save state for future iterations.
     for (KillQueryContext killCtx : syncWork.toKillQuery.values()) {
       if (killQueryInProgress.put(killCtx.session, killCtx) != null) {
@@ -698,7 +697,7 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw
       entry.getValue().endEvent(entry.getKey());
     }
 
-    // 14. Notify tests and global async ops.
+    // 14. Give our final state to UI/API requests if any.
     if (e.dumpStateFuture != null) {
       List result = new ArrayList<>();
       result.add("RESOURCE PLAN " + rpName + "; default pool " + defaultPool);
@@ -708,6 +707,8 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw
       e.dumpStateFuture.set(result);
       e.dumpStateFuture = null;
     }
+
+    // 15. Notify tests and global async ops.
     if (e.testEvent != null) {
       e.testEvent.set(true);
       e.testEvent = null;
@@ -1422,6 +1423,13 @@ public void returnAfterUse(TezSessionPoolSession session) throws Exception {
     }
   }
 
+  public void notifyOfInconsistentAllocation(WmTezSession session) {
+    // We just act as a pass-thru between the session and allocation manager. We don't change the
+    // allocation target (only WM thread can do that); therefore we can do this directly and
+    // actualState-based sync will take care of multiple potential message senders.
+    allocationManager.updateSessionAsync(session);
+  }
+
   public void notifyOfClusterStateChange() {
     currentLock.lock();
     try {
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 8d185ba801..20a5947291 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -110,6 +110,10 @@ public void stop() {
     public void updateSessionsAsync(Double totalMaxAlloc, List sessions) {
       isCalled = true;
     }
+
+    @Override
+    public void updateSessionAsync(WmTezSession session) {
+    }
 
     void assertWasCalledAndReset() {
       assertTrue(isCalled);