diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/tracing/SetSpanReceiver.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/tracing/SetSpanReceiver.java new file mode 100644 index 0000000..14150a5 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/tracing/SetSpanReceiver.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tracing; + +import com.google.common.base.Supplier; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; +import org.junit.Assert; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.htrace.HTraceConfiguration; +import org.apache.htrace.Span; +import org.apache.htrace.SpanReceiver; +import org.junit.Before; + +/** + * Span receiver that puts all spans into a single set. + * This is useful for testing. + * Please make sure to call clear() method before each test. + *

+ * We're not using HTrace's POJOReceiver here so as that doesn't + * push all the metrics to a static place, and would make testing + * SpanReceiverHost harder. + */ +public class SetSpanReceiver implements SpanReceiver { + + public SetSpanReceiver(HTraceConfiguration conf) { + } + + public void receiveSpan(Span span) { + SetHolder.spans.put(span.getSpanId(), span); + } + + public void close() { + } + + public static void clear() { + SetHolder.spans.clear(); + } + + public static class SetHolder { + public static ConcurrentHashMap spans = + new ConcurrentHashMap(); + + public static int size() { + return spans.size(); + } + + public static Map> getMap() { + Map> map = new HashMap>(); + + for (Span s : spans.values()) { + List l = map.get(s.getDescription()); + if (l == null) { + l = new LinkedList(); + map.put(s.getDescription(), l); + } + l.add(s); + } + return map; + } + } + + public static void assertSpanNamesFound(final String[] expectedSpanNames) { + try { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + Map> map = SetHolder.getMap(); + for (String spanName : expectedSpanNames) { + if (!map.containsKey(spanName)) { + return false; + } + } + return true; + } + }, 100, 1000); + } catch (TimeoutException e) { + Assert.fail("timed out to get expected spans: " + e.getMessage()); + } catch (InterruptedException e) { + Assert.fail("interrupted while waiting spans: " + e.getMessage()); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java index 3720abe..9210910 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java @@ -24,11 +24,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.htrace.HTraceConfiguration; +import org.apache.hadoop.tracing.SetSpanReceiver; import org.apache.htrace.Sampler; import org.apache.htrace.Span; -import org.apache.htrace.SpanReceiver; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; import org.junit.AfterClass; @@ -39,14 +37,8 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeoutException; - -import com.google.common.base.Supplier; public class TestTracing { @@ -96,7 +88,7 @@ public void testWriteTraceHooks() throws Exception { "org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock", "ClientNamenodeProtocol#addBlock" }; - assertSpanNamesFound(expectedSpanNames); + SetSpanReceiver.assertSpanNamesFound(expectedSpanNames); // The trace should last about the same amount of time as the test Map> map = SetSpanReceiver.SetHolder.getMap(); @@ -181,7 +173,7 @@ public void testReadTraceHooks() throws Exception { "ClientNamenodeProtocol#getBlockLocations", "OpReadBlockProto" }; - assertSpanNamesFound(expectedSpanNames); + SetSpanReceiver.assertSpanNamesFound(expectedSpanNames); // The trace should last about the same amount of time as the test Map> map = SetSpanReceiver.SetHolder.getMap(); @@ -232,8 +224,8 @@ public void testReadWithoutTraceHooks() throws Exception { } @Before - public void cleanSet() { - SetSpanReceiver.SetHolder.spans.clear(); + public void clearSpans() { + SetSpanReceiver.clear(); } @BeforeClass @@ -256,69 +248,4 @@ public static void setupCluster() throws IOException { public static void shutDown() throws IOException { cluster.shutdown(); } - - static void assertSpanNamesFound(final String[] expectedSpanNames) { - try { - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - Map> map = SetSpanReceiver.SetHolder.getMap(); - for (String spanName : expectedSpanNames) { - if (!map.containsKey(spanName)) { - return false; - } - } - return true; - } - }, 100, 1000); - } catch (TimeoutException e) { - Assert.fail("timed out to get expected spans: " + e.getMessage()); - } catch (InterruptedException e) { - Assert.fail("interrupted while waiting spans: " + e.getMessage()); - } - } - - /** - * Span receiver that puts all spans into a single set. - * This is useful for testing. - *

- * We're not using HTrace's POJOReceiver here so as that doesn't - * push all the metrics to a static place, and would make testing - * SpanReceiverHost harder. - */ - public static class SetSpanReceiver implements SpanReceiver { - - public SetSpanReceiver(HTraceConfiguration conf) { - } - - public void receiveSpan(Span span) { - SetHolder.spans.put(span.getSpanId(), span); - } - - public void close() { - } - - public static class SetHolder { - public static ConcurrentHashMap spans = - new ConcurrentHashMap(); - - public static int size() { - return spans.size(); - } - - public static Map> getMap() { - Map> map = new HashMap>(); - - for (Span s : spans.values()) { - List l = map.get(s.getDescription()); - if (l == null) { - l = new LinkedList(); - map.put(s.getDescription(), l); - } - l.add(s); - } - return map; - } - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java index 2981558..019406a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java @@ -30,12 +30,14 @@ import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.util.NativeCodeLoader; +import org.apache.hadoop.tracing.SetSpanReceiver; import org.apache.htrace.Sampler; import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; @@ -60,12 +62,17 @@ public static void shutdown() throws IOException { sockDir.close(); } + @Before + public void clearSpans() { + SetSpanReceiver.clear(); + } + @Test public void testShortCircuitTraceHooks() throws IOException { assumeTrue(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS); conf = new Configuration(); conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY, - TestTracing.SetSpanReceiver.class.getName()); + SetSpanReceiver.class.getName()); conf.setLong("dfs.blocksize", 100 * 1024); conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false); @@ -92,7 +99,7 @@ public void testShortCircuitTraceHooks() throws IOException { "OpRequestShortCircuitAccessProto", "ShortCircuitShmRequestProto" }; - TestTracing.assertSpanNamesFound(expectedSpanNames); + SetSpanReceiver.assertSpanNamesFound(expectedSpanNames); } finally { dfs.close(); cluster.shutdown(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 12714de..6aab2ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -40,6 +40,11 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.tracing.SpanReceiverInfo; +import org.apache.hadoop.tracing.TraceAdminProtocol; +import org.apache.hadoop.tracing.TraceAdminProtocolPB; +import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB; +import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; @@ -89,8 +94,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.BlockingService; -public class AdminService extends CompositeService implements - HAServiceProtocol, ResourceManagerAdministrationProtocol { +public class AdminService extends CompositeService + implements HAServiceProtocol, ResourceManagerAdministrationProtocol, + TraceAdminProtocol { private static final Log LOG = LogFactory.getLog(AdminService.class); @@ -187,6 +193,15 @@ protected void startServer() throws Exception { HAServiceProtocol.class, haPbService); } + TraceAdminProtocolServerSideTranslatorPB traceAdminXlator = + new TraceAdminProtocolServerSideTranslatorPB(this); + BlockingService traceAdminService = TraceAdminService + .newReflectiveBlockingService(traceAdminXlator); + RPC.setProtocolEngine(conf, TraceAdminProtocolPB.class, + ProtobufRpcEngine.class); + server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, + TraceAdminProtocolPB.class, traceAdminService); + this.server.start(); conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, YarnConfiguration.RM_ADMIN_ADDRESS, @@ -704,4 +719,22 @@ private YarnException logAndWrapException(Exception exception, String user, "AdminService", "Exception " + msg); return RPCUtil.getRemoteException(exception); } + + @Override // TraceAdminProtocol + public SpanReceiverInfo[] listSpanReceivers() throws IOException { + checkAccess("listSpanReceivers"); + return rm.spanReceiverHost.listSpanReceivers(); + } + + @Override // TraceAdminProtocol + public long addSpanReceiver(SpanReceiverInfo info) throws IOException { + checkAccess("addSpanReceiver"); + return rm.spanReceiverHost.addSpanReceiver(info); + } + + @Override // TraceAdminProtocol + public void removeSpanReceiver(long id) throws IOException { + checkAccess("removeSpanReceiver"); + rm.spanReceiverHost.removeSpanReceiver(id); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 8bd8e21..54727a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -37,6 +37,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.Service; +import org.apache.hadoop.tracing.SpanReceiverHost; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.ReflectionUtils; @@ -156,6 +157,7 @@ private WebApp webApp; private AppReportFetcher fetcher = null; protected ResourceTrackerService resourceTracker; + SpanReceiverHost spanReceiverHost; @VisibleForTesting protected String webAppAddress; @@ -254,6 +256,8 @@ protected void serviceInit(Configuration conf) throws Exception { YarnConfiguration.RM_BIND_HOST, WebAppUtils.getRMWebAppURLWithoutScheme(this.conf)); + this.spanReceiverHost = SpanReceiverHost.getInstance(this.conf); + super.serviceInit(this.conf); } @@ -1096,6 +1100,9 @@ protected void serviceStop() throws Exception { if (webApp != null) { webApp.stop(); } + if (spanReceiverHost != null) { + spanReceiverHost.closeReceivers(); + } if (fetcher != null) { fetcher.stop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestYARNTracing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestYARNTracing.java new file mode 100644 index 0000000..72f6f55 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestYARNTracing.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.tracing.SetSpanReceiver; +import org.apache.hadoop.tracing.SpanReceiverHost; +import org.apache.hadoop.tracing.TraceAdmin; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.htrace.Sampler; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestYARNTracing { + private static MiniYARNCluster cluster; + + @BeforeClass + public static void setup() throws IOException { + Configuration conf = new YarnConfiguration(); + conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY, + SetSpanReceiver.class.getName()); + cluster = new MiniYARNCluster(TestYARNTracing.class.getSimpleName(), + 1, 1, 1); + cluster.init(conf); + cluster.start(); + } + + @AfterClass + public static void teardown() { + if (cluster != null) { + cluster.stop(); + cluster = null; + } + } + + @Before + public void clearSpans() { + SetSpanReceiver.clear(); + } + + @Test + public void testRMTracing() throws Exception { + TraceScope ts = null; + try { + Configuration conf = cluster.getConfig(); + String hostPort = conf.get(YarnConfiguration.RM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS); + TraceAdmin traceAdmin = new TraceAdmin(); + traceAdmin.setConf(conf); + ts = Trace.startSpan("testRMTracing", Sampler.ALWAYS); + Assert.assertEquals(0, + runTraceCommand(traceAdmin, "-list", "-host", hostPort)); + ts.close(); + String[] expectedSpanNames = { + "testRMTracing", + "TraceAdminService#listSpanReceivers", + "org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService.listSpanReceivers" + }; + SetSpanReceiver.assertSpanNamesFound(expectedSpanNames); + } finally { + ts.close(); + } + } + + private static int runTraceCommand(TraceAdmin trace, String... cmd) + throws Exception { + return trace.run(cmd); + } +}