diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/tracing/SetSpanReceiver.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/tracing/SetSpanReceiver.java
new file mode 100644
index 0000000..14150a5
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/tracing/SetSpanReceiver.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tracing;
+
+import com.google.common.base.Supplier;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.htrace.HTraceConfiguration;
+import org.apache.htrace.Span;
+import org.apache.htrace.SpanReceiver;
+import org.junit.Assert;
+
+/**
+ * Span receiver that puts all spans into a single set.
+ * This is useful for testing.
+ * Please make sure to call the clear() method before each test.
+ *
+ * We're not using HTrace's POJOReceiver here because it doesn't
+ * push the received spans to a static place, which would make
+ * testing SpanReceiverHost harder.
+ */
+public class SetSpanReceiver implements SpanReceiver {
+
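+ // SpanReceiverHost instantiates receivers reflectively and passes an
+ // HTraceConfiguration to this constructor, so the parameter is required
+ // even though it is unused here.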
+ public SetSpanReceiver(HTraceConfiguration conf) {
+ }
+
+ public void receiveSpan(Span span) {
+ SetHolder.spans.put(span.getSpanId(), span);
+ }
+
+ public void close() {
+ }
+
+ public static void clear() {
+ SetHolder.spans.clear();
+ }
+
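+ // Spans are kept in a static holder, keyed by span id, so tests can
+ // inspect what was received without holding a reference to the receiver
+ // instance that SpanReceiverHost created.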
+ public static class SetHolder {
+ public static ConcurrentHashMap<Long, Span> spans =
+ new ConcurrentHashMap<Long, Span>();
+
+ public static int size() {
+ return spans.size();
+ }
+
+ public static Map<String, List<Span>> getMap() {
+ Map<String, List<Span>> map = new HashMap<String, List<Span>>();
+
+ for (Span s : spans.values()) {
+ List<Span> l = map.get(s.getDescription());
+ if (l == null) {
+ l = new LinkedList<Span>();
+ map.put(s.getDescription(), l);
+ }
+ l.add(s);
+ }
+ return map;
+ }
+ }
+
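+ // Spans arrive asynchronously after the traced call returns, so poll
+ // (every 100 ms, for up to one second) until every expected span name
+ // has been received, and fail the test otherwise.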
+ public static void assertSpanNamesFound(final String[] expectedSpanNames) {
+ try {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ Map<String, List<Span>> map = SetHolder.getMap();
+ for (String spanName : expectedSpanNames) {
+ if (!map.containsKey(spanName)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }, 100, 1000);
+ } catch (TimeoutException e) {
+ Assert.fail("timed out to get expected spans: " + e.getMessage());
+ } catch (InterruptedException e) {
+ Assert.fail("interrupted while waiting spans: " + e.getMessage());
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java
index 3720abe..9210910 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java
@@ -24,11 +24,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.htrace.HTraceConfiguration;
+import org.apache.hadoop.tracing.SetSpanReceiver;
import org.apache.htrace.Sampler;
import org.apache.htrace.Span;
-import org.apache.htrace.SpanReceiver;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.junit.AfterClass;
@@ -39,14 +37,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeoutException;
-
-import com.google.common.base.Supplier;
public class TestTracing {
@@ -96,7 +88,7 @@ public void testWriteTraceHooks() throws Exception {
"org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock",
"ClientNamenodeProtocol#addBlock"
};
- assertSpanNamesFound(expectedSpanNames);
+ SetSpanReceiver.assertSpanNamesFound(expectedSpanNames);
// The trace should last about the same amount of time as the test
Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
@@ -181,7 +173,7 @@ public void testReadTraceHooks() throws Exception {
"ClientNamenodeProtocol#getBlockLocations",
"OpReadBlockProto"
};
- assertSpanNamesFound(expectedSpanNames);
+ SetSpanReceiver.assertSpanNamesFound(expectedSpanNames);
// The trace should last about the same amount of time as the test
Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
@@ -232,8 +224,8 @@ public void testReadWithoutTraceHooks() throws Exception {
}
@Before
- public void cleanSet() {
- SetSpanReceiver.SetHolder.spans.clear();
+ public void clearSpans() {
+ SetSpanReceiver.clear();
}
@BeforeClass
@@ -256,69 +248,4 @@ public static void setupCluster() throws IOException {
public static void shutDown() throws IOException {
cluster.shutdown();
}
-
- static void assertSpanNamesFound(final String[] expectedSpanNames) {
- try {
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
- for (String spanName : expectedSpanNames) {
- if (!map.containsKey(spanName)) {
- return false;
- }
- }
- return true;
- }
- }, 100, 1000);
- } catch (TimeoutException e) {
- Assert.fail("timed out to get expected spans: " + e.getMessage());
- } catch (InterruptedException e) {
- Assert.fail("interrupted while waiting spans: " + e.getMessage());
- }
- }
-
- /**
- * Span receiver that puts all spans into a single set.
- * This is useful for testing.
- *
- * We're not using HTrace's POJOReceiver here so as that doesn't
- * push all the metrics to a static place, and would make testing
- * SpanReceiverHost harder.
- */
- public static class SetSpanReceiver implements SpanReceiver {
-
- public SetSpanReceiver(HTraceConfiguration conf) {
- }
-
- public void receiveSpan(Span span) {
- SetHolder.spans.put(span.getSpanId(), span);
- }
-
- public void close() {
- }
-
- public static class SetHolder {
- public static ConcurrentHashMap<Long, Span> spans =
- new ConcurrentHashMap<Long, Span>();
-
- public static int size() {
- return spans.size();
- }
-
- public static Map<String, List<Span>> getMap() {
- Map<String, List<Span>> map = new HashMap<String, List<Span>>();
-
- for (Span s : spans.values()) {
- List<Span> l = map.get(s.getDescription());
- if (l == null) {
- l = new LinkedList<Span>();
- map.put(s.getDescription(), l);
- }
- l.add(s);
- }
- return map;
- }
- }
- }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java
index 2981558..019406a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java
@@ -30,12 +30,14 @@
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.tracing.SetSpanReceiver;
import org.apache.htrace.Sampler;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
@@ -60,12 +62,17 @@ public static void shutdown() throws IOException {
sockDir.close();
}
+ @Before
+ public void clearSpans() {
+ SetSpanReceiver.clear();
+ }
+
@Test
public void testShortCircuitTraceHooks() throws IOException {
assumeTrue(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS);
conf = new Configuration();
conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
- TestTracing.SetSpanReceiver.class.getName());
+ SetSpanReceiver.class.getName());
conf.setLong("dfs.blocksize", 100 * 1024);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
@@ -92,7 +99,7 @@ public void testShortCircuitTraceHooks() throws IOException {
"OpRequestShortCircuitAccessProto",
"ShortCircuitShmRequestProto"
};
- TestTracing.assertSpanNamesFound(expectedSpanNames);
+ SetSpanReceiver.assertSpanNamesFound(expectedSpanNames);
} finally {
dfs.close();
cluster.shutdown();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 12714de..6aab2ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -40,6 +40,11 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.tracing.SpanReceiverInfo;
+import org.apache.hadoop.tracing.TraceAdminProtocol;
+import org.apache.hadoop.tracing.TraceAdminProtocolPB;
+import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
+import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
@@ -89,8 +94,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
-public class AdminService extends CompositeService implements
- HAServiceProtocol, ResourceManagerAdministrationProtocol {
+public class AdminService extends CompositeService
+ implements HAServiceProtocol, ResourceManagerAdministrationProtocol,
+ TraceAdminProtocol {
private static final Log LOG = LogFactory.getLog(AdminService.class);
@@ -187,6 +193,15 @@ protected void startServer() throws Exception {
HAServiceProtocol.class, haPbService);
}
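+ // Register TraceAdminProtocol on the admin RPC server so span receivers
+ // on the RM can be listed, added, and removed at runtime, e.g. via the
+ // "hadoop trace" command pointed at the RM admin address.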
+ TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
+ new TraceAdminProtocolServerSideTranslatorPB(this);
+ BlockingService traceAdminService = TraceAdminService
+ .newReflectiveBlockingService(traceAdminXlator);
+ RPC.setProtocolEngine(conf, TraceAdminProtocolPB.class,
+ ProtobufRpcEngine.class);
+ server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+ TraceAdminProtocolPB.class, traceAdminService);
+
this.server.start();
conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_ADMIN_ADDRESS,
@@ -704,4 +719,22 @@ private YarnException logAndWrapException(Exception exception, String user,
"AdminService", "Exception " + msg);
return RPCUtil.getRemoteException(exception);
}
+
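+ // TraceAdminProtocol implementation: each operation performs the usual
+ // admin access check and then delegates to the RM's SpanReceiverHost.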
+ @Override // TraceAdminProtocol
+ public SpanReceiverInfo[] listSpanReceivers() throws IOException {
+ checkAccess("listSpanReceivers");
+ return rm.spanReceiverHost.listSpanReceivers();
+ }
+
+ @Override // TraceAdminProtocol
+ public long addSpanReceiver(SpanReceiverInfo info) throws IOException {
+ checkAccess("addSpanReceiver");
+ return rm.spanReceiverHost.addSpanReceiver(info);
+ }
+
+ @Override // TraceAdminProtocol
+ public void removeSpanReceiver(long id) throws IOException {
+ checkAccess("removeSpanReceiver");
+ rm.spanReceiverHost.removeSpanReceiver(id);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 8bd8e21..54727a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
+import org.apache.hadoop.tracing.SpanReceiverHost;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ReflectionUtils;
@@ -156,6 +157,7 @@
private WebApp webApp;
private AppReportFetcher fetcher = null;
protected ResourceTrackerService resourceTracker;
+ SpanReceiverHost spanReceiverHost;
@VisibleForTesting
protected String webAppAddress;
@@ -254,6 +256,8 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.RM_BIND_HOST,
WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));
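+ // Load and start any span receivers configured for the RM.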
+ this.spanReceiverHost = SpanReceiverHost.getInstance(this.conf);
+
super.serviceInit(this.conf);
}
@@ -1096,6 +1100,9 @@ protected void serviceStop() throws Exception {
if (webApp != null) {
webApp.stop();
}
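+ // Close any span receivers that were loaded in serviceInit.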
+ if (spanReceiverHost != null) {
+ spanReceiverHost.closeReceivers();
+ }
if (fetcher != null) {
fetcher.stop();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestYARNTracing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestYARNTracing.java
new file mode 100644
index 0000000..72f6f55
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestYARNTracing.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tracing.SetSpanReceiver;
+import org.apache.hadoop.tracing.SpanReceiverHost;
+import org.apache.hadoop.tracing.TraceAdmin;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestYARNTracing {
+ private static MiniYARNCluster cluster;
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ Configuration conf = new YarnConfiguration();
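+ // Have the mini cluster load the test SetSpanReceiver so the spans
+ // emitted by the RM can be inspected from the test.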
+ conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
+ SetSpanReceiver.class.getName());
+ cluster = new MiniYARNCluster(TestYARNTracing.class.getSimpleName(),
+ 1, 1, 1);
+ cluster.init(conf);
+ cluster.start();
+ }
+
+ @AfterClass
+ public static void teardown() {
+ if (cluster != null) {
+ cluster.stop();
+ cluster = null;
+ }
+ }
+
+ @Before
+ public void clearSpans() {
+ SetSpanReceiver.clear();
+ }
+
+ @Test
+ public void testRMTracing() throws Exception {
+ TraceScope ts = null;
+ try {
+ Configuration conf = cluster.getConfig();
+ String hostPort = conf.get(YarnConfiguration.RM_ADMIN_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS);
+ TraceAdmin traceAdmin = new TraceAdmin();
+ traceAdmin.setConf(conf);
+ ts = Trace.startSpan("testRMTracing", Sampler.ALWAYS);
+ Assert.assertEquals(0,
+ runTraceCommand(traceAdmin, "-list", "-host", hostPort));
+ ts.close();
+ ts = null;
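+ // The span started in this test plus the RPC spans recorded while
+ // serving the listSpanReceivers call should all reach SetSpanReceiver.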
+ String[] expectedSpanNames = {
+ "testRMTracing",
+ "TraceAdminService#listSpanReceivers",
+ "org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService.listSpanReceivers"
+ };
+ SetSpanReceiver.assertSpanNamesFound(expectedSpanNames);
+ } finally {
+ if (ts != null) {
+ ts.close();
+ }
+ }
+ }
+
+ private static int runTraceCommand(TraceAdmin trace, String... cmd)
+ throws Exception {
+ return trace.run(cmd);
+ }
+}