From d371d2bd9cf305e5b4d4f22bea0ddb6e51c2db0e Mon Sep 17 00:00:00 2001 From: Mike Drob Date: Mon, 18 Apr 2016 19:16:46 -0500 Subject: [PATCH] HTRACE-358. Provide convenience wrapper around ScheduledExecutorService --- .../htrace/core/ScheduledTraceExecutorService.java | 52 ++++++++++++++++++++ .../apache/htrace/core/TraceExecutorService.java | 18 +++++-- .../main/java/org/apache/htrace/core/Tracer.java | 10 ++++ .../org/apache/htrace/core/TestTraceExecutor.java | 57 ++++++++++++++++++++++ 4 files changed, 132 insertions(+), 5 deletions(-) create mode 100644 htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java b/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java new file mode 100644 index 0000000..1bc46a1 --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class ScheduledTraceExecutorService extends TraceExecutorService implements ScheduledExecutorService { + final ScheduledExecutorService impl; + + ScheduledTraceExecutorService(Tracer tracer, String scopeName, ScheduledExecutorService impl) { + super(tracer, scopeName, impl); + this.impl = impl; + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return impl.schedule(wrap(command), delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return impl.schedule(wrap(callable), delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return impl.scheduleAtFixedRate(wrap(command), initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return impl.scheduleWithFixedDelay(wrap(command), initialDelay, delay, unit); + } + +} diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java b/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java index 81e31ea..15015d4 100644 --- a/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java +++ b/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java @@ -40,7 +40,7 @@ public class TraceExecutorService implements ExecutorService { @Override public void execute(Runnable command) { - impl.execute(tracer.wrap(command, scopeName)); + impl.execute(wrap(command)); } @Override @@ -71,24 +71,32 @@ public class TraceExecutorService implements ExecutorService { @Override public Future submit(Callable task) { - return impl.submit(tracer.wrap(task, scopeName)); + return impl.submit(wrap(task)); } @Override public Future submit(Runnable task, T result) { - return impl.submit(tracer.wrap(task, scopeName), result); + return impl.submit(wrap(task), result); } @Override public Future submit(Runnable task) { - return impl.submit(tracer.wrap(task, scopeName)); + return impl.submit(wrap(task)); + } + + protected Runnable wrap(Runnable runnable) { + return tracer.wrap(runnable, scopeName); + } + + protected Callable wrap(Callable callable) { + return tracer.wrap(callable, scopeName); } private Collection> wrapCollection( Collection> tasks) { List> result = new ArrayList>(); for (Callable task : tasks) { - result.add(tracer.wrap(task, scopeName)); + result.add(wrap(task)); } return result; } diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java b/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java index a04c9b9..83a8a3e 100644 --- a/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java +++ b/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java @@ -25,6 +25,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -474,6 +475,15 @@ public class Tracer implements Closeable { return new TraceExecutorService(this, scopeName, impl); } + public ScheduledTraceExecutorService newTraceExecutorService(ScheduledExecutorService impl) { + return newTraceExecutorService(impl, null); + } + + public ScheduledTraceExecutorService newTraceExecutorService(ScheduledExecutorService impl, + String scopeName) { + return new ScheduledTraceExecutorService(this, scopeName, impl); + } + public TracerPool getTracerPool() { if (tracerPool == null) { throwClientError(toString() + " is closed."); diff --git a/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java b/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java index bf98a1a..426d9fe 100644 --- a/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java +++ b/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java @@ -17,10 +17,17 @@ package org.apache.htrace.core; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -66,6 +73,56 @@ public class TestTraceExecutor { } } + @Test + public void testScheduledExecutor() throws Exception { + final int taskCount = 3; + final int delay = 500; + + HTraceConfiguration conf = HTraceConfiguration.fromKeyValuePairs("sampler.classes", "AlwaysSampler"); + + ScheduledExecutorService ses = null; + try (Tracer tracer = new Tracer.Builder("TestTraceExecutor").conf(conf).build()) { + ses = Executors.newScheduledThreadPool(taskCount, new NamingThreadFactory()); + ses = tracer.newTraceExecutorService(ses); + + final CountDownLatch startLatch = new CountDownLatch(taskCount); + final CountDownLatch continueLatch = new CountDownLatch(1); + Callable task = new Callable() { + @Override + public String call() throws InterruptedException { + startLatch.countDown(); + // Prevent any task from exiting until every task has started + assertTrue(continueLatch.await(WAIT_TIME_SECONDS, TimeUnit.SECONDS)); + return Tracer.getCurrentSpan().getDescription(); + } + }; + + try (TraceScope scope = tracer.newScope("TestRunnable")) { + Collection> futures = new ArrayList<>(); + + for (int i = 0; i < taskCount; i++) { + futures.add(ses.schedule(task, delay, TimeUnit.MILLISECONDS)); + } + + // Wait for all tasks to start + assertTrue(startLatch.await(WAIT_TIME_SECONDS, TimeUnit.SECONDS)); + continueLatch.countDown(); + // Collect the expected results + Collection results = new HashSet<>(); + for (Future future : futures) { + results.add(future.get(WAIT_TIME_SECONDS, TimeUnit.SECONDS)); + } + + assertTrue("Timeline Annotations should have gone to child traces.", Tracer.getCurrentSpan().getTimelineAnnotations().isEmpty()); + assertEquals("Duplicated child span descriptions.", taskCount, results.size()); + } + } finally { + if (ses != null) { + ses.shutdown(); + } + } + } + /* * Inspired by org.apache.solr.util.DefaultSolrThreadFactory */ -- 1.9.1