diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 7376792..1ba18fc 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -77,11 +78,12 @@ private static final Logger LOG = LoggerFactory.getLogger(AMReporter.class); - private final LlapNodeId nodeId; + private volatile LlapNodeId nodeId; private final Configuration conf; private final ListeningExecutorService queueLookupExecutor; private final ListeningExecutorService executor; private final DelayQueue pendingHeartbeatQueeu = new DelayQueue(); + private final AtomicReference localAddress; private final long heartbeatInterval; private final AtomicBoolean isShutdown = new AtomicBoolean(false); // Tracks appMasters to which heartbeats are being sent. This should not be used for any other @@ -89,9 +91,9 @@ private final Map knownAppMasters = new HashMap<>(); volatile ListenableFuture queueLookupFuture; - public AMReporter(LlapNodeId nodeId, Configuration conf) { + public AMReporter(AtomicReference localAddress, Configuration conf) { super(AMReporter.class.getName()); - this.nodeId = nodeId; + this.localAddress = localAddress; this.conf = conf; ExecutorService rawExecutor = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build()); @@ -102,7 +104,7 @@ public AMReporter(LlapNodeId nodeId, Configuration conf) { this.heartbeatInterval = conf.getLong(LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS, LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT); - LOG.info("AMReporter running with NodeId: {}", nodeId); + } @Override @@ -125,7 +127,8 @@ public void onFailure(Throwable t) { } } }); - LOG.info("Started service: " + getName()); + nodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), localAddress.get().getPort()); + LOG.info("AMReporter running with NodeId: {}", nodeId); } @Override @@ -170,7 +173,7 @@ public void unregisterTask(String amLocation, int port) { synchronized (knownAppMasters) { amNodeInfo = knownAppMasters.get(amNodeId); if (amNodeInfo == null) { - LOG.info(("Ignoring duplocate unregisterRequest for am at: " + amLocation + ":" + port)); + LOG.info(("Ignoring duplicate unregisterRequest for am at: " + amLocation + ":" + port)); } amNodeInfo.decrementAndGetTaskCount(); // Not removing this here. Will be removed when taken off the queue and discovered to have 0 @@ -184,6 +187,9 @@ public void taskKilled(String amLocation, int port, String user, Token future = executor.submit(new KillTaskCallable(taskAttemptId, amNodeInfo)); Futures.addCallback(future, new FutureCallback() { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 6f9f429..10e192e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; @@ -67,7 +66,7 @@ private static final Logger LOG = LoggerFactory.getLogger(ContainerRunnerImpl.class); public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor "; - private volatile AMReporter amReporter; + private final AMReporter amReporter; private final QueryTracker queryTracker; private final Scheduler executorService; private final AtomicReference localAddress; @@ -81,12 +80,14 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize, boolean enablePreemption, String[] localDirsBase, int localShufflePort, AtomicReference localAddress, - long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics) { + long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics, + AMReporter amReporter) { super("ContainerRunnerImpl"); this.conf = conf; Preconditions.checkState(numExecutors > 0, "Invalid number of executors: " + numExecutors + ". Must be > 0"); this.localAddress = localAddress; + this.amReporter = amReporter; this.queryTracker = new QueryTracker(conf, localDirsBase); addIfService(queryTracker); @@ -122,22 +123,13 @@ public void serviceInit(Configuration conf) throws Exception { @Override public void serviceStart() throws Exception { - // The node id will only be available at this point, since the server has been started in LlapDaemon super.serviceStart(); - LlapNodeId llapNodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), - localAddress.get().getPort()); - this.amReporter = new AMReporter(llapNodeId, conf); - amReporter.init(conf); - amReporter.start(); + } @Override protected void serviceStop() throws Exception { super.serviceStop(); - if (amReporter != null) { - amReporter.stop(); - amReporter = null; - } } @Override diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 75377d4..1801212 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -63,6 +63,7 @@ private final Configuration shuffleHandlerConf; private final LlapDaemonProtocolServerImpl server; private final ContainerRunnerImpl containerRunner; + private final AMReporter amReporter; private final LlapRegistryService registry; private final LlapWebServices webServices; private final AtomicLong numSubmissions = new AtomicLong(0); @@ -155,8 +156,12 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor LOG.info("Started LlapMetricsSystem with displayName: " + displayName + " sessionId: " + sessionId); + + this.amReporter = new AMReporter(address, daemonConf); + + this.server = new LlapDaemonProtocolServerImpl(numHandlers, this, address, rpcPort); - addIfService(server); + this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, @@ -166,13 +171,19 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor shufflePort, address, executorMemoryBytes, - metrics); + metrics, + amReporter); addIfService(containerRunner); this.registry = new LlapRegistryService(true); addIfService(registry); this.webServices = new LlapWebServices(); addIfService(webServices); + // Bring up the server only after all other components have started. + addIfService(server); + // AMReporter after the server so that it gets the correct address. It knows how to deal with + // requests before it is started. + addIfService(amReporter); } private long getTotalHeapSize() { @@ -220,14 +231,16 @@ public void serviceInit(Configuration conf) throws Exception { @Override public void serviceStart() throws Exception { - super.serviceStart(); + // Start the Shuffle service before the listener - until it's a service as well. ShuffleHandler.initializeAndStart(shuffleHandlerConf); + super.serviceStart(); + LOG.info("LlapDaemon serviceStart complete"); } public void serviceStop() throws Exception { super.serviceStop(); - shutdown(); ShuffleHandler.shutdown(); + shutdown(); LOG.info("LlapDaemon shutdown complete"); }