From ef99c5f1f136815304cab57c0b40ebdf931d0fdd Mon Sep 17 00:00:00 2001 From: SimhadriG Date: Wed, 9 Dec 2020 02:01:43 +0530 Subject: [PATCH 1/2] HIVE-24497: Node heartbeats from LLAP Daemon to the client are not matching leading to timeout in cloud deployment --- .../hive/llap/daemon/impl/AMReporter.java | 28 +++++++++++++++---- .../llap/daemon/impl/TaskRunnerCallable.java | 2 +- .../impl/comparator/TestAMReporter.java | 4 +-- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 088a5f33c0f..ce3e2f0e7ac 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -14,6 +14,7 @@ package org.apache.hadoop.hive.llap.daemon.impl; +import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.BooleanArray; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray; @@ -192,9 +193,9 @@ public void serviceStop() { } } - public AMNodeInfo registerTask(String amLocation, int port, String umbilicalUser, - Token jobToken, QueryIdentifier queryIdentifier, - TezTaskAttemptID attemptId, boolean isGuaranteed) { + public AMNodeInfo registerTask(boolean externalClientRequest, String amLocation, int port, String umbilicalUser, + Token jobToken, QueryIdentifier queryIdentifier, + TezTaskAttemptID attemptId, boolean isGuaranteed) { if (LOG.isTraceEnabled()) { LOG.trace( "Registering for heartbeat: {}, queryIdentifier={}, attemptId={}", @@ -214,6 +215,7 @@ public AMNodeInfo registerTask(String amLocation, int port, String umbilicalUser if (amNodeInfo == null) { amNodeInfo = new AMNodeInfo(amNodeId, umbilicalUser, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, conf); +
amNodeInfo.setIsExtCliRequest(externalClientRequest); amNodeInfoPerQuery.put(amNodeId, amNodeInfo); // Add to the queue only the first time this is registered, and on // subsequent instances when it's taken off the queue. @@ -410,8 +412,15 @@ protected Void callInternal() { BooleanArray guaranteed = new BooleanArray(); guaranteed.set(tasks.guaranteed.toArray(new BooleanWritable[tasks.guaranteed.size()])); - amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()), - new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort(), aw, guaranteed); + if (LlapUtil.isCloudDeployment(conf) && amNodeInfo.isExtCliRequest()) { + String hostname = amNodeInfo.amNodeId.getHostname(); + int externalClientCloudRpcPort = amNodeInfo.amNodeId.getPort(); + amNodeInfo.getUmbilical().nodeHeartbeat(new Text(hostname), + new Text(daemonId.getUniqueNodeIdInCluster()), externalClientCloudRpcPort, aw, guaranteed); + } else { + amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()), + new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort(), aw, guaranteed); + } } catch (IOException e) { QueryIdentifier currentQueryIdentifier = amNodeInfo.getQueryIdentifier(); amNodeInfo.setAmFailed(true); @@ -470,6 +479,7 @@ private AMNodeInfo getAMNodeInfo(String amHost, int amPort, QueryIdentifier quer private LlapTaskUmbilicalProtocol umbilical; private long nextHeartbeatTime; private final AtomicBoolean isDone = new AtomicBoolean(false); + private final AtomicBoolean isExtCliRequest = new AtomicBoolean(false); public AMNodeInfo(LlapNodeId amNodeId, String umbilicalUser, @@ -541,6 +551,14 @@ boolean isDone() { return isDone.get(); } + void setIsExtCliRequest(boolean val) { + isExtCliRequest.set(val); + } + + boolean isExtCliRequest() { + return isExtCliRequest.get(); + } + /** * @return A snapshot of the tasks running at this daemon from this AM. 
* Doesn't have to be consistent between multiple tasks; whether some task makes it into diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index bc26dc04752..99f226c799d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -160,7 +160,7 @@ public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo frag this.amReporter = amReporter; // Register with the AMReporter when the callable is setup. Unregister once it starts running. if (amReporter != null && jobToken != null) { - this.amNodeInfo = amReporter.registerTask(request.getAmHost(), request.getAmPort(), + this.amNodeInfo = amReporter.registerTask(request.getIsExternalClientRequest(), request.getAmHost(), request.getAmPort(), vertex.getTokenIdentifier(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(), attemptId, isGuaranteed); } else { diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java index 33c5439985a..47f32750fee 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java @@ -71,9 +71,9 @@ public void testMultipleAM() throws InterruptedException { String am2Location = "am2"; String umbilicalUser = "user"; QueryIdentifier queryId = new QueryIdentifier("app", 0); - amReporter.registerTask(am1Location, am1Port, umbilicalUser, null, queryId, + amReporter.registerTask(false,am1Location, am1Port, umbilicalUser, null, queryId, mock(TezTaskAttemptID.class), false); - amReporter.registerTask(am2Location, am2Port, umbilicalUser, null, 
queryId, + amReporter.registerTask(false,am2Location, am2Port, umbilicalUser, null, queryId, mock(TezTaskAttemptID.class), false); Thread.currentThread().sleep(2000); From a4dbd155dd85e113f5c2b75bace256f8798ce7ab Mon Sep 17 00:00:00 2001 From: SimhadriG Date: Wed, 9 Dec 2020 13:20:52 +0530 Subject: [PATCH 2/2] HIVE-24497: Addressed review comment: Renamed the variables and methods to isExternalClientRequest. --- .../hadoop/hive/llap/daemon/impl/AMReporter.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index ce3e2f0e7ac..86a91af1e69 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -215,7 +215,7 @@ public AMNodeInfo registerTask(boolean externalClientRequest, String amLocation, if (amNodeInfo == null) { amNodeInfo = new AMNodeInfo(amNodeId, umbilicalUser, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, conf); - amNodeInfo.setIsExtCliRequest(externalClientRequest); + amNodeInfo.setIsExternalClientRequest(externalClientRequest); amNodeInfoPerQuery.put(amNodeId, amNodeInfo); // Add to the queue only the first time this is registered, and on // subsequent instances when it's taken off the queue. 
@@ -412,7 +412,7 @@ protected Void callInternal() { BooleanArray guaranteed = new BooleanArray(); guaranteed.set(tasks.guaranteed.toArray(new BooleanWritable[tasks.guaranteed.size()])); - if (LlapUtil.isCloudDeployment(conf) && amNodeInfo.isExtCliRequest()) { + if (LlapUtil.isCloudDeployment(conf) && amNodeInfo.isExternalClientRequest()) { String hostname = amNodeInfo.amNodeId.getHostname(); int externalClientCloudRpcPort = amNodeInfo.amNodeId.getPort(); amNodeInfo.getUmbilical().nodeHeartbeat(new Text(hostname), @@ -479,7 +479,7 @@ private AMNodeInfo getAMNodeInfo(String amHost, int amPort, QueryIdentifier quer private LlapTaskUmbilicalProtocol umbilical; private long nextHeartbeatTime; private final AtomicBoolean isDone = new AtomicBoolean(false); - private final AtomicBoolean isExtCliRequest = new AtomicBoolean(false); + private final AtomicBoolean isExternalClientRequest = new AtomicBoolean(false); public AMNodeInfo(LlapNodeId amNodeId, String umbilicalUser, @@ -551,12 +551,12 @@ boolean isDone() { return isDone.get(); } - void setIsExtCliRequest(boolean val) { - isExtCliRequest.set(val); + void setIsExternalClientRequest(boolean val) { + isExternalClientRequest.set(val); } - boolean isExtCliRequest() { - return isExtCliRequest.get(); + boolean isExternalClientRequest() { + return isExternalClientRequest.get(); } /**