From e3d2e01d820c887d0c68abefd5198ddb9a1456c6 Mon Sep 17 00:00:00 2001 From: Mike Drob Date: Mon, 18 Apr 2016 19:16:46 -0500 Subject: [PATCH] HTRACE-358. Provide convenience wrapper around ScheduledExecutorService --- .../htrace/core/ScheduledTraceExecutorService.java | 57 +++++++++++++++++++++ .../apache/htrace/core/TraceExecutorService.java | 27 ++++++++-- .../main/java/org/apache/htrace/core/Tracer.java | 10 ++++ .../org/apache/htrace/core/TestTraceExecutor.java | 59 ++++++++++++++++++++++ 4 files changed, 148 insertions(+), 5 deletions(-) create mode 100644 htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java b/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java new file mode 100644 index 0000000..bade0fd --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * A convenience wrapper around a {@link ScheduledExecutorService} for automatically propagating trace scopes to executable tasks. + *

+ * Recurring tasks will use independent scopes per execution, but will all be tied to the same parent scope (if any). + */ +public class ScheduledTraceExecutorService extends TraceExecutorService implements ScheduledExecutorService { + final ScheduledExecutorService impl; + + ScheduledTraceExecutorService(Tracer tracer, String scopeName, ScheduledExecutorService impl) { + super(tracer, scopeName, impl); + this.impl = impl; + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return impl.schedule(wrap(command), delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return impl.schedule(wrap(callable), delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return impl.scheduleAtFixedRate(wrap(command), initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return impl.scheduleWithFixedDelay(wrap(command), initialDelay, delay, unit); + } + +} diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java b/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java index 81e31ea..bfe4803 100644 --- a/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java +++ b/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java @@ -26,6 +26,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +/** + * A convenience wrapper around an {@link ExecutorService} for automatically propagating trace scopes to executable tasks. + */ public class TraceExecutorService implements ExecutorService { private final Tracer tracer; private final String scopeName; @@ -40,7 +43,7 @@ public class TraceExecutorService implements ExecutorService { @Override public void execute(Runnable command) { - impl.execute(tracer.wrap(command, scopeName)); + impl.execute(wrap(command)); } @Override @@ -71,24 +74,38 @@ public class TraceExecutorService implements ExecutorService { @Override public Future submit(Callable task) { - return impl.submit(tracer.wrap(task, scopeName)); + return impl.submit(wrap(task)); } @Override public Future submit(Runnable task, T result) { - return impl.submit(tracer.wrap(task, scopeName), result); + return impl.submit(wrap(task), result); } @Override public Future submit(Runnable task) { - return impl.submit(tracer.wrap(task, scopeName)); + return impl.submit(wrap(task)); + } + + /* + * Intended for internal use only. + */ + Runnable wrap(Runnable runnable) { + return tracer.wrap(runnable, scopeName); + } + + /* + * Intended for internal use only. + */ + Callable wrap(Callable callable) { + return tracer.wrap(callable, scopeName); } private Collection> wrapCollection( Collection> tasks) { List> result = new ArrayList>(); for (Callable task : tasks) { - result.add(tracer.wrap(task, scopeName)); + result.add(wrap(task)); } return result; } diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java b/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java index f78e0a0..4b712fb 100644 --- a/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java +++ b/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java @@ -25,6 +25,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -474,6 +475,15 @@ public class Tracer implements Closeable { return new TraceExecutorService(this, scopeName, impl); } + public ScheduledTraceExecutorService newTraceExecutorService(ScheduledExecutorService impl) { + return newTraceExecutorService(impl, null); + } + + public ScheduledTraceExecutorService newTraceExecutorService(ScheduledExecutorService impl, + String scopeName) { + return new ScheduledTraceExecutorService(this, scopeName, impl); + } + public TracerPool getTracerPool() { if (tracerPool == null) { throwClientError(toString() + " is closed."); diff --git a/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java b/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java index dbdd27c..e09bfa4 100644 --- a/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java +++ b/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java @@ -17,10 +17,17 @@ package org.apache.htrace.core; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -88,6 +95,58 @@ public class TestTraceExecutor { } } + @Test + public void testScheduledExecutor() throws Exception { + final int taskCount = 3; + final int delay = 500; + + HTraceConfiguration conf = HTraceConfiguration.fromKeyValuePairs("sampler.classes", "AlwaysSampler"); + + ScheduledExecutorService ses = null; + try (Tracer tracer = new Tracer.Builder("TestTraceExecutor").conf(conf).build()) { + ses = Executors.newScheduledThreadPool(taskCount, new NamingThreadFactory()); + ses = tracer.newTraceExecutorService(ses); + + final CountDownLatch startLatch = new CountDownLatch(taskCount); + final CountDownLatch continueLatch = new CountDownLatch(1); + Callable task = new Callable() { + @Override + public String call() throws InterruptedException { + startLatch.countDown(); + // Prevent any task from exiting until every task has started + assertTrue(continueLatch.await(WAIT_TIME_SECONDS, TimeUnit.SECONDS)); + // Annotate on the presumed child trace + Tracer.getCurrentSpan().addTimelineAnnotation(Thread.currentThread().getName()); + return Tracer.getCurrentSpan().getDescription(); + } + }; + + try (TraceScope scope = tracer.newScope("TestRunnable")) { + Collection> futures = new ArrayList<>(); + + for (int i = 0; i < taskCount; i++) { + futures.add(ses.schedule(task, delay, TimeUnit.MILLISECONDS)); + } + + // Wait for all tasks to start + assertTrue(startLatch.await(WAIT_TIME_SECONDS, TimeUnit.SECONDS)); + continueLatch.countDown(); + // Collect the expected results + Collection results = new HashSet<>(); + for (Future future : futures) { + results.add(future.get(WAIT_TIME_SECONDS, TimeUnit.SECONDS)); + } + + assertTrue("Timeline Annotations should have gone to child traces.", Tracer.getCurrentSpan().getTimelineAnnotations().isEmpty()); + assertEquals("Duplicated child span descriptions.", taskCount, results.size()); + } + } finally { + if (ses != null) { + ses.shutdown(); + } + } + } + /* * Inspired by org.apache.solr.util.DefaultSolrThreadFactory */ -- 1.9.1