diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index 716fb23..fc66254 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -253,8 +253,9 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t long requestId = requestCounter.incrementAndGet(); int fromEventId = task.getNextFromEventId(); - TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr, - task.getTaskAttemptID(), fromEventId, maxEventsToGet); + int fromPreRoutedEventId = task.getNextPreRoutedEventId(); + TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId, + containerIdStr, task.getTaskAttemptID(), fromEventId, maxEventsToGet); if (LOG.isDebugEnabled()) { LOG.debug("Sending heartbeat to AM, request=" + request); } @@ -286,6 +287,7 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t } } else { task.setNextFromEventId(response.getNextFromEventId()); + task.setNextPreRoutedEventId(response.getNextPreRoutedEventId()); if (response.getEvents() != null && !response.getEvents().isEmpty()) { if (LOG.isDebugEnabled()) { LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId="