diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 303e306..c128010 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -43,6 +43,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** */ @@ -229,9 +230,9 @@ public void testStatsAfterCompactionPartTbl() throws Exception { Worker t = new Worker(); t.setThreadId((int) t.getId()); t.setHiveConf(conf); - MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer(); - MetaStoreThread.BooleanPointer looped = new MetaStoreThread.BooleanPointer(); - stop.boolVal = true; + AtomicBoolean stop = new AtomicBoolean(); + AtomicBoolean looped = new AtomicBoolean(); + stop.set(true); t.init(stop, looped); t.run(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index a47619c..25b61a7 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -43,6 +43,7 @@ import java.util.Set; import java.util.Timer; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -5720,7 +5721,7 @@ public void run() { Lock startLock = new ReentrantLock(); Condition startCondition = startLock.newCondition(); - MetaStoreThread.BooleanPointer startedServing = new MetaStoreThread.BooleanPointer(); + AtomicBoolean startedServing = new AtomicBoolean(); startMetaStoreThreads(conf, startLock, startCondition, startedServing); startMetaStore(cli.port, ShimLoader.getHadoopThriftAuthBridge(), conf, startLock, startCondition, startedServing); @@ -5767,7 +5768,7 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, */ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, HiveConf conf, Lock startLock, Condition startCondition, - MetaStoreThread.BooleanPointer startedServing) throws Throwable { + AtomicBoolean startedServing) throws Throwable { try { isMetaStoreRemote = true; // Server will create new threads up to max as necessary. After an idle @@ -5851,7 +5852,7 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, private static void signalOtherThreadsToStart(final TServer server, final Lock startLock, final Condition startCondition, - final MetaStoreThread.BooleanPointer startedServing) { + final AtomicBoolean startedServing) { // A simple thread to wait until the server has started and then signal the other threads to // begin Thread t = new Thread() { @@ -5866,7 +5867,7 @@ public void run() { } while (!server.isServing()); startLock.lock(); try { - startedServing.boolVal = true; + startedServing.set(true); startCondition.signalAll(); } finally { startLock.unlock(); @@ -5882,7 +5883,7 @@ public void run() { */ private static void startMetaStoreThreads(final HiveConf conf, final Lock startLock, final Condition startCondition, final - MetaStoreThread.BooleanPointer startedServing) { + AtomicBoolean startedServing) { // A thread is spun up to start these other threads. That's because we can't start them // until after the TServer has started, but once TServer.serve is called we aren't given back // control. @@ -5900,7 +5901,7 @@ public void run() { try { // Per the javadocs on Condition, do not depend on the condition alone as a start gate // since spurious wake ups are possible. - while (!startedServing.boolVal) startCondition.await(); + while (!startedServing.get()) startCondition.await(); startCompactorInitiator(conf); startCompactorWorkers(conf); startCompactorCleaner(conf); @@ -5960,7 +5961,7 @@ private static void initializeAndStartThread(MetaStoreThread thread, HiveConf co LOG.info("Starting metastore thread of type " + thread.getClass().getName()); thread.setHiveConf(conf); thread.setThreadId(nextThreadId++); - thread.init(new MetaStoreThread.BooleanPointer(), new MetaStoreThread.BooleanPointer()); + thread.init(new AtomicBoolean(), new AtomicBoolean()); thread.start(); } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java index ff265e5..a0c8d3b 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java @@ -20,6 +20,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.MetaException; +import java.util.concurrent.atomic.AtomicBoolean; + /** * A thread that runs in the metastore, separate from the threads in the thrift service. */ @@ -49,21 +51,12 @@ * thread should then assure that the loop has been gone completely through at * least once. */ - void init(BooleanPointer stop, BooleanPointer looped) throws MetaException; + void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException; /** * Run the thread in the background. This must not be called until - * {@link #init(org.apache.hadoop.hive.metastore.MetaStoreThread.BooleanPointer)} has + * {@link ##init(java.util.concurrent.atomic.AtomicBoolean, java.util.concurrent.atomic.AtomicBoolean)} has * been called. */ void start(); - - class BooleanPointer { - public boolean boolVal; - - public BooleanPointer() { - boolVal = false; - } - } - } diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 446b174..e1f1f49 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static junit.framework.Assert.*; @@ -1104,7 +1105,7 @@ public void deadlockDetected() throws Exception { conn.commit(); txnHandler.closeDbConn(conn); - final MetaStoreThread.BooleanPointer sawDeadlock = new MetaStoreThread.BooleanPointer(); + final AtomicBoolean sawDeadlock = new AtomicBoolean(); final Connection conn1 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE); final Connection conn2 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE); @@ -1131,7 +1132,7 @@ public void run() { LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + "exception is " + e.getClass().getName() + " msg is <" + e .getMessage() + ">"); - sawDeadlock.boolVal = true; + sawDeadlock.set(true); } } conn1.rollback(); @@ -1161,7 +1162,7 @@ public void run() { LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + "exception is " + e.getClass().getName() + " msg is <" + e .getMessage() + ">"); - sawDeadlock.boolVal = true; + sawDeadlock.set(true); } } conn2.rollback(); @@ -1175,9 +1176,9 @@ public void run() { t2.start(); t1.join(); t2.join(); - if (sawDeadlock.boolVal) break; + if (sawDeadlock.get()) break; } - assertTrue(sawDeadlock.boolVal); + assertTrue(sawDeadlock.get()); } finally { conn1.rollback(); txnHandler.closeDbConn(conn1); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 146ebda..1aed097 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -69,7 +69,7 @@ public void run() { // This is solely for testing. It checks if the test has set the looped value to false, // and if so remembers that and then sets it to true at the end. We have to check here // first to make sure we go through a complete iteration of the loop before resetting it. - boolean setLooped = !looped.boolVal; + boolean setLooped = !looped.get(); // Make sure nothing escapes this run method and kills the metastore at large, // so wrap it in a big catch Throwable statement. try { @@ -137,16 +137,16 @@ public void run() { // Now, go back to bed until it's time to do this again long elapsedTime = System.currentTimeMillis() - startedAt; - if (elapsedTime >= cleanerCheckInterval || stop.boolVal) continue; + if (elapsedTime >= cleanerCheckInterval || stop.get()) continue; else Thread.sleep(cleanerCheckInterval - elapsedTime); } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor cleaner, " + StringUtils.stringifyException(t)); } if (setLooped) { - looped.boolVal = true; + looped.set(true); } - } while (!stop.boolVal); + } while (!stop.get()); } private Set findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) { diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index 2bba731..7d097fd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -40,6 +40,7 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; /** * Superclass for all threads in the compactor. @@ -52,8 +53,8 @@ protected CompactionTxnHandler txnHandler; protected RawStore rs; protected int threadId; - protected BooleanPointer stop; - protected BooleanPointer looped; + protected AtomicBoolean stop; + protected AtomicBoolean looped; @Override public void setHiveConf(HiveConf conf) { @@ -67,7 +68,7 @@ public void setThreadId(int threadId) { } @Override - public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException { + public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException { this.stop = stop; this.looped = looped; setPriority(MIN_PRIORITY); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 5545bf7..996ee1a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -44,6 +44,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * A class to initiate compactions. This will run in a separate thread. @@ -126,10 +127,10 @@ public void run() { } long elapsedTime = System.currentTimeMillis() - startedAt; - if (elapsedTime >= checkInterval || stop.boolVal) continue; + if (elapsedTime >= checkInterval || stop.get()) continue; else Thread.sleep(checkInterval - elapsedTime); - } while (!stop.boolVal); + } while (!stop.get()); } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor initiator, exiting " + StringUtils.stringifyException(t)); @@ -137,7 +138,7 @@ public void run() { } @Override - public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException { + public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException { super.init(stop, looped); checkInterval = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS) ; diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index e2388e7..c4a4c39 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -42,6 +42,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** * A class to do compactions. This will run in a separate thread. It will spin on the @@ -77,7 +78,7 @@ public void run() { do { CompactionInfo ci = txnHandler.findNextToCompact(name); - if (ci == null && !stop.boolVal) { + if (ci == null && !stop.get()) { try { Thread.sleep(SLEEP_TIME); continue; @@ -160,7 +161,7 @@ public Object run() throws Exception { ". Marking clean to avoid repeated failures, " + StringUtils.stringifyException(e)); txnHandler.markCleaned(ci); } - } while (!stop.boolVal); + } while (!stop.get()); } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor worker " + name + ", exiting " + StringUtils.stringifyException(t)); @@ -168,7 +169,7 @@ public Object run() throws Exception { } @Override - public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException { + public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException { super.init(stop, looped); StringBuilder name = new StringBuilder(hostname()); diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index 9ebdfd3..d68e431 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -52,6 +52,7 @@ import java.util.Map; import java.util.Properties; import java.util.Stack; +import java.util.concurrent.atomic.AtomicBoolean; /** * Super class for all of the compactor test modules. @@ -65,7 +66,7 @@ protected long sleepTime = 1000; protected HiveConf conf; - private final MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer(); + private final AtomicBoolean stop = new AtomicBoolean(); private final File tmpdir; protected CompactorTest() throws Exception { @@ -92,7 +93,7 @@ protected void startCleaner() throws Exception { startThread('c', true); } - protected void startCleaner(MetaStoreThread.BooleanPointer looped) throws Exception { + protected void startCleaner(AtomicBoolean looped) throws Exception { startThread('c', false, looped); } @@ -190,7 +191,7 @@ protected void burnThroughTransactions(int num) throws MetaException, NoSuchTxnE } protected void stopThread() { - stop.boolVal = true; + stop.set(true); } private StorageDescriptor newStorageDescriptor(String location, List sortCols) { @@ -218,10 +219,10 @@ private StorageDescriptor newStorageDescriptor(String location, List sort // I can't do this with @Before because I want to be able to control when the thead starts private void startThread(char type, boolean stopAfterOne) throws Exception { - startThread(type, stopAfterOne, new MetaStoreThread.BooleanPointer()); + startThread(type, stopAfterOne, new AtomicBoolean()); } - private void startThread(char type, boolean stopAfterOne, MetaStoreThread.BooleanPointer looped) + private void startThread(char type, boolean stopAfterOne, AtomicBoolean looped) throws Exception { TxnDbUtil.setConfValues(conf); CompactorThread t = null; @@ -233,7 +234,7 @@ private void startThread(char type, boolean stopAfterOne, MetaStoreThread.Boolea } t.setThreadId((int) t.getId()); t.setHiveConf(conf); - stop.boolVal = stopAfterOne; + stop.set(stopAfterOne); t.init(stop, looped); if (stopAfterOne) t.run(); else t.start(); diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index b63ad66..7687851 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Tests for the compactor Cleaner thread @@ -285,12 +286,12 @@ public void notBlockedBySubsequentLock() throws Exception { LockRequest req = new LockRequest(components, "me", "localhost"); LockResponse res = txnHandler.lock(req); - MetaStoreThread.BooleanPointer looped = new MetaStoreThread.BooleanPointer(); - looped.boolVal = false; + AtomicBoolean looped = new AtomicBoolean(); + looped.set(false); startCleaner(looped); // Make sure the compactor has a chance to run once - while (!looped.boolVal) { + while (!looped.get()) { Thread.currentThread().sleep(100); } @@ -310,9 +311,9 @@ public void notBlockedBySubsequentLock() throws Exception { // Unlock the previous lock txnHandler.unlock(new UnlockRequest(res.getLockid())); - looped.boolVal = false; + looped.set(false); - while (!looped.boolVal) { + while (!looped.get()) { Thread.currentThread().sleep(100); } stopThread(); @@ -356,12 +357,12 @@ public void partitionNotBlockedBySubsequentLock() throws Exception { LockRequest req = new LockRequest(components, "me", "localhost"); LockResponse res = txnHandler.lock(req); - MetaStoreThread.BooleanPointer looped = new MetaStoreThread.BooleanPointer(); - looped.boolVal = false; + AtomicBoolean looped = new AtomicBoolean(); + looped.set(false); startCleaner(looped); // Make sure the compactor has a chance to run once - while (!looped.boolVal) { + while (!looped.get()) { Thread.currentThread().sleep(100); } @@ -383,9 +384,9 @@ public void partitionNotBlockedBySubsequentLock() throws Exception { // Unlock the previous lock txnHandler.unlock(new UnlockRequest(res.getLockid())); - looped.boolVal = false; + looped.set(false); - while (!looped.boolVal) { + while (!looped.get()) { Thread.currentThread().sleep(100); } stopThread();