From 3045ae41e6ac99634edd07b578ab3b8358c1796b Mon Sep 17 00:00:00 2001
From: Piotr Pachalko <piotrpachalko@gmail.com>
Date: Fri, 29 Aug 2014 13:13:03 +0200
Subject: [PATCH] KAFKA-1596; Do not throw exception when trying to shutdown
 already downed scheduler

---
 core/src/main/scala/kafka/utils/KafkaScheduler.scala     |    9 +++++----
 core/src/test/scala/unit/kafka/utils/SchedulerTest.scala |   12 +++++++++++-
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 9a16343..7d1d875 100644
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -83,10 +83,11 @@ class KafkaScheduler(val threads: Int,
   
   override def shutdown() {
     debug("Shutting down task scheduler.")
-    ensureStarted
-    executor.shutdown()
-    executor.awaitTermination(1, TimeUnit.DAYS)
-    this.executor = null
+    if(executor != null) {
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.DAYS)
+      executor = null
+    }
   }
 
   def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index b364ac2..34cbc17 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -90,4 +90,14 @@ class SchedulerTest {
       assertTrue("Should count to 20", counter1.get >= 20)
     }
   }
-}
\ No newline at end of file
+
+  @Test
+  def testStayQuietOnShutdown() {
+    scheduler.shutdown();
+    try {
+      scheduler.shutdown()
+    } catch {
+      case e: Exception => fail("consecutive shutdown() should not throw")
+    }
+  }
+}
-- 
1.7.10.4

