From e2b9cb8c1a414d75fe6cbd629a870d2ab3418247 Mon Sep 17 00:00:00 2001 From: Jonathan Lawlor Date: Wed, 14 Jan 2015 11:04:25 -0800 Subject: [PATCH] HBASE-6778: Deprecate Chore; its a thread per task when we should have one thread to do all tasks --- .../java/org/apache/hadoop/hbase/ChoreService.java | 273 ++++++++ .../org/apache/hadoop/hbase/ScheduledChore.java | 192 ++++++ .../org/apache/hadoop/hbase/TestChoreService.java | 739 +++++++++++++++++++++ 3 files changed, 1204 insertions(+) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java new file mode 100644 index 0000000..f4e626a --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java @@ -0,0 +1,273 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.util.LinkedHashMap; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; + +import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public class ChoreService implements ChoreServicer { + /** The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor */ + public final static int MIN_CORE_POOL_SIZE = 2; + + /** By default, scheduled chores have an initial delay of 0 */ + public final static int INITIAL_DELAY = 0; + + private final ScheduledThreadPoolExecutor scheduler; + + /** + * Maps chores to their futures. Futures are used to control a chore's schedule + */ + private final ConcurrentHashMap> scheduledChores; + + /** + * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the + * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to + * increase the core pool size by 1 (otherwise a single long running chore whose execution is + * longer than its period would be able to spawn too many threads). + */ + private final ConcurrentHashMap choresMissingStartTime; + + + public ChoreService() { + this(MIN_CORE_POOL_SIZE); + } + + public ChoreService(int corePoolSize) { + if (corePoolSize < MIN_CORE_POOL_SIZE) corePoolSize = MIN_CORE_POOL_SIZE; + scheduler = new ScheduledThreadPoolExecutor(corePoolSize, new ChoreServiceThreadFactory()); + scheduler.setRemoveOnCancelPolicy(true); + scheduledChores = new ConcurrentHashMap>(); + choresMissingStartTime = new ConcurrentHashMap(); + } + + public static ChoreService getInstance() { + return new ChoreService(); + } + + /** + * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService + * instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled + * with a single ChoreService instance). + */ + public synchronized void scheduleChore(ScheduledChore chore) { + scheduleChore(chore, INITIAL_DELAY); + } + + /** + * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService + * instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled + * with a single ChoreService instance). + * @param initialDelay The initial delay before the scheduled chore becomes active. Measured in + * same units as chore.getTimeUnit() + */ + public synchronized void scheduleChore(ScheduledChore chore, final long initialDelay) { + if (chore == null) { + throw new NullPointerException("chore cannot be null"); + } + chore.setChoreServicer(this); + ScheduledFuture future = + scheduler.scheduleAtFixedRate(chore, initialDelay, chore.getPeriod(), chore.getTimeUnit()); + scheduledChores.put(chore, future); + } + + /** + * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService + * yet then this call is equivalent to a call to scheduleChore. + */ + private synchronized void rescheduleChore(ScheduledChore chore) { + rescheduleChore(chore, INITIAL_DELAY); + } + + /** + * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService + * yet then this call is equivalent to a call to scheduleChore. + * @param initialDelay The initial delay before the scheduled chore becomes active. Measured in + * same units as chore.getTimeUnit() + */ + private synchronized void rescheduleChore(ScheduledChore chore, final long initialDelay) { + if (chore == null) return; + + if (scheduledChores.containsKey(chore)) { + ScheduledFuture future = scheduledChores.get(chore); + future.cancel(true); + } + scheduleChore(chore, initialDelay); + } + + @Override + public synchronized void cancelChore(ScheduledChore chore) { + cancelChore(chore, false); + } + + @Override + public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) { + if (chore != null && scheduledChores.containsKey(chore)) { + ScheduledFuture future = scheduledChores.get(chore); + future.cancel(mayInterruptIfRunning); + scheduledChores.remove(chore); + + // Removing a chore that was missing its start time means it may be possible + // to reduce the number of threads + if (choresMissingStartTime.containsKey(chore)) { + choresMissingStartTime.remove(chore); + requestCorePoolDecrease(); + } + } + } + + @Override + public boolean isChoreScheduled(ScheduledChore chore) { + return chore != null && scheduledChores.containsKey(chore) + && !scheduledChores.get(chore).isDone(); + } + + @Override + public synchronized boolean triggerNow(ScheduledChore chore) { + if (chore == null) { + return false; + } else { + rescheduleChore(chore); + return true; + } + } + + public int getNumberOfScheduledChores() { + return scheduledChores.size(); + } + + public int getNumberOfChoresMissingStartTime() { + return choresMissingStartTime.size(); + } + + public int getCorePoolSize() { + return scheduler.getCorePoolSize(); + } + + /** + * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are + * daemon threads, and thus, don't prevent the JVM from shutting down + */ + public static class ChoreServiceThreadFactory implements ThreadFactory { + private final static String THREAD_NAME_PREFIX = "ChoreServiceThread_"; + private long threadNumber = 0; + + @Override + public Thread newThread(Runnable r) { + threadNumber++; + Thread thread = new Thread(r, THREAD_NAME_PREFIX + threadNumber); + thread.setDaemon(true); + return thread; + } + } + + /** + * Represents a request to increase the number of core pool threads. Typically a request + * originates from the fact that the current core pool size is not sufficient to service all of + * the currently running Chores + * @return true when the request to increase the core pool size succeeds + */ + public synchronized boolean requestCorePoolIncrease() { + // There is no point in creating more threads than scheduledChores.size since scheduled runs + // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced + // amongst occurrences of the same chore). + if (scheduler.getCorePoolSize() < scheduledChores.size()) { + scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1); + printChoreServiceDetails("requestCorePoolIncrease"); + return true; + } + return false; + } + + /** + * Represents a request to decrease the number of core pool threads. Typically a request + * originates from the fact that the current core pool size is more than sufficient to service the + * running Chores. + */ + public synchronized void requestCorePoolDecrease() { + if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) { + scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1); + printChoreServiceDetails("requestCorePoolDecrease"); + } + } + + @Override + public void onChoreMadeStartTime(ScheduledChore chore) { + // TODO: This is a placeholder for future extension... As it currently stands, + // We care if a chore MISSES its start time. However, if a chore makes its start + // time then things are progressing as expected + } + + @Override + public synchronized void onChoreMissedStartTime(ScheduledChore chore) { + if (chore == null || !scheduledChores.containsKey(chore)) return; + + // If the chore has not caused an increase in the size of the core thread pool then request an + // increase. This allows each chore missing its start time to increase the core pool size by + // at most 1. + if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) { + choresMissingStartTime.put(chore, requestCorePoolIncrease()); + } + + // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If + // the chore is NOT rescheduled, future executions of this chore will be delayed more and + // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor schedules + // chores based on how delayed they are. + rescheduleChore(chore); + printChoreDetails("onChoreMissedStartTime", chore); + } + + // TODO: remove... used for debugging + private void printChoreDetails(final String header, ScheduledChore chore) { + LinkedHashMap output = new LinkedHashMap(); + output.put(header, ""); + output.put("Chore name: ", chore.getName()); + output.put("Chore period: ", Integer.toString(chore.getPeriod())); + output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns())); + + for (Entry entry : output.entrySet()) { + System.out.println(entry.getKey() + entry.getValue()); + } + + System.out.println(); + } + + // TODO: remove... used for debugging + private void printChoreServiceDetails(final String header) { + LinkedHashMap output = new LinkedHashMap(); + output.put(header, ""); + output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize())); + output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores())); + output.put("ChoreService missingStartTimeCount: ", + Integer.toString(getNumberOfChoresMissingStartTime())); + + for (Entry entry : output.entrySet()) { + System.out.println(entry.getKey() + entry.getValue()); + } + + System.out.println(); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java new file mode 100644 index 0000000..106b1c1 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java @@ -0,0 +1,192 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public abstract class ScheduledChore implements Runnable { + private final Log LOG = LogFactory.getLog(this.getClass()); + + private final String name; + private final int period; + private final TimeUnit timeUnit; + + /** + * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is + * not scheduled. + */ + private ChoreServicer choreServicer; + + /** + * Variables that encapsulate the meaningful state information + */ + private long timeOfLastRun = -1; + private long timeOfThisRun = -1; + private boolean initialChoreComplete = false; + + public interface ChoreServicer { + public void cancelChore(ScheduledChore chore); + + public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning); + + public boolean isChoreScheduled(ScheduledChore chore); + + public boolean triggerNow(ScheduledChore chore); + + public void onChoreMissedStartTime(ScheduledChore chore); + + public void onChoreMadeStartTime(ScheduledChore chore); + } + + public ScheduledChore(final String name, final int period) { + this(name, period, TimeUnit.MILLISECONDS); + } + + public ScheduledChore(final String name, final int period, final TimeUnit unit) { + this.name = name; + this.period = period; + this.timeUnit = unit; + } + + public void resetState() { + timeOfLastRun = -1; + timeOfThisRun = -1; + initialChoreComplete = false; + } + + @Override + public synchronized void run() { + timeOfLastRun = timeOfThisRun; + timeOfThisRun = System.currentTimeMillis(); + + if (missedStartTime()) { + choreServicer.onChoreMissedStartTime(this); + return; + } else { + choreServicer.onChoreMadeStartTime(this); + } + + try { + if (!initialChoreComplete) { + initialChoreComplete = initialChore(); + } else { + chore(); + } + } catch (Throwable t) { + LOG.fatal(getName() + "error", t); + cleanup(); + + if (choreServicer != null) { + choreServicer.cancelChore(this); + } + } + } + + public long getTimeBetweenRuns() { + return timeOfThisRun - timeOfLastRun; + } + private boolean missedStartTime() { + return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun) + && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns(); + } + + private double getMaximumAllowedTimeBetweenRuns() { + // Threshold used to determine if the Chore's current run started too late + return 1.5 * period; + } + + private boolean isValidTime(final long time) { + return time > 0 && time <= System.currentTimeMillis(); + } + + /** + * @return false when unsuccessful. This occurs when the Chore is not currently scheduled with a + * ChoreService + */ + public boolean triggerNow() { + if (choreServicer != null) { + return choreServicer.triggerNow(this); + } else { + return false; + } + } + + public void setChoreServicer(ChoreServicer service) { + // Chores should only ever be scheduled with a single ChoreService. If the choreServicer + // is changing, cancel any existing schedules of this chore. + if (choreServicer != null && choreServicer != service) { + choreServicer.cancelChore(this, false); + } + choreServicer = service; + } + + public void cancel() { + cancel(false); + } + + public void cancel(boolean mayInterruptIfRunning) { + if (choreServicer != null) choreServicer.cancelChore(this, mayInterruptIfRunning); + } + + public String getName() { + return name; + } + + public int getPeriod() { + return period; + } + + public final TimeUnit getTimeUnit() { + return timeUnit; + } + + public boolean isInitialChoreComplete() { + return initialChoreComplete; + } + + /** + * @return true when this Chore is scheduled with a ChoreService + */ + public boolean isScheduled() { + return choreServicer != null && choreServicer.isChoreScheduled(this); + } + + /** + * The task to execute on each scheduled execution of the Chore + */ + protected abstract void chore(); + + /** + * Override to run a task before we start looping. + * @return true if initial chore was successful + */ + protected boolean initialChore() { + // Default does nothing + return true; + } + + /** + * Override to run cleanup tasks when the Chore encounters an error. + */ + protected void cleanup() { + } +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java new file mode 100644 index 0000000..a8c079f --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java @@ -0,0 +1,739 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map.Entry; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.CountingChore; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.DoNothingChore; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.FailInitialChore; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SlowChore; +import org.junit.Test; + +public class TestChoreService { + private static class TestHelper { + public final static int DEFAULT_CHORE_PERIOD = 1000; + + public static Chore createChore(String name) { + return createChore(name, DEFAULT_CHORE_PERIOD); + } + + public static Chore createChore(final String name, int period_millis) { + Stoppable stopper = new Stoppable() { + boolean stopped = false; + + @Override + public void stop(String why) { + stopped = true; + + } + + @Override + public boolean isStopped() { + return stopped; + } + }; + + Chore chore = new Chore(name, 1000, stopper) { + + @Override + protected void chore() { + // DO NOTHING + System.out.println(this.getName() + ": tick"); + } + }; + + return chore; + } + + // TODO: separate out into its own class TestThreadPoolHelper or something along those + // lines + public final static int DEFAULT_POOL_SIZE = 2; + public final static int DEFAULT_PERIOD = 1000; + public final static int DEFAULT_DELAY = 0; + public final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + + public static ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor() { + return new ScheduledThreadPoolExecutor(DEFAULT_POOL_SIZE); + } + + public static Runnable createRunnable(final String tickLabel) { + return new Runnable() { + + long lastRun = -1; + + @Override + public void run() { + String output = tickLabel + ": tick"; + if (lastRun != -1) { + output += "time since last run : " + (System.currentTimeMillis() - lastRun); + } + System.out.println(output); + + lastRun = System.currentTimeMillis(); + } + }; + } + + public static void scheduleAndSleep(ScheduledThreadPoolExecutor executor, + final String name) throws InterruptedException { + final int sleepPeriod = 1000; + final Runnable run = createRunnable(name); + scheduleWithDefaults(run, executor); + Thread.sleep(sleepPeriod); + } + + public static ScheduledFuture scheduleWithDefaults(Runnable run, + ScheduledThreadPoolExecutor executor) { + return executor.scheduleAtFixedRate(run, DEFAULT_DELAY, DEFAULT_PERIOD, DEFAULT_TIME_UNIT); + } + + public static ScheduledFuture scheduleJobAtFixedRate(ScheduledThreadPoolExecutor executor, + final String jobName) { + final Runnable run = createRunnable(jobName); + return scheduleWithDefaults(run, executor); + } + } + + @Test + public void testExistingChoreImplementationStartStop() { + final int initialThreadCount = Thread.activeCount(); + + Chore chore1 = TestHelper.createChore("chore1"); + Chore chore2 = TestHelper.createChore("chore2"); + Chore chore3 = TestHelper.createChore("chore3"); + + // The current chore implementation creates a thread for each chore to run in separately + chore1.start(); + chore2.start(); + chore3.start(); + + assertEquals("Thread count mismatch", initialThreadCount + 3, Thread.activeCount()); + + chore1.getStopper().stop(""); + chore2.getStopper().stop(""); + chore3.getStopper().stop(""); + + assertEquals("Thread count mismatch", initialThreadCount, Thread.activeCount()); + } + + @Test + public void testThreadCountScheduledThreadPoolExecutor() throws InterruptedException { + final int initialThreadCount = Thread.activeCount(); + + ScheduledThreadPoolExecutor executor = TestHelper.createScheduledThreadPoolExecutor(); + final int corePoolSize = executor.getCorePoolSize(); + + // Should never create more than corePoolSize threads to handle scheduling + TestHelper.scheduleAndSleep(executor, "run1"); + assertTrue(initialThreadCount + corePoolSize <= Thread.activeCount()); + TestHelper.scheduleAndSleep(executor, "run2"); + assertTrue(initialThreadCount + corePoolSize <= Thread.activeCount()); + TestHelper.scheduleAndSleep(executor, "run3"); + assertTrue(initialThreadCount + corePoolSize <= Thread.activeCount()); + TestHelper.scheduleAndSleep(executor, "run4"); + assertTrue(initialThreadCount + corePoolSize <= Thread.activeCount()); + TestHelper.scheduleAndSleep(executor, "run5"); + assertTrue(initialThreadCount + corePoolSize <= Thread.activeCount()); + TestHelper.scheduleAndSleep(executor, "run6"); + assertTrue(initialThreadCount + corePoolSize <= Thread.activeCount()); + + TestHelper.scheduleAndSleep(executor, "run7"); + assertTrue(initialThreadCount + corePoolSize <= Thread.activeCount()); + TestHelper.scheduleAndSleep(executor, "run8"); + assertTrue(initialThreadCount + corePoolSize <= Thread.activeCount()); + TestHelper.scheduleAndSleep(executor, "run9"); + assertTrue(initialThreadCount + corePoolSize <= Thread.activeCount()); + TestHelper.scheduleAndSleep(executor, "run10"); + assertTrue(initialThreadCount + corePoolSize <= Thread.activeCount()); + TestHelper.scheduleAndSleep(executor, "run11"); + assertTrue(initialThreadCount + corePoolSize <= Thread.activeCount()); + TestHelper.scheduleAndSleep(executor, "run12"); + assertTrue(initialThreadCount + corePoolSize <= Thread.activeCount()); + } + + @Test + public void testCancellingAnOngoingScheduledJob() throws InterruptedException { + ScheduledThreadPoolExecutor executor = TestHelper.createScheduledThreadPoolExecutor(); + + ScheduledFuture future1 = TestHelper.scheduleJobAtFixedRate(executor, "future1"); + ScheduledFuture future2 = TestHelper.scheduleJobAtFixedRate(executor, "future2"); + ScheduledFuture future3 = TestHelper.scheduleJobAtFixedRate(executor, "future3"); + + final int delayBetweenCancels = 2000; + + Thread.sleep(delayBetweenCancels); + future1.cancel(true); + assertTrue(future1.isCancelled() && future1.isDone()); + + Thread.sleep(delayBetweenCancels); + future2.cancel(true); + assertTrue(future2.isCancelled() && future2.isDone()); + + Thread.sleep(delayBetweenCancels); + future3.cancel(true); + assertTrue(future3.isCancelled() && future3.isDone()); + } + + public static class ScheduledChoreSamples { + /** + * Sleeps for longer than the scheduled period + */ + public static class SlowChore extends ScheduledChore { + public SlowChore(String name, int period) { + super(name, period); + } + + @Override + protected void chore() { + try { + Thread.sleep(getPeriod() * 2); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + } + + public static class DoNothingChore extends ScheduledChore { + public DoNothingChore(String name, int period) { + super(name, period); + } + + @Override + protected void chore() { + // DO NOTHING + } + + } + + public static class SleepingChore extends ScheduledChore { + private int sleepTime; + + public SleepingChore(String name, int chorePeriod, int sleepTime) { + super(name, chorePeriod); + this.sleepTime = sleepTime; + } + + @Override + protected void chore() { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + } + } + } + + public static class CountingChore extends ScheduledChore { + + private int countOfChoreCalls; + + public CountingChore(String name, int period) { + super(name, period); + } + + @Override + protected void chore() { + countOfChoreCalls++; + } + + public int getCountOfChoreCalls() { + return countOfChoreCalls; + } + } + + public static class FailInitialChore extends ScheduledChore { + private int numberOfFailures; + private int failureThreshold; + + /** + * @param failThreshold Number of times the Chore fails when trying to execute initialChore + * before succeeding + */ + public FailInitialChore(String name, int period, int failThreshold) { + super(name, period); + numberOfFailures = 0; + failureThreshold = failThreshold; + } + + @Override + protected boolean initialChore() { + if (numberOfFailures < failureThreshold) { + numberOfFailures++; + return false; + } else { + return true; + } + } + + @Override + protected void chore() { + assertTrue(numberOfFailures == failureThreshold); + cancel(false); + } + + } + } + + @Test + public void testInitialChorePrecedence() throws InterruptedException { + ChoreService service = ChoreService.getInstance(); + ScheduledChore chore = new FailInitialChore("chore", 100, 5); + service.scheduleChore(chore, 100); + + while (!chore.isInitialChoreComplete() && chore.isScheduled()) { + Thread.sleep(500); + } + } + + @Test + public void testCancelChore() throws InterruptedException { + final int period = 100; + ScheduledChore chore1 = new DoNothingChore("chore1", period); + ChoreService service = ChoreService.getInstance(); + service.scheduleChore(chore1); + + assertTrue(chore1.isScheduled()); + + chore1.cancel(true); + + assertFalse(chore1.isScheduled()); + assertTrue(service.getNumberOfScheduledChores() == 0); + } + + @Test + public void testConstruction() throws InterruptedException { + final String NAME = "chore"; + final int PERIOD = 100; + final TimeUnit UNIT = TimeUnit.NANOSECONDS; + + ScheduledChore chore = new ScheduledChore(NAME, PERIOD, UNIT) { + @Override + protected void chore() { + // DO NOTHING + } + }; + + assertEquals("Name construction failed", chore.getName(), NAME); + assertEquals("Period construction failed", chore.getPeriod(), PERIOD); + assertEquals("TimeUnit construction failed", chore.getTimeUnit(), UNIT); + } + + @Test + public void testFrequencyOfChores() throws InterruptedException { + final int period = 100; + // Small delta that acts as time buffer (allowing chores to complete if running slowly) + final int delta = 5; + ChoreService service = ChoreService.getInstance(); + CountingChore chore = new CountingChore("countingChore", period); + service.scheduleChore(chore); + + Thread.sleep(10 * period + delta); + assertTrue(chore.getCountOfChoreCalls() == 10); + + Thread.sleep(10 * period); + assertTrue(chore.getCountOfChoreCalls() == 20); + } + + @Test + public void testForceTrigger() throws InterruptedException { + final int period = 100; + final int delta = 5; + ChoreService service = ChoreService.getInstance(); + CountingChore chore = new CountingChore("countingChore", period); + service.scheduleChore(chore); + Thread.sleep(10 * period + delta); + + assertTrue(chore.getCountOfChoreCalls() == 10); + + // Force five runs of the chore to occur, sleeping between triggers to ensure the + // chore has time to run + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + + assertTrue(chore.getCountOfChoreCalls() == 15); + + Thread.sleep(10 * period + delta); + + assertTrue(chore.getCountOfChoreCalls() == 25); + } + + @Test + public void testTimeoutOfScheduledThreadPoolExecutor() throws InterruptedException { + final int CORE_POOL = 5; + final TimeUnit unit = TimeUnit.MILLISECONDS; + final int PERIOD = 1000; + final int DELAY = 0; + + final int initialThreads = Thread.activeCount(); + System.out.println("initialThreads: " + initialThreads); + + ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(CORE_POOL); + exec.setKeepAliveTime(1 * 100, TimeUnit.MILLISECONDS); + exec.allowCoreThreadTimeOut(true); + exec.setRemoveOnCancelPolicy(true); + Thread.sleep(100); + + System.out.println("active threads after creating executor: " + Thread.activeCount()); + + Runnable run = new Runnable() { + + @Override + public void run() { + // DO NOTHING + + } + + }; + + ScheduledFuture future1 = exec.scheduleAtFixedRate(run, DELAY, PERIOD, unit); + Thread.sleep(2000); + System.out.println("active threads after scheduling one runnable: " + Thread.activeCount() + + " queuesize: " + exec.getQueue().size()); + + ScheduledFuture future2 = exec.scheduleAtFixedRate(run, DELAY, PERIOD * 2, unit); + Thread.sleep(2000); + System.out.println("active threads after scheduling two runnable: " + Thread.activeCount() + + " queuesize: " + exec.getQueue().size()); + + ScheduledFuture future3 = exec.scheduleAtFixedRate(run, DELAY, PERIOD * 3, unit); + Thread.sleep(2000); + System.out.println("active threads after scheduling three runnable: " + Thread.activeCount() + + " queuesize: " + exec.getQueue().size()); + + ScheduledFuture future4 = exec.scheduleAtFixedRate(run, DELAY, PERIOD * 4, unit); + Thread.sleep(2000); + System.out.println("active threads after scheduling four runnable: " + Thread.activeCount() + + " queuesize: " + exec.getQueue().size()); + + ScheduledFuture future5 = exec.scheduleAtFixedRate(run, DELAY, PERIOD * 5, unit); + Thread.sleep(2000); + System.out.println("active threads after scheduling five runnable: " + Thread.activeCount() + + " queuesize: " + exec.getQueue().size()); + + ScheduledFuture future6 = exec.scheduleAtFixedRate(run, DELAY, PERIOD * 6, unit); + Thread.sleep(2000); + System.out.println("active threads after scheduling six runnable: " + Thread.activeCount() + + " queuesize: " + exec.getQueue().size()); + + future1.cancel(true); + Thread.sleep(2000); + System.out.println("active threads after removing one runnable: " + Thread.activeCount() + + " queuesize: " + exec.getQueue().size()); + + future2.cancel(true); + Thread.sleep(2000); + System.out.println("active threads after removing two runnable: " + Thread.activeCount() + + " queuesize: " + exec.getQueue().size()); + + future3.cancel(true); + Thread.sleep(2000); + System.out.println("active threads after removing three runnable: " + Thread.activeCount() + + " queuesize: " + exec.getQueue().size()); + + future4.cancel(true); + Thread.sleep(2000); + System.out.println("active threads after removing four runnable: " + Thread.activeCount() + + " queuesize: " + exec.getQueue().size()); + + future5.cancel(true); + Thread.sleep(2000); + System.out.println("active threads after removing five runnable: " + Thread.activeCount() + + " queuesize: " + exec.getQueue().size()); + + future6.cancel(true); + Thread.sleep(2000); + System.out.println("active threads after removing six runnable: " + Thread.activeCount() + + " queuesize: " + exec.getQueue().size()); + + Thread.sleep(2000); + System.out.println("active threads after tick...: " + Thread.activeCount() + " queuesize: " + + exec.getQueue().size()); + + Thread.sleep(2000); + System.out.println("active threads after tick...: " + Thread.activeCount() + " queuesize: " + + exec.getQueue().size()); + } + + @Test + public void testScheduledThreadPoolExecutorThreadControl() throws InterruptedException { + final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(10); + printSummaryStats("Initialization", exec); + + Runnable summaryRunnable = new Runnable() { + long numTicks = 0; + + @Override + public void run() { + numTicks++; + printSummaryStats("Runnable", exec); + } + }; + + Runnable doNothingRunnable = new Runnable() { + @Override + public void run() { + } + }; + + exec.scheduleAtFixedRate(doNothingRunnable, 100, 4950, TimeUnit.MILLISECONDS); + exec.scheduleAtFixedRate(summaryRunnable, 100, 5050, TimeUnit.MILLISECONDS); + + long idleCount = 0; + while (idleCount < 20) { + printSummaryStats("Main", exec); + Thread.sleep(15000); + idleCount++; + } + } + + public void printSummaryStats(final String header, final ScheduledThreadPoolExecutor exec) { + HashMap summary = new LinkedHashMap(); + + summary.put("Thread Group: Active Thread Count:", Integer.toString(Thread.activeCount())); + summary.put("Exec: Core Pool Size:", Integer.toString(exec.getCorePoolSize())); + summary.put("Exec: Queue Size", Integer.toString(exec.getQueue().size())); + + System.out.println(header); + + for (Entry entry : summary.entrySet()) { + System.out.println("\t" + entry.getKey() + " " + entry.getValue()); + } + + System.out.println(""); + } + + @Test + public void testCorePoolIncrease() throws InterruptedException { + final int initialCorePoolSize = 3; + ChoreService service = new ChoreService(initialCorePoolSize); + assertEquals("Should have a core pool of size: " + initialCorePoolSize, initialCorePoolSize, + service.getCorePoolSize()); + + final int slowChorePeriod = 100; + SlowChore slowChore1 = new SlowChore("slowChore1", slowChorePeriod); + SlowChore slowChore2 = new SlowChore("slowChore2", slowChorePeriod); + SlowChore slowChore3 = new SlowChore("slowChore3", slowChorePeriod); + + service.scheduleChore(slowChore1); + service.scheduleChore(slowChore2); + service.scheduleChore(slowChore3); + + Thread.sleep(slowChorePeriod * 10); + assertEquals("Should not create more pools than scheduled chores", 3, service.getCorePoolSize()); + + SlowChore slowChore4 = new SlowChore("slowChore4", slowChorePeriod); + service.scheduleChore(slowChore4); + + Thread.sleep(slowChorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", 4, + service.getCorePoolSize()); + + SlowChore slowChore5 = new SlowChore("slowChore5", slowChorePeriod); + service.scheduleChore(slowChore5); + + Thread.sleep(slowChorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", 5, + service.getCorePoolSize()); + } + + @Test + public void testCorePoolDecrease() throws InterruptedException { + final int initialCorePoolSize = 3; + ChoreService service = new ChoreService(initialCorePoolSize); + assertEquals("Should have a core pool of size: " + initialCorePoolSize, initialCorePoolSize, + service.getCorePoolSize()); + + final int chorePeriod = 10; + SlowChore slowChore1 = new SlowChore("slowChore1", chorePeriod); + SlowChore slowChore2 = new SlowChore("slowChore2", chorePeriod); + SlowChore slowChore3 = new SlowChore("slowChore3", chorePeriod); + + service.scheduleChore(slowChore1); + service.scheduleChore(slowChore2); + service.scheduleChore(slowChore3); + + Thread.sleep(chorePeriod * 10); + assertEquals("Should not create more pools than scheduled chores", 3, service.getCorePoolSize()); + + SlowChore slowChore4 = new SlowChore("slowChore4", chorePeriod); + service.scheduleChore(slowChore4); + + Thread.sleep(chorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", 4, + service.getCorePoolSize()); + + SlowChore slowChore5 = new SlowChore("slowChore5", chorePeriod); + service.scheduleChore(slowChore5); + + Thread.sleep(chorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", 5, + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 5); + + slowChore5.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals("Should have shrunk", 4, service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 4); + + slowChore4.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals("Should have shrunk", 3, service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 3); + + slowChore3.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals("Should have shrunk", 2, service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + + slowChore2.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals("Cannot go below min", 2, service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 1); + + slowChore1.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals("Cannot go below min", 2, service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 0); + + slowChore1.resetState(); + service.scheduleChore(slowChore1); + Thread.sleep(chorePeriod * 10); + assertEquals("Should not change", 2, service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 1); + + slowChore2.resetState(); + service.scheduleChore(slowChore2); + Thread.sleep(chorePeriod * 10); + assertEquals("Should not change", 2, service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + + DoNothingChore fastChore1 = new DoNothingChore("fastChore1", chorePeriod); + service.scheduleChore(fastChore1); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should increase", 3, service.getCorePoolSize()); + + DoNothingChore fastChore2 = new DoNothingChore("fastChore2", chorePeriod); + service.scheduleChore(fastChore2); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should increase", 4, service.getCorePoolSize()); + + DoNothingChore fastChore3 = new DoNothingChore("fastChore3", chorePeriod); + service.scheduleChore(fastChore3); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should not change", 4, service.getCorePoolSize()); + + DoNothingChore fastChore4 = new DoNothingChore("fastChore4", chorePeriod); + service.scheduleChore(fastChore4); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should not change", 4, service.getCorePoolSize()); + } + + @Test + public void testScratch() throws InterruptedException { + ChoreService service = ChoreService.getInstance(); + ScheduledChore chore1 = new ScheduledChore("1", 100) { + + @Override + protected void chore() { + try { + Thread.sleep(400); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + ScheduledChore chore2 = new ScheduledChore("2", 100) { + + @Override + protected void chore() { + try { + Thread.sleep(300); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + ScheduledChore chore3 = new ScheduledChore("3", 100) { + + @Override + protected void chore() { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + ScheduledChore chore4 = new ScheduledChore("4", 100) { + + @Override + protected void chore() { + // TODO Auto-generated method stub + try { + Thread.sleep(10); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + }; + service.scheduleChore(chore1); + + long idleCount = 0; + while (idleCount < 10) { + Thread.sleep(1500); + if (idleCount == 1) { + service.scheduleChore(chore2); + System.out.println("\n***scheduled #2***\n"); + } + if (idleCount == 2) { + service.scheduleChore(chore3); + System.out.println("\n***scheduled #3***\n"); + } + if (idleCount == 3) { + service.scheduleChore(chore4); + System.out.println("\n***scheduled #4***\n"); + } + idleCount++; + } + } +} -- 1.9.3 (Apple Git-50)