diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index aa94e54..d97b156 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -33,6 +33,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -78,6 +79,7 @@
   private static ScheduledThreadPoolExecutor retryExecutor = new ScheduledThreadPoolExecutor(1);
+  private final Random rand = new Random();
   private final LlapProtocolClientProxy communicator;
   private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
   private final Configuration conf;
@@ -86,7 +88,10 @@
   protected final Token<JobTokenIdentifier> sessionToken;
   private LlapTaskUmbilicalExternalResponder responder = null;
   private final long connectionTimeout;
+  private long baseDelay;
+  private int attemptNum = 0;
   private volatile boolean closed = false;
+  private volatile boolean timeoutsDisabled = false;
   private RequestInfo requestInfo;
   List<TezEvent> tezEvents;
@@ -156,6 +161,9 @@ public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifie
     this.responder = responder;
     this.connectionTimeout = 3 * HiveConf.getTimeVar(conf,
         HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    this.baseDelay = HiveConf.getTimeVar(conf,
+        HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
+        TimeUnit.MILLISECONDS);
     // Add support for configurable threads, however 1 should always be enough.
     this.communicator = new LlapProtocolClientProxy(1, conf, llapToken);
     this.communicator.init(conf);
@@ -231,6 +239,27 @@ private void submitWork() {
     }
   }

+  private void retrySubmission() {
+    attemptNum++;
+
+    // Don't retry immediately - use delay with exponential backoff
+    long retryDelay = determineRetryDelay();
+    LOG.info("Queueing fragment for resubmission {}, attempt {}, delay {}",
+        this.requestInfo.taskAttemptId, attemptNum, retryDelay);
+    disableTimeouts();  // Don't timeout because of retry delay
+    retryExecutor.schedule(
+        new Runnable() {
+          @Override
+          public void run() {
+            // Re-enable timeouts
+            enableTimeouts();
+            submitWork();
+          }
+        },
+        retryDelay,
+        TimeUnit.MILLISECONDS);
+  }
+
   // Helper class to submit fragments to LLAP and retry rejected submissions.
   static class SubmitWorkCallback implements LlapProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto> {
     private LlapTaskUmbilicalExternalClient client;
@@ -247,24 +276,9 @@ public void setResponse(SubmitWorkResponseProto response) {
         String msg = "Fragment: " + fragmentId + " rejected. Server Busy.";
         LOG.info(msg);
-        // Retry rejected requests
-        if (!client.closed) {
-          // Update lastHeartbeat so we don't timeout during the retry
-          client.setLastHeartbeat(System.currentTimeMillis());
-          long retryDelay = HiveConf.getTimeVar(client.conf,
-              HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
-              TimeUnit.MILLISECONDS);
-          LOG.info("Queueing fragment for resubmission: " + fragmentId);
-          final SubmitWorkCallback submitter = this;
-          retryExecutor.schedule(
-              new Runnable() {
-                @Override
-                public void run() {
-                  client.submitWork();
-                }
-              },
-              retryDelay, TimeUnit.MILLISECONDS);
-        }
+        // taskKilled() should also be received during a rejected submission;
+        // we will let that logic handle retries.
+        return;
       }
     }
@@ -320,6 +334,29 @@ void setLastHeartbeat(long lastHeartbeat) {
     this.requestInfo.lastHeartbeat.set(lastHeartbeat);
   }

+  private boolean isTimedOut(long currentTime) {
+    if (timeoutsDisabled) {
+      return false;
+    }
+    return (currentTime - getLastHeartbeat() >= connectionTimeout);
+  }
+
+  private void enableTimeouts() {
+    setLastHeartbeat(System.currentTimeMillis());
+    timeoutsDisabled = false;
+  }
+
+  private void disableTimeouts() {
+    timeoutsDisabled = true;
+  }
+
+  private long determineRetryDelay() {
+    // Delay with exponential backoff
+    int maxDelay = (int) Math.min(baseDelay * Math.pow(2, attemptNum), 60000);
+    long retryDelay = rand.nextInt(maxDelay);
+    return retryDelay;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -346,7 +383,7 @@ public void run() {
         for (Map.Entry<String, LlapTaskUmbilicalExternalClient> entry : umbilicalImpl.registeredClients.entrySet()) {
           LlapTaskUmbilicalExternalClient client = entry.getValue();
-          if (currentTime - client.getLastHeartbeat() >= client.connectionTimeout) {
+          if (client.isTimedOut(currentTime)) {
             timedOutTasks.add(client);
           }
         }
@@ -491,7 +528,9 @@ public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
       LlapTaskUmbilicalExternalClient client = registeredClients.get(taskAttemptIdString);
       if (client != null) {
         if (client.requestInfo.state == RequestState.PENDING) {
-          LOG.debug("Ignoring task kill for {}, request is still in pending state", taskAttemptIdString);
+          // A task kill while the request is still in PENDING state means the request should be retried.
+          LOG.info("Received task kill for {} which is still in pending state. Retry submission.", taskAttemptIdString);
+          client.retrySubmission();
         } else {
           try {
             LOG.error("Task killed - " + taskAttemptIdString);