diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index aa94e54..9aea8a3 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -33,10 +33,12 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
@@ -77,6 +79,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
 
   private static ScheduledThreadPoolExecutor retryExecutor = new ScheduledThreadPoolExecutor(1);
+  private static Random rand = new Random();
 
   private final LlapProtocolClientProxy communicator;
   private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
@@ -86,7 +89,10 @@
   protected final Token<JobTokenIdentifier> sessionToken;
   private LlapTaskUmbilicalExternalResponder responder = null;
   private final long connectionTimeout;
+  private int attemptNum = 0;
   private volatile boolean closed = false;
+  private volatile boolean timeoutsDisabled = false;
+  private Semaphore requestPermit = new Semaphore(1);
 
   private RequestInfo requestInfo;
   List<TezEvent> tezEvents;
@@ -198,6 +204,10 @@ public InetSocketAddress getAddress() {
    * Submit the work for actual execution.
    */
   public void submitWork(SubmitWorkRequestProto request, String llapHost, int llapPort) {
+    if (!requestPermit.tryAcquire()) {
+      throw new IllegalStateException("requestPermit already being used for client: " + this.toString());
+    }
+
     // Register the pending events to be sent for this spec.
     VertexOrBinary vob = request.getWorkSpec();
     assert vob.hasVertexBinary() != vob.hasVertex();
@@ -231,6 +241,36 @@ private void submitWork() {
     }
   }
 
+  private void retrySubmission() {
+    // Retry cases:
+    // 1) Submission rejected. A taskKilled event will also be received.
+    //    Since both events result in retrySubmission() calls, make sure the request is only retried once.
+    // 2) Submission received, but taskKilled received before first heartbeat.
+    // If multiple calls to retry come in from case 1, only allow one retry.
+    if (!requestPermit.tryAcquire()) {
+      LOG.debug("Ignoring retry call for " + requestInfo.taskAttemptId);
+      return;
+    }
+    attemptNum++;
+
+    // Don't retry immediately - use delay with exponential backoff
+    long retryDelay = determineRetryDelay();
+    LOG.info("Queueing fragment for resubmission {}, attempt {}, delay {}",
+        this.requestInfo.taskAttemptId, attemptNum, retryDelay);
+    disableTimeouts(); // Don't timeout because of retry delay
+    retryExecutor.schedule(
+        new Runnable() {
+          @Override
+          public void run() {
+            // Re-enable timeouts
+            enableTimeouts();
+            submitWork();
+          }
+        },
+        retryDelay,
+        TimeUnit.MILLISECONDS);
+  }
+
   // Helper class to submit fragments to LLAP and retry rejected submissions.
   static class SubmitWorkCallback implements LlapProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto> {
     private LlapTaskUmbilicalExternalClient client;
@@ -241,6 +281,9 @@ public SubmitWorkCallback(LlapTaskUmbilicalExternalClient client) {
 
     @Override
     public void setResponse(SubmitWorkResponseProto response) {
+      // Free up requestPermit to allow retries
+      client.requestPermit.release();
+
       if (response.hasSubmissionState()) {
         if (response.getSubmissionState().equals(SubmissionStateProto.REJECTED)) {
           String fragmentId = this.client.requestInfo.taskAttemptId;
@@ -249,21 +292,7 @@ public void setResponse(SubmitWorkResponseProto response) {
 
           // Retry rejected requests
           if (!client.closed) {
-            // Update lastHeartbeat so we don't timeout during the retry
-            client.setLastHeartbeat(System.currentTimeMillis());
-            long retryDelay = HiveConf.getTimeVar(client.conf,
-                HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
-                TimeUnit.MILLISECONDS);
-            LOG.info("Queueing fragment for resubmission: " + fragmentId);
-            final SubmitWorkCallback submitter = this;
-            retryExecutor.schedule(
-                new Runnable() {
-                  @Override
-                  public void run() {
-                    client.submitWork();
-                  }
-                },
-                retryDelay, TimeUnit.MILLISECONDS);
+            client.retrySubmission();
           }
           return;
         }
@@ -320,6 +349,32 @@ void setLastHeartbeat(long lastHeartbeat) {
     this.requestInfo.lastHeartbeat.set(lastHeartbeat);
   }
 
+  private boolean isTimedOut(long currentTime) {
+    if (timeoutsDisabled) {
+      return false;
+    }
+    return (currentTime - getLastHeartbeat() >= connectionTimeout);
+  }
+
+  private void enableTimeouts() {
+    setLastHeartbeat(System.currentTimeMillis());
+    timeoutsDisabled = false;
+  }
+
+  private void disableTimeouts() {
+    timeoutsDisabled = true;
+  }
+
+  private long determineRetryDelay() {
+    // Delay with exponential backoff
+    long baseDelay = HiveConf.getTimeVar(conf,
+        HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
+        TimeUnit.MILLISECONDS);
+    int maxDelay = (int) Math.min(baseDelay * Math.pow(2, attemptNum), 60000);
+    long retryDelay = rand.nextInt(maxDelay);
+    return retryDelay;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -346,7 +401,7 @@ public void run() {
 
         for (Map.Entry<String, LlapTaskUmbilicalExternalClient> entry : umbilicalImpl.registeredClients.entrySet()) {
           LlapTaskUmbilicalExternalClient client = entry.getValue();
-          if (currentTime - client.getLastHeartbeat() >= client.connectionTimeout) {
+          if (client.isTimedOut(currentTime)) {
             timedOutTasks.add(client);
           }
         }
@@ -491,7 +546,9 @@ public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
       LlapTaskUmbilicalExternalClient client = registeredClients.get(taskAttemptIdString);
       if (client != null) {
         if (client.requestInfo.state == RequestState.PENDING) {
-          LOG.debug("Ignoring task kill for {}, request is still in pending state", taskAttemptIdString);
+          // A task kill while the request is still in PENDING state means the request should be retried.
+          LOG.info("Received task kill for {} which is still in pending state. Retry submission.", taskAttemptIdString);
+          client.retrySubmission();
         } else {
           try {
             LOG.error("Task killed - " + taskAttemptIdString);
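
For reference, a minimal standalone sketch (not part of the patch) of the delay computation that determineRetryDelay() performs: the retry delay is drawn uniformly from [0, min(baseDelay * 2^attemptNum, 60000)) milliseconds, so repeated rejections back off exponentially while staying jittered. The class name, method name, and baseDelay value below are illustrative only; the patch reads the base delay from HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS.

import java.util.Random;
import java.util.concurrent.TimeUnit;

// Illustrative-only sketch; names and values here are hypothetical.
public class RetryDelaySketch {
  private static final Random RAND = new Random();

  // Mirrors the formula in determineRetryDelay(): cap the exponential window
  // at 60 seconds and pick a uniformly random delay inside it.
  static long retryDelayMs(long baseDelayMs, int attemptNum) {
    int maxDelay = (int) Math.min(baseDelayMs * Math.pow(2, attemptNum), 60000);
    return RAND.nextInt(maxDelay);
  }

  public static void main(String[] args) {
    long baseDelayMs = TimeUnit.SECONDS.toMillis(2); // assumed value; the patch takes this from HiveConf
    for (int attempt = 1; attempt <= 5; attempt++) {
      System.out.printf("attempt %d -> delay %d ms (window %d ms)%n",
          attempt, retryDelayMs(baseDelayMs, attempt),
          (long) Math.min(baseDelayMs * Math.pow(2, attempt), 60000));
    }
  }
}

Because the delay is randomized rather than fixed, external clients whose fragments were rejected by the same busy daemon will not all resubmit at the same instant, and the 60-second cap keeps the backoff window from growing without bound. While the retry is queued, the patch disables the heartbeat timeout for the client (disableTimeouts/enableTimeouts) so the delayed resubmission is not itself treated as a timed-out task.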