diff --git pom.xml pom.xml index 28ad152545..b054cbbe6b 100644 --- pom.xml +++ pom.xml @@ -179,6 +179,7 @@ 3.5.2 1.8 4.11 + 0.4.3 3.0.3 0.9.3 0.9.3 diff --git ql/pom.xml ql/pom.xml index 0c181e515c..5d039c093d 100644 --- ql/pom.xml +++ ql/pom.xml @@ -476,6 +476,12 @@ ${junit.version} test + + net.jodah + concurrentunit + ${concurrentunit.version} + test + org.mockito mockito-all diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 27f0216a1e..262bbb9a0b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -607,10 +607,8 @@ public static void endStart(SessionState startSs) private static void start(SessionState startSs, boolean isAsync, LogHelper console) { setCurrentSessionState(startSs); - synchronized(SessionState.class) { - if (!startSs.isStarted.compareAndSet(false, true)) { - return; - } + if (!startSs.isStarted.compareAndSet(false, true)) { + return; } if (startSs.hiveHist == null){ diff --git ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java index 0fa1c81f1a..593a713f46 100644 --- ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java +++ ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java @@ -27,17 +27,17 @@ import java.lang.reflect.Method; import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.metastore.Warehouse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hive.common.util.HiveTestUtils; import org.junit.After; import org.junit.Assert; @@ -46,9 +46,13 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.io.Files; +import net.jodah.concurrentunit.Waiter; + /** * Test SessionState */ @@ -310,4 +314,48 @@ public void testCreatePath() throws Exception { assertTrue(e.getMessage().contains("Failed to create directory noPermissions/child")); } } + + /** + * Unit test for SessionState.start concurrency + */ + @Test + public void testConcurrentStart() throws Exception { + int TEST_THREAD_COUNT = 1000; + + class TestAtomic { + private AtomicBoolean isStarted = new AtomicBoolean(false); + public volatile int count = 0; + + public void start() { + if (!isStarted.compareAndSet(false, true)) { + return; + } + System.out.println("reached atomic initalization..."); + count += 1; + } + } + ; + + TestAtomic t = new TestAtomic(); + final Waiter waiter = new Waiter(); + final CountDownLatch latch = new CountDownLatch(1); + + for (int i = 0; i < TEST_THREAD_COUNT; ++i) { + Runnable runner = new Runnable() { + public void run() { + try { + latch.await(); + } catch (InterruptedException e) { + // don't care + } + t.start(); + waiter.resume(); + } + }; + new Thread(runner, "TestThread" + i).start(); + } + latch.countDown(); // release the latch + waiter.await(5000, TEST_THREAD_COUNT); + Assert.assertEquals(1, t.count); + } }