diff --git llap-client/pom.xml llap-client/pom.xml
index 8a1a8bd..87f53b7 100644
--- llap-client/pom.xml
+++ llap-client/pom.xml
@@ -19,7 +19,7 @@
org.apache.hive
hive
- 1.2.0-SNAPSHOT
+ 1.3.0-SNAPSHOT
../pom.xml
diff --git llap-server/pom.xml llap-server/pom.xml
index 9325bd9..4fcd705 100644
--- llap-server/pom.xml
+++ llap-server/pom.xml
@@ -19,7 +19,7 @@
org.apache.hive
hive
- 1.2.0-SNAPSHOT
+ 1.3.0-SNAPSHOT
../pom.xml
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
index f3771ea..716fb23 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
@@ -27,6 +27,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -130,7 +131,7 @@ public void shutdown() {
private static final float LOG_COUNTER_BACKOFF = 1.3f;
private final RuntimeTask task;
- private EventMetaData updateEventMetadata;
+ private final EventMetaData updateEventMetadata;
private final LlapTaskUmbilicalProtocol umbilical;
@@ -141,6 +142,9 @@ public void shutdown() {
private final AtomicLong requestCounter;
+ private final AtomicBoolean finalEventQueued = new AtomicBoolean(false);
+ private final AtomicBoolean askedToDie = new AtomicBoolean(false);
+
private LinkedBlockingQueue eventsToSend = new LinkedBlockingQueue();
private final ReentrantLock lock = new ReentrantLock();
@@ -204,6 +208,9 @@ public Boolean call() throws Exception {
}
int pendingEventCount = eventsToSend.size();
if (pendingEventCount > 0) {
+ // This is OK because the pending events will be sent via the succeeded/failed messages.
+ // TaskDone is set before taskSucceeded/taskFailed are sent out - which is what causes the
+ // thread to exit
LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount);
}
return true;
@@ -245,8 +252,9 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t
}
long requestId = requestCounter.incrementAndGet();
+ int fromEventId = task.getNextFromEventId();
TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr,
- task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet);
+ task.getTaskAttemptID(), fromEventId, maxEventsToGet);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat to AM, request=" + request);
}
@@ -260,6 +268,7 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t
if (response.shouldDie()) {
LOG.info("Received should die response from AM");
+ askedToDie.set(true);
return new ResponseWrapper(true, 1);
}
if (response.getLastRequestId() != requestId) {
@@ -276,10 +285,13 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t
+ " heartbeat response, eventCount=" + response.getEvents().size());
}
} else {
+ task.setNextFromEventId(response.getNextFromEventId());
if (response.getEvents() != null && !response.getEvents().isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
- + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
+ + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size()
+ + " fromEventId=" + fromEventId
+ + " nextFromEventId=" + response.getNextFromEventId());
}
// This should ideally happen in a separate thread
numEventsReceived = response.getEvents().size();
@@ -318,10 +330,16 @@ private void maybeLogCounters() {
* indicates an exception somewhere in the AM.
*/
private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
- TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
- TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
- updateEventMetadata);
- return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
+ // Ensure only one final event is ever sent. Both branches return true unless the AM asked this task to die.
+ if (!finalEventQueued.getAndSet(true)) {
+ TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
+ TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
+ updateEventMetadata);
+ return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
+ } else {
+ LOG.warn("A final task state event has already been sent. Not sending again");
+ return !askedToDie.get();
+ }
}
private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) {
@@ -353,15 +371,22 @@ private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) {
*/
private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
EventMetaData srcMeta) throws IOException, TezException {
- TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
- if (diagnostics == null) {
- diagnostics = ExceptionUtils.getStackTrace(t);
+ // Ensure only one final event is ever sent.
+ if (!finalEventQueued.getAndSet(true)) {
+ TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
+ if (diagnostics == null) {
+ diagnostics = ExceptionUtils.getStackTrace(t);
+ } else {
+ diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
+ }
+ TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
+ srcMeta == null ? updateEventMetadata : srcMeta);
+ return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
} else {
- diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
+ LOG.warn("A final task state event has already been sent. Not sending again");
+ // Same contract as the branch above: true means the AM has not asked this task to die.
+ return !askedToDie.get();
}
- TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
- srcMeta == null ? updateEventMetadata : srcMeta);
- return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
}
private void addEvents(TezTaskAttemptID taskAttemptID, Collection events) {
diff --git pom.xml pom.xml
index d4eb8e5..60a18c3 100644
--- pom.xml
+++ pom.xml
@@ -156,7 +156,7 @@
1.0.1
1.7.5
4.0.4
- 0.7.0-TEZ-2003-SNAPSHOT
+ 0.8.0-TEZ-2003-SNAPSHOT
2.2.0
1.3.0
2.10