diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYARNTracing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYARNTracing.java
index 7bfa1f5..01536e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYARNTracing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYARNTracing.java
@@ -18,15 +18,43 @@
 package org.apache.hadoop.yarn.client;
 
+import com.google.common.base.Supplier;
 import java.io.IOException;
+import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.tracing.SetSpanReceiver;
 import org.apache.hadoop.tracing.SpanReceiverHost;
 import org.apache.hadoop.tracing.TraceAdmin;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.htrace.Sampler;
+import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 import org.junit.AfterClass;
@@ -36,7 +64,9 @@
 import org.junit.Test;
 
 public class TestYARNTracing {
+  static final Log LOG = LogFactory.getLog(TestYARNTracing.class);
   private static MiniYARNCluster cluster;
+  private static YarnClient yarnClient;
 
   @BeforeClass
   public static void setup() throws IOException {
@@ -48,10 +78,17 @@ public static void setup() throws IOException {
         1, 1, 1);
     cluster.init(conf);
     cluster.start();
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(cluster.getConfig());
+    yarnClient.start();
   }
 
   @AfterClass
   public static void teardown() {
+    if (yarnClient != null) {
+      yarnClient.stop();
+      yarnClient = null;
+    }
     if (cluster != null) {
       cluster.stop();
       cluster = null;
@@ -65,15 +102,9 @@ public void clearSpans() {
 
   @Test
   public void testRMTracing() throws Exception {
-    YarnClient client = YarnClient.createYarnClient();
-    client.init(cluster.getConfig());
-    client.start();
-
     try (TraceScope ts = Trace.startSpan("testRMTracing", Sampler.ALWAYS)) {
-      client.getApplications();
+      yarnClient.getApplications();
     }
-
-    client.stop();
     String[] expectedSpanNames = {
       "testRMTracing",
       "ApplicationClientProtocolPB#getApplications",
@@ -114,4 +145,104 @@ private static int runTraceCommand(TraceAdmin trace, String... cmd)
       throws Exception {
     return trace.run(cmd);
   }
+
+  @Test
+  public void testNMTracing() throws Exception {
+    ApplicationSubmissionContext appContext =
+        yarnClient.createApplication().getApplicationSubmissionContext();
+    appContext.setApplicationName("TestNMTracing");
+    appContext.setPriority(Priority.newInstance(0));
+    appContext.setQueue("default");
+    appContext.setAMContainerSpec(
+        Records.newRecord(ContainerLaunchContext.class));
+    appContext.setUnmanagedAM(true);
+    yarnClient.submitApplication(appContext);
+
+    // Wait for the application attempt to be launched.
+    final YarnClient yarnClient = TestYARNTracing.yarnClient;
+    final ApplicationId appId = appContext.getApplicationId();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          ApplicationReport report = yarnClient.getApplicationReport(appId);
+          if (report.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+            RMAppAttempt appAttempt =
+                cluster.getResourceManager()
+                    .getRMContext().getRMApps().get(appId).getCurrentAppAttempt();
+            if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
+              return true;
+            }
+          }
+        } catch (Exception e) {
+          LOG.info(StringUtils.stringifyException(e));
+          Assert.fail("Exception while getting application state.");
+        }
+        return false;
+      }
+    }, 1000, 30000);
+
+    List<NodeReport> nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
+    String node = nodeReports.get(0).getNodeId().getHost();
+    String rack = nodeReports.get(0).getRackName();
+    String[] nodes = new String[] {node};
+    String[] racks = new String[] {rack};
+    Resource capability = Resource.newInstance(1024, 0);
+    Priority priority = Priority.newInstance(0);
+
+    RMAppAttempt appAttempt =
+        cluster.getResourceManager()
+            .getRMContext().getRMApps().get(appId).getCurrentAppAttempt();
+    UserGroupInformation.setLoginUser(
+        UserGroupInformation.createRemoteUser(
+            UserGroupInformation.getCurrentUser().getUserName()));
+    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+
+    NMTokenCache nmTokenCache = new NMTokenCache();
+    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
+    rmClient.setNMTokenCache(nmTokenCache);
+    rmClient.init(cluster.getConfig());
+    rmClient.start();
+    Container container = null;
+    try {
+      rmClient.registerApplicationMaster("TestNMTracing", 10000, "");
+      rmClient.addContainerRequest(
+          new ContainerRequest(capability, nodes, racks, priority));
+      for (int i = 5; i > 0; i--) {
+        AllocateResponse allocResponse = rmClient.allocate(0.1f);
+        if (allocResponse.getAllocatedContainers().size() > 0) {
+          container = allocResponse.getAllocatedContainers().get(0);
+          break;
+        }
+        Thread.sleep(1000);
+      }
+    } finally {
+      rmClient.stop();
+    }
+    if (container == null) {
+      Assert.fail("Failed to allocate containers.");
+    }
+
+    NMClient nmClient = NMClient.createNMClient();
+    nmClient.setNMTokenCache(nmTokenCache);
+    nmClient.init(cluster.getConfig());
+    nmClient.start();
+    try (TraceScope ts = Trace.startSpan("testNMTracing", Sampler.ALWAYS)) {
+      nmClient.getContainerStatus(container.getId(), container.getNodeId());
+      // getContainerStatus called before startContainer is expected to
+      // throw an exception. That is not a problem here because this test
+      // only checks whether the server-side tracing span is recorded.
+      Assert.fail("Exception is expected");
+    } catch (YarnException e) {
+      LOG.info(StringUtils.stringifyException(e));
+    } finally {
+      nmClient.close();
+    }
+    String[] expectedSpanNames = {
+      "testNMTracing",
+      "ContainerManagementProtocolPB#getContainerStatuses",
+      "ContainerManagementProtocolService#getContainerStatuses"
+    };
+    SetSpanReceiver.assertSpanNamesFound(expectedSpanNames);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index a06293d..91f99b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.tracing.SpanReceiverHost;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.JvmPauseMonitor;
@@ -95,6 +96,7 @@
   private NodeStatusUpdater nodeStatusUpdater;
   private static CompositeServiceShutdownHook nodeManagerShutdownHook;
   private NMStateStoreService nmStore = null;
+  private SpanReceiverHost spanReceiverHost;
 
   private AtomicBoolean isStopping = new AtomicBoolean(false);
   private boolean rmWorkPreservingRestartEnabled;
@@ -313,6 +315,9 @@ protected void serviceInit(Configuration conf) throws Exception {
     pauseMonitor = new JvmPauseMonitor(conf);
     metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
 
+    this.spanReceiverHost =
+        SpanReceiverHost.get(conf, YarnConfiguration.YARN_SERVER_HTRACE_PREFIX);
+
     DefaultMetricsSystem.initialize("NodeManager");
 
     // StatusUpdater should be added last so that it get started last
@@ -345,6 +350,9 @@ protected void serviceStop() throws Exception {
       if (pauseMonitor != null) {
         pauseMonitor.stop();
       }
+      if (spanReceiverHost != null) {
+        spanReceiverHost.closeReceivers();
+      }
     } finally {
       // YARN-3641: NM's services stop get failed shouldn't block the
      // release of NMLevelDBStore.