diff --git llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java index d09cb248ee..a8629479ec 100644 --- llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java +++ llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java @@ -16,6 +16,7 @@ import java.io.IOException; import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.apache.hadoop.registry.client.types.AddressTypes; import org.apache.hadoop.registry.client.types.Endpoint; @@ -50,6 +51,12 @@ public int getPluginPort() { public String getSessionId() { return getProperties().get(TezAmRegistryImpl.AM_SESSION_ID); } + + public int getGuaranteedCount() { + String str = getProperties().get(TezAmRegistryImpl.AM_GUARANTEED_COUNT); + if (!StringUtils.isEmpty(str)) return 0; + return Integer.parseInt(str); + } public String getPluginTokenJobId() { return getProperties().get(TezAmRegistryImpl.AM_PLUGIN_JOBID); diff --git llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java index ab02cf44ef..3ff732d9b7 100644 --- llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Collection; + import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -35,11 +36,12 @@ static final String IPC_TEZCLIENT = "tez-client"; static final String IPC_PLUGIN = "llap-plugin"; static final String AM_SESSION_ID = "am.session.id", AM_PLUGIN_TOKEN = "am.plugin.token", - AM_PLUGIN_JOBID = "am.plugin.jobid"; + AM_PLUGIN_JOBID = "am.plugin.jobid", AM_GUARANTEED_COUNT = "am.guaranteed.count"; private final static String NAMESPACE_PREFIX = "tez-am-"; private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient"; private final String registryName; + private ServiceRecord srv; public static TezAmRegistryImpl create(Configuration conf, boolean useSecureZk) { String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME); @@ -68,8 +70,11 @@ public void populateCache(boolean doInvokeListeners) throws IOException { } public String register(int amPort, int pluginPort, String sessionId, - String serializedToken, String jobIdForToken) throws IOException { - ServiceRecord srv = new ServiceRecord(); + String serializedToken, String jobIdForToken, int guaranteedCount) throws IOException { + if (srv != null) { + throw new UnsupportedOperationException("Already registered with " + srv); + } + srv = new ServiceRecord(); Endpoint rpcEndpoint = RegistryTypeUtils.ipcEndpoint( IPC_TEZCLIENT, new InetSocketAddress(hostname, amPort)); srv.addInternalEndpoint(rpcEndpoint); @@ -83,12 +88,18 @@ public String register(int amPort, int pluginPort, String sessionId, boolean hasToken = serializedToken != null; srv.set(AM_PLUGIN_TOKEN, hasToken ? serializedToken : ""); srv.set(AM_PLUGIN_JOBID, jobIdForToken != null ? jobIdForToken : ""); + srv.set(AM_GUARANTEED_COUNT, Integer.toString(guaranteedCount)); String uniqueId = registerServiceRecord(srv); LOG.info("Registered this AM: rpc: {}, plugin: {}, sessionId: {}, token: {}, znodePath: {}", rpcEndpoint, pluginEndpoint, sessionId, hasToken, getRegistrationZnodePath()); return uniqueId; } + public void updateGuaranteed(int guaranteedCount) throws IOException { + srv.set(AM_GUARANTEED_COUNT, Integer.toString(guaranteedCount)); + updateServiceRecord(srv, false, false); + } + public TezAmInstance getInstance(String name) { Collection instances = getAllInternal(); for(TezAmInstance instance : instances) { diff --git llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java index e7227a87ad..1015b2d83f 100644 --- llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java +++ llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java @@ -98,7 +98,7 @@ private final Set> stateChangeListeners; - private final boolean doCheckAcls; + protected final boolean doCheckAcls; // Secure ZK is only set up by the registering service; anyone can read the registrations. private final String zkPrincipal, zkKeytab, saslLoginContextName; private String userNameFromPrincipal; // Only set when setting up the secure config for ZK. @@ -279,7 +279,7 @@ protected final String registerServiceRecord(ServiceRecord srv) throws IOExcepti // even under connection or session interruption (will automatically handle retries) znode = new PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL, workersPath + "/" + workerNodePrefix, encoder.toBytes(srv)); - + // start the creation of znodes znode.start(); @@ -311,7 +311,12 @@ protected final String registerServiceRecord(ServiceRecord srv) throws IOExcepti return UNIQUE_ID.toString(); } - protected final void updateServiceRecord(ServiceRecord srv) throws IOException { + protected final void updateServiceRecord( + ServiceRecord srv, boolean doCheckAcls, boolean closeOnFailure) throws IOException { + if (srv.get(UNIQUE_IDENTIFIER) == null) { + srv.set(UNIQUE_IDENTIFIER, UNIQUE_ID.toString()); + } + // waitForInitialCreate must have already been called in registerServiceRecord. try { znode.setData(encoder.toBytes(srv)); @@ -324,11 +329,14 @@ protected final void updateServiceRecord(ServiceRecord srv) throws IOException { } } catch (Exception e) { LOG.error("Unable to update znode with new service record", e); - CloseableUtils.closeQuietly(znode); + if (closeOnFailure) { + CloseableUtils.closeQuietly(znode); + } throw (e instanceof IOException) ? (IOException) e : new IOException(e); } } + final void initializeWithoutRegisteringInternal() throws IOException { // Create a znode under the rootNamespace parent for this instance of the server try { diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index d536341c79..82179645da 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -257,6 +257,16 @@ public void setError(TaskInfo ctx, Throwable t) { private int totalGuaranteed = 0, unusedGuaranteed = 0; + /** + * An internal version to make sure we don't race and overwrite a newer totalGuaranteed count in + * ZK with an older one, without requiring us to make ZK updates under the main writeLock. + * This is updated under writeLock, together with totalGuaranteed. + */ + private long totalGuaranteedVersion = Long.MIN_VALUE; + private final Object registryUpdateLock = new Object(); // The lock for ZK updates. + /** The last totalGuaranteedVersion sent to ZK. Updated under registryUpdateLock. */ + private long tgVersionSent = Long.MIN_VALUE; + private LlapTaskCommunicator communicator; private final int amPort; private final String serializedToken, jobIdForToken; @@ -504,6 +514,7 @@ private static String serializeToken(Token token) { @VisibleForTesting void updateGuaranteedCount(int newTotalGuaranteed) { List toUpdate = null; + long tgVersionForZk; writeLock.lock(); try { // TODO: when this code is a little less hot, change most logs to debug. @@ -514,8 +525,9 @@ void updateGuaranteedCount(int newTotalGuaranteed) { // The "procedural" approach requires that we track the ducks traveling on network, // concurrent terminations, etc. So, while more precise it's much more complex. int delta = newTotalGuaranteed - totalGuaranteed; - WM_LOG.info("Received guaranteed tasks " + newTotalGuaranteed - + "; the delta to adjust by is " + delta); + tgVersionForZk = ++totalGuaranteedVersion; + WM_LOG.info("Received guaranteed tasks " + newTotalGuaranteed + " (internal version " + + tgVersionForZk + "); the delta to adjust by is " + delta); if (delta == 0) return; totalGuaranteed = newTotalGuaranteed; if (metrics != null) { @@ -562,6 +574,7 @@ void updateGuaranteedCount(int newTotalGuaranteed) { } finally { writeLock.unlock(); } + updateGuaranteedInRegistry(tgVersionForZk, newTotalGuaranteed); if (toUpdate == null) return; WM_LOG.info("Sending updates to " + toUpdate.size() + " tasks"); for (TaskInfo ti : toUpdate) { @@ -752,7 +765,7 @@ public Void call() throws Exception { amRegistry.start(); int pluginPort = pluginEndpoint != null ? pluginEndpoint.getActualPort() : -1; amRegistry.register(amPort, pluginPort, HiveConf.getVar(conf, ConfVars.HIVESESSIONID), - serializedToken, jobIdForToken); + serializedToken, jobIdForToken, 0); } } finally { writeLock.unlock(); @@ -969,6 +982,7 @@ public void dagComplete() { if (metrics != null) { metrics.incrCompletedDagCount(); } + long tgVersionForZk; writeLock.lock(); try { dagRunning = false; @@ -1002,6 +1016,7 @@ public void dagComplete() { } totalGuaranteed = unusedGuaranteed = 0; + tgVersionForZk = ++totalGuaranteedVersion; if (metrics != null) { metrics.setDagId(null); // We remove the tasks above without state checks so just reset all metrics to 0. @@ -1012,9 +1027,28 @@ public void dagComplete() { } finally { writeLock.unlock(); } + updateGuaranteedInRegistry(tgVersionForZk, 0); // TODO Cleanup pending tasks etc, so that the next dag is not affected. } + private void updateGuaranteedInRegistry(long tgVersionForZk, int newTotalGuaranteed) { + if (amRegistry == null) return; + synchronized (registryUpdateLock) { + // Make sure the updates are not sent to ZK out of order compared to how we apply them in AM. + if (tgVersionForZk <= tgVersionSent) return; + try { + amRegistry.updateGuaranteed(newTotalGuaranteed); + tgVersionSent = tgVersionForZk; + } catch (IOException ex) { + // Ignore for now. HS2 will probably try to send us the count we already have again. + // We are assuming here that if we can't talk to ZK we will eventually fail. + LOG.error("Failed to update guaranteed count in registry; ignoring", ex); + } + } + } + + + @Override public void blacklistNode(NodeId nodeId) { LOG.info("BlacklistNode not supported"); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java index 82b38d55b4..a52928cc7a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java @@ -143,14 +143,19 @@ public void updateSessionsAsync(Double totalMaxAlloc, List session } } - private void updateSessionAsync(final WmTezSession session, final int intAlloc) { - boolean needsUpdate = session.setSendingGuaranteed(intAlloc); - if (!needsUpdate) return; + @Override + public void updateSessionAsync(WmTezSession session) { + updateSessionAsync(session, null); // Resend existing value if necessary. + } + + private void updateSessionAsync(final WmTezSession session, final Integer intAlloc) { + Integer valueToSend = session.setSendingGuaranteed(intAlloc); + if (valueToSend == null) return; // Note: this assumes that the pattern where the same session object is reset with a different // Tez client is not used. It was used a lot in the past but appears to be gone from most // HS2 session pool paths, and this patch removes the last one (reopen). UpdateQueryRequestProto request = UpdateQueryRequestProto - .newBuilder().setGuaranteedTaskCount(intAlloc).build(); + .newBuilder().setGuaranteedTaskCount(valueToSend.intValue()).build(); LOG.info("Updating {} with {} guaranteed tasks", session.getSessionId(), intAlloc); amCommunicator.sendUpdateQuery(request, (AmPluginNode)session, new UpdateCallback(session)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java index a44690283b..9885ce7221 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java @@ -27,7 +27,7 @@ void start(); void stop(); /** - * Updates the session allocations asynchoronously. + * Updates the session allocations asynchronously. * @param totalMaxAlloc The total maximum fraction of the cluster to allocate. Used to * avoid various artifacts, esp. with small numbers and double weirdness. * Null means the total is unknown. @@ -39,4 +39,9 @@ * Sets a callback to be invoked on cluster changes relevant to resource allocation. */ void setClusterChangedCallback(Runnable clusterChangedCallback); + + /** + * Updates the session asynchronously with the existing allocation. + */ + void updateSessionAsync(WmTezSession session); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java index 0962460b8f..89954cba67 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java @@ -334,10 +334,16 @@ public void onCreate(TezAmInstance si, int ephSeqVersion) { } @Override - public void onUpdate(TezAmInstance serviceInstance, int ephSeqVersion) { - // We currently never update the znode once registered. - // AM recovery will create a new node when it calls register. - LOG.warn("Received an unexpected update for instance={}. Ignoring", serviceInstance); + public void onUpdate(TezAmInstance si, int ephSeqVersion) { + String sessionId = si.getSessionId(); + SessionType session = bySessionId.get(sessionId); + if (session != null) { + LOG.info("AM for " + sessionId + ", v." + ephSeqVersion + " has updated; updating [" + + session + "] with an endpoint at " + si.getPluginPort()); + session.updateFromRegistry(si, ephSeqVersion); + } else { + LOG.warn("AM for an unknown " + sessionId + " has updated; ignoring"); + } } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java index d4c3ab9952..1cf5493959 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java @@ -106,6 +106,13 @@ public WmTezSession(String sessionId, WorkloadManager parent, @Override void updateFromRegistry(TezAmInstance si, int ephSeqVersion) { + updateAmEndpointInfo(si, ephSeqVersion); + if (si != null) { + handleGuaranteedTasksChange(si.getGuaranteedCount()); + } + } + + public void updateAmEndpointInfo(TezAmInstance si, int ephSeqVersion) { AmPluginInfo info = si == null ? null : new AmPluginInfo(si.getHost(), si.getPluginPort(), si.getPluginToken(), si.getPluginTokenJobId()); synchronized (amPluginInfoLock) { @@ -131,6 +138,19 @@ void updateFromRegistry(TezAmInstance si, int ephSeqVersion) { } } } + + + private void handleGuaranteedTasksChange(int guaranteedCount) { + boolean doNotify = false; + synchronized (actualState) { + // A noop if we are in process of sending or if we have the correct value. + if (actualState.sending != -1 || actualState.sent == guaranteedCount) return; + actualState.sent = guaranteedCount; + doNotify = actualState.target != guaranteedCount; + } + if (!doNotify) return; + wmParent.notifyOfInconsistentAllocation(this); + } @Override public AmPluginInfo getAmPluginInfo(Ref version) { @@ -161,17 +181,21 @@ public double getClusterFraction() { return this.clusterFraction; } - boolean setSendingGuaranteed(int intAlloc) { - assert intAlloc >= 0; + Integer setSendingGuaranteed(Integer intAlloc) { + assert intAlloc == null || intAlloc >= 0; synchronized (actualState) { - actualState.target = intAlloc; - if (actualState.sending != -1) return false; // The sender will take care of this. - if (actualState.sent == intAlloc) return false; // The value didn't change. + if (intAlloc != null) { + actualState.target = intAlloc; + } else { + intAlloc = actualState.target; + } + if (actualState.sending != -1) return null; // The sender will take care of this. + if (actualState.sent == intAlloc) return null; // The value didn't change. actualState.sending = intAlloc; - return true; + return intAlloc; } } - + public String getAllocationState() { synchronized (actualState) { return "actual/target " + actualState.sent + "/" + actualState.target diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 00e2c20eeb..f0e620c684 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -685,7 +685,6 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw processPoolChangesOnMasterThread(poolName, hasRequeues, syncWork); } - // 12. Save state for future iterations. for (KillQueryContext killCtx : syncWork.toKillQuery.values()) { if (killQueryInProgress.put(killCtx.session, killCtx) != null) { @@ -698,7 +697,7 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw entry.getValue().endEvent(entry.getKey()); } - // 14. Notify tests and global async ops. + // 14. Give our final state to UI/API requests if any. if (e.dumpStateFuture != null) { List result = new ArrayList<>(); result.add("RESOURCE PLAN " + rpName + "; default pool " + defaultPool); @@ -708,6 +707,8 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw e.dumpStateFuture.set(result); e.dumpStateFuture = null; } + + // 15. Notify tests and global async ops. if (e.testEvent != null) { e.testEvent.set(true); e.testEvent = null; @@ -1422,6 +1423,13 @@ public void returnAfterUse(TezSessionPoolSession session) throws Exception { } } + public void notifyOfInconsistentAllocation(WmTezSession session) { + // We just act as a pass-thru between the session and allocation manager. We don't change the + // allocation target (only WM thread can do that); therefore we can do this directly and + // actualState-based sync will take care of multiple potential message senders. + allocationManager.updateSessionAsync(session); + } + public void notifyOfClusterStateChange() { currentLock.lock(); try { diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 8d185ba801..20a5947291 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -110,6 +110,10 @@ public void stop() { public void updateSessionsAsync(Double totalMaxAlloc, List sessions) { isCalled = true; } + + @Override + public void updateSessionAsync(WmTezSession session) { + } void assertWasCalledAndReset() { assertTrue(isCalled); diff --git service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java index 6514d98d19..a6b9b704d1 100644 --- service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java +++ service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java @@ -149,9 +149,10 @@ private void addPassiveEndpointToServiceRecord() throws IOException { addEndpointToServiceRecord(getNewServiceRecord(), PASSIVE_ENDPOINT); } - private void addEndpointToServiceRecord(final ServiceRecord srv, final String endpointName) throws IOException { + private void addEndpointToServiceRecord( + final ServiceRecord srv, final String endpointName) throws IOException { updateEndpoint(srv, endpointName); - updateServiceRecord(srv); + updateServiceRecord(srv, doCheckAcls, true); } private void updateEndpoint(final ServiceRecord srv, final String endpointName) {