diff --git llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java index d97b1567ea..945474f540 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java @@ -513,11 +513,12 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce } @Override - public void nodeHeartbeat( - Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException { + public void nodeHeartbeat(Text hostname, Text uniqueId, int port, TezAttemptArray aw, + BooleanArray guaranteed) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Node heartbeat from " + hostname + ":" + port + ", " + uniqueId); } + // External client currently cannot use guaranteed. updateHeartbeatInfo(hostname.toString(), uniqueId.toString(), port, aw); // No need to propagate to this to the responder } diff --git llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java index a2dca1b319..14c8b50c6b 100644 --- llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java +++ llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java @@ -15,6 +15,7 @@ package org.apache.hadoop.hive.llap.protocol; import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.BooleanWritable; import java.io.IOException; @@ -37,6 +38,12 @@ public TezAttemptArray() { } } + public class BooleanArray extends ArrayWritable { + public BooleanArray() { + super(BooleanWritable.class); + } + } + public static final long versionID = 1L; // From Tez. 
Eventually changes over to the LLAP protocol and ProtocolBuffers @@ -44,8 +51,8 @@ public TezAttemptArray() { public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, TezException; - public void nodeHeartbeat( - Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException; + public void nodeHeartbeat(Text hostname, Text uniqueId, int port, + TezAttemptArray aw, BooleanArray guaranteed) throws IOException; public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index b784360b6c..bc70dfd46c 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -14,18 +14,14 @@ package org.apache.hadoop.hive.llap.daemon.impl; +import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.BooleanArray; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray; - import java.util.ArrayList; - import java.util.List; - import java.util.HashSet; - import java.util.Set; - import javax.net.SocketFactory; import java.io.IOException; @@ -34,6 +30,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutorService; @@ -58,6 +55,7 @@ import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; +import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; @@ -194,15 +192,14 @@ public void 
serviceStop() { } } - public void registerTask(String amLocation, int port, String umbilicalUser, + public AMNodeInfo registerTask(String amLocation, int port, String umbilicalUser, Token jobToken, QueryIdentifier queryIdentifier, - TezTaskAttemptID attemptId) { + TezTaskAttemptID attemptId, boolean isGuaranteed) { if (LOG.isTraceEnabled()) { LOG.trace( "Registering for heartbeat: {}, queryIdentifier={}, attemptId={}", (amLocation + ":" + port), queryIdentifier, attemptId); } - AMNodeInfo amNodeInfo; // Since we don't have an explicit AM end signal yet - we're going to create // and discard AMNodeInfo instances per query. @@ -213,7 +210,7 @@ public void registerTask(String amLocation, int port, String umbilicalUser, amNodeInfoPerQuery = new HashMap<>(); knownAppMasters.put(queryIdentifier, amNodeInfoPerQuery); } - amNodeInfo = amNodeInfoPerQuery.get(amNodeId); + AMNodeInfo amNodeInfo = amNodeInfoPerQuery.get(amNodeId); if (amNodeInfo == null) { amNodeInfo = new AMNodeInfo(amNodeId, umbilicalUser, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, conf); @@ -227,7 +224,8 @@ public void registerTask(String amLocation, int port, String umbilicalUser, // A single queueLookupCallable is added here. We have to make sure one instance stays // in the queue till the query completes. } - amNodeInfo.addTaskAttempt(attemptId); + amNodeInfo.addTaskAttempt(attemptId, isGuaranteed); + return amNodeInfo; } } @@ -398,18 +396,22 @@ protected Void callInternal() { if (LOG.isTraceEnabled()) { LOG.trace("Attempting to heartbeat to AM: " + amNodeInfo); } - List tasks = amNodeInfo.getTasksSnapshot(); - if (tasks.isEmpty()) { + TaskSnapshot tasks = amNodeInfo.getTasksSnapshot(); + if (tasks.attempts.isEmpty()) { return null; } try { if (LOG.isTraceEnabled()) { LOG.trace("NodeHeartbeat to: " + amNodeInfo); } + // TODO: if there are more fields perhaps there should be an array of class. 
TezAttemptArray aw = new TezAttemptArray(); - aw.set(tasks.toArray(new TezTaskAttemptID[tasks.size()])); + aw.set(tasks.attempts.toArray(new TezTaskAttemptID[tasks.attempts.size()])); + BooleanArray guaranteed = new BooleanArray(); + guaranteed.set(tasks.guaranteed.toArray(new BooleanWritable[tasks.guaranteed.size()])); + amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()), - new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort(), aw); + new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort(), aw, guaranteed); } catch (IOException e) { QueryIdentifier currentQueryIdentifier = amNodeInfo.getQueryIdentifier(); amNodeInfo.setAmFailed(true); @@ -455,7 +457,7 @@ private AMNodeInfo getAMNodeInfo(String amHost, int amPort, QueryIdentifier quer protected class AMNodeInfo implements Delayed { // Serves as lock for itself. - private final Set tasks = new HashSet<>(); + private final ConcurrentHashMap tasks = new ConcurrentHashMap<>(); private final String umbilicalUser; private final Token jobToken; private final Configuration conf; @@ -501,21 +503,25 @@ synchronized void stopUmbilical() { umbilical = null; } - int addTaskAttempt(TezTaskAttemptID attemptId) { - synchronized (tasks) { - if (!tasks.add(attemptId)) { - throw new RuntimeException(attemptId + " was already registered"); - } - return tasks.size(); + void addTaskAttempt(TezTaskAttemptID attemptId, boolean isGuaranteed) { + Boolean oldVal = tasks.putIfAbsent(attemptId, isGuaranteed); + if (oldVal != null) { + throw new RuntimeException(attemptId + " was already registered"); } } - int removeTaskAttempt(TezTaskAttemptID attemptId) { - synchronized (tasks) { - if (!tasks.remove(attemptId)) { - throw new RuntimeException(attemptId + " was not registered and couldn't be removed"); - } - return tasks.size(); + void updateTaskAttempt(TezTaskAttemptID attemptId, boolean isGuaranteed) { + Boolean oldVal = tasks.replace(attemptId, isGuaranteed); + if (oldVal == null) { + LOG.warn("Task " + 
attemptId + " is no longer registered"); + tasks.remove(attemptId); + } + } + + void removeTaskAttempt(TezTaskAttemptID attemptId) { + Boolean oldVal = tasks.remove(attemptId); + if (oldVal == null) { + throw new RuntimeException(attemptId + " was not registered and couldn't be removed"); } } @@ -535,10 +541,12 @@ boolean isDone() { return isDone.get(); } - List getTasksSnapshot() { - List result = new ArrayList<>(); - synchronized (tasks) { - result.addAll(tasks); + + TaskSnapshot getTasksSnapshot() { + TaskSnapshot result = new TaskSnapshot(tasks.size()); + for (Map.Entry e : tasks.entrySet()) { + result.attempts.add(e.getKey()); + result.guaranteed.add(new BooleanWritable(e.getValue())); } return result; } @@ -579,4 +587,14 @@ private int getTaskCount() { } } } + + + private static final class TaskSnapshot { + public TaskSnapshot(int count) { + attempts = new ArrayList<>(count); + guaranteed = new ArrayList<>(count); + } + public final List attempts; + public final List guaranteed; + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 75d8577d6a..b484a13e48 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; +import org.apache.hadoop.hive.llap.daemon.impl.AMReporter.AMNodeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; @@ -130,6 +131,7 @@ private final 
SocketFactory socketFactory; private boolean isGuaranteed; private WmFragmentCounters wmCounters; + private final AMNodeInfo amNodeInfo; @VisibleForTesting public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo, @@ -156,8 +158,11 @@ public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo frag this.amReporter = amReporter; // Register with the AMReporter when the callable is setup. Unregister once it starts running. if (amReporter != null && jobToken != null) { - this.amReporter.registerTask(request.getAmHost(), request.getAmPort(), - vertex.getTokenIdentifier(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(), attemptId); + this.amNodeInfo = amReporter.registerTask(request.getAmHost(), request.getAmPort(), + vertex.getTokenIdentifier(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(), + attemptId, isGuaranteed); + } else { + this.amNodeInfo = null; } this.metrics = metrics; this.requestId = taskSpec.getTaskAttemptID().toString(); @@ -611,6 +616,9 @@ public boolean isGuaranteed() { public void setIsGuaranteed(boolean isGuaranteed) { this.isGuaranteed = isGuaranteed; + if (amNodeInfo != null) { + amNodeInfo.updateTaskAttempt(taskSpec.getTaskAttemptID(), isGuaranteed); + } if (wmCounters != null) { wmCounters.changeGuaranteed(isGuaranteed); } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java index bde354649a..19f804882a 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestAMReporter.java @@ -69,9 +69,9 @@ public void testMultipleAM() throws InterruptedException { String umbilicalUser = "user"; QueryIdentifier queryId = new QueryIdentifier("app", 0); amReporter.registerTask(am1Location, am1Port, umbilicalUser, null, queryId, 
- mock(TezTaskAttemptID.class)); + mock(TezTaskAttemptID.class), false); amReporter.registerTask(am2Location, am2Port, umbilicalUser, null, queryId, - mock(TezTaskAttemptID.class)); + mock(TezTaskAttemptID.class), false); Thread.currentThread().sleep(2000); // verify both am get node heartbeat @@ -97,7 +97,8 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable { return null; } }).when(umbilical).nodeHeartbeat(any(Text.class), any(Text.class), anyInt(), - any(LlapTaskUmbilicalProtocol.TezAttemptArray.class)); + any(LlapTaskUmbilicalProtocol.TezAttemptArray.class), + any(LlapTaskUmbilicalProtocol.BooleanArray.class)); return umbilical; } } diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index a4bd4dfd88..5d4ce223d9 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -15,15 +15,15 @@ package org.apache.hadoop.hive.llap.tezplugins; import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService.NodeInfo; - import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.io.Writable; - +import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.BooleanArray; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -41,6 +41,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Message; import com.google.protobuf.ServiceException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hive.conf.HiveConf; @@ -66,6 +67,7 @@ import 
org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy; import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker; +import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.ProtocolSignature; @@ -705,16 +707,21 @@ public void registerPingingNode(LlapNodeId nodeId) { private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0); - void nodePinged(String hostname, String uniqueId, int port, TezAttemptArray tasks) { + void nodePinged(String hostname, String uniqueId, int port, + TezAttemptArray tasks, BooleanArray guaranteed) { // TODO: do we ever need the port? we could just do away with nodeId altogether. LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port); registerPingingNode(nodeId); BiMap biMap = entityTracker.getContainerAttemptMapForNode(nodeId); if (biMap != null) { - HashSet attempts = new HashSet<>(); - for (Writable w : tasks.get()) { - attempts.add((TezTaskAttemptID)w); + HashMap attempts = new HashMap<>(); + for (int i = 0; i < tasks.get().length; ++i) { + boolean isGuaranteed = false; + if (guaranteed != null) { + isGuaranteed = ((BooleanWritable)guaranteed.get()[i]).get(); + } + attempts.put((TezTaskAttemptID)tasks.get()[i], isGuaranteed); } String error = ""; synchronized (biMap) { @@ -729,8 +736,10 @@ void nodePinged(String hostname, String uniqueId, int port, TezAttemptArray task // However, the next heartbeat(s) should get the value eventually and mark task as alive. // Also, we prefer a missed heartbeat over a stuck query in case of discrepancy in ET. 
if (taskNodeId != null && taskNodeId.equals(uniqueId)) { - if (attempts.contains(attemptId)) { - getContext().taskAlive(entry.getValue()); + Boolean isGuaranteed = attempts.get(attemptId); + if (isGuaranteed != null) { + getContext().taskAlive(attemptId); + scheduler.taskInfoUpdated(attemptId, isGuaranteed.booleanValue()); } else { error += (attemptId + ", "); } @@ -835,12 +844,12 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce } @Override - public void nodeHeartbeat( - Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException { + public void nodeHeartbeat(Text hostname, Text uniqueId, int port, + TezAttemptArray aw, BooleanArray guaranteed) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Received heartbeat from [" + hostname + ":" + port +" (" + uniqueId +")]"); } - nodePinged(hostname.toString(), uniqueId.toString(), port, aw); + nodePinged(hostname.toString(), uniqueId.toString(), port, aw, guaranteed); } @Override diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 66de3b805a..b612f6acf0 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -3002,5 +3002,44 @@ int getUnusedGuaranteedCount() { return unusedGuaranteed; } + /** + * A direct call from communicator to scheduler to propagate data that cannot be passed via Tez. + */ + public void taskInfoUpdated(TezTaskAttemptID attemptId, boolean isGuaranteed) { + TaskInfo ti = null; + writeLock.lock(); + try { + ti = tasksById.get(attemptId); + if (ti == null) { + WM_LOG.warn("Unknown task from heartbeat " + attemptId); + return; + } + } finally { + writeLock.unlock(); + } + // The update, if needed, is sent outside the lock - see below the synchronized block.
+ boolean newState = false; + synchronized (ti) { + if (ti.isPendingUpdate) return; // A pending update is not done. + if (ti.isGuaranteed == null) return; // The task has terminated, out of date heartbeat. + if (ti.lastSetGuaranteed != null && ti.lastSetGuaranteed == isGuaranteed) { + return; // The heartbeat is consistent with what we have. + } + ti.lastSetGuaranteed = isGuaranteed; + if (isGuaranteed == ti.isGuaranteed) return; // Already consistent. Can happen w/null lSG. + + // There could be races here, e.g. heartbeat delivered us the old value just after we have + // received a successful confirmation from the API, so we are about to overwrite the latter. + // We could solve this by adding a version or something like that; or by ignoring discrepancies + // unless we have previously received an update error for this task; however, the only effect + // right now are a few cheap redundant update calls; let's just do the simple thing. + newState = ti.isGuaranteed; + setUpdateStartedUnderTiLock(ti); + } // End of synchronized (ti) + WM_LOG.info("Sending an update based on inconsistent state from heartbeat for " + + attemptId + ", " + newState); + sendUpdateMessageAsync(ti, newState); + } + } diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java index 90b31e4fcd..082a912794 100644 --- llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java +++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ -552,6 +552,36 @@ public void run() { } @Test(timeout = 10000) + public void testHeartbeatInconsistency() throws IOException, InterruptedException { + final TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(); + // Guaranteed flag is inconsistent based on heartbeat - another message should be sent.
+ try { + Priority highPri = Priority.newInstance(1); + TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(); + tsWrapper.ts.updateGuaranteedCount(0); + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(task1, null, highPri, new Object()); + tsWrapper.awaitTotalTaskAllocations(1); + TaskInfo ti1 = tsWrapper.ts.getTaskInfo(task1); + assertFalse(ti1.isGuaranteed()); + + // Heartbeat indicates task has a duck - this must be reverted. + tsWrapper.ts.taskInfoUpdated(task1, true); + tsWrapper.ts.waitForMessagesSent(1); + assertTrue(ti1.getLastSetGuaranteed()); + assertTrue(ti1.isUpdateInProgress()); + assertFalse(ti1.isGuaranteed()); + tsWrapper.ts.handleUpdateResult(ti1, true); + assertFalse(ti1.getLastSetGuaranteed()); + + tsWrapper.deallocateTask(task1, true, TaskAttemptEndReason.CONTAINER_EXITED); + assertEquals(0, tsWrapper.ts.getUnusedGuaranteedCount()); + } finally { + tsWrapper.shutdown(); + } + } + + @Test(timeout = 10000) public void testSimpleNoLocalityAllocation() throws IOException, InterruptedException { TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();