diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java index a39a9c8e04..d165bb2e6c 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java @@ -47,15 +47,12 @@ private static boolean isServerStarted = false; private static int port; private static MiniDFSCluster miniDFS; - // How long should we wait for the housekeeping threads to start in ms. - private static final long SLEEP_INTERVAL_FOR_THREADS_TO_START = 10000; - // Threads using ThreadPool will start after the configured interval. So, start them some time - // before we check the existence of threads. - private static final long REMOTE_TASKS_INTERVAL = SLEEP_INTERVAL_FOR_THREADS_TO_START - 3000; + // Threads using ThreadPool will start after the configured interval. Start them right away + private static final long REMOTE_TASKS_INTERVAL = 1; static final String METASTORE_THREAD_TASK_FREQ_CONF = "metastore.leader.test.task.freq"; static Map threadNames = new HashMap<>(); - static Map threadClasses = new HashMap<>(); + static Map, Boolean> threadClasses = new HashMap<>(); void internalSetup(final String leaderHostName) throws Exception { MetaStoreTestUtils.setConfForStandloneMode(conf); @@ -70,9 +67,9 @@ void internalSetup(final String leaderHostName) throws Exception { Assert.assertNotNull("Unable to connect to the MetaStore server", client); return; } - + // Start the metastore and wait for the background threads port = MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), - conf, true); + conf, false, false, true, true); System.out.println("Starting MetaStore Server on port " + port); isServerStarted = true; @@ -136,7 +133,7 @@ private long addRemoteOnlyTasksConfigs() { return 2; } - private long addAlwaysTasksConfigs() throws Exception { + private long addAlwaysTasksConfigs() { String alwaysTaskClassPaths = MetastoreTaskThreadAlwaysTestImpl.class.getCanonicalName(); MetastoreConf.setVar(conf, ConfVars.TASK_THREADS_ALWAYS, alwaysTaskClassPaths); threadNames.put(MetastoreTaskThreadAlwaysTestImpl.TASK_NAME, false); @@ -156,10 +153,7 @@ private static String getAllThreadsAsString() { } void searchHousekeepingThreads() throws Exception { - // Client has been created so the metastore has started serving. Sleep for few seconds for - // the housekeeping threads to start. - Thread.sleep(SLEEP_INTERVAL_FOR_THREADS_TO_START); - + // Client has been created so the metastore has started serving and started the background threads LOG.info(getAllThreadsAsString()); // Check if all the housekeeping threads have been started. diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreTaskThreadAlwaysTestImpl.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreTaskThreadAlwaysTestImpl.java index 4cd2c58896..28d9f48f9d 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreTaskThreadAlwaysTestImpl.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreTaskThreadAlwaysTestImpl.java @@ -52,7 +52,7 @@ public void run() { LOG.info("Name of thread " + Thread.currentThread().getName() + " changed to " + TASK_NAME); Thread.currentThread().setName(TASK_NAME); try { - Thread.sleep(runFrequency(TimeUnit.MILLISECONDS)); + Thread.sleep(10000); } catch (InterruptedException ie) { LOG.error("Task " + TASK_NAME + " interrupted: " + ie.getMessage(), ie); } diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/RemoteMetastoreTaskThreadTestImpl1.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/RemoteMetastoreTaskThreadTestImpl1.java index c590b6aad5..f7844a4681 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/RemoteMetastoreTaskThreadTestImpl1.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/RemoteMetastoreTaskThreadTestImpl1.java @@ -52,7 +52,7 @@ public void run() { LOG.info("Name of thread " + Thread.currentThread().getName() + " changed to " + TASK_NAME); Thread.currentThread().setName(TASK_NAME); try { - Thread.sleep(runFrequency(TimeUnit.MILLISECONDS)); + Thread.sleep(10000); } catch (InterruptedException ie) { LOG.error("Task " + TASK_NAME + " interrupted: " + ie.getMessage(), ie); } diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/RemoteMetastoreTaskThreadTestImpl2.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/RemoteMetastoreTaskThreadTestImpl2.java index 5b50f66c51..9612d2f4b7 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/RemoteMetastoreTaskThreadTestImpl2.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/RemoteMetastoreTaskThreadTestImpl2.java @@ -52,7 +52,7 @@ public void run() { LOG.info("Name of thread " + Thread.currentThread().getName() + " changed to " + TASK_NAME); Thread.currentThread().setName(TASK_NAME); try { - Thread.sleep(runFrequency(TimeUnit.MILLISECONDS)); + Thread.sleep(10000); } catch (InterruptedException ie) { LOG.error("Task " + TASK_NAME + " interrupted: " + ie.getMessage(), ie); } diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java index 03a8161ea4..320de8dfe9 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java @@ -20,7 +20,6 @@ import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,8 +37,6 @@ public void setUp() throws Exception { internalSetup("localhost"); } - @Ignore("HIVE-23221 Ignore flaky test testHouseKeepingThreadExistence in TestMetastoreHousekeepingLeaderEmptyConfig" + - " and TestMetastoreHousekeepingLeader") @Test public void testHouseKeepingThreadExistence() throws Exception { searchHousekeepingThreads(); @@ -52,7 +49,7 @@ public void testHouseKeepingThreadExistence() throws Exception { Assert.assertTrue("No thread with name " + entry.getKey() + " found.", entry.getValue()); } - for (Map.Entry entry : threadClasses.entrySet()) { + for (Map.Entry, Boolean> entry : threadClasses.entrySet()) { if (entry.getValue()) { LOG.info("Found thread for " + entry.getKey().getSimpleName()); } diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java index 75ea637503..382d19b0ba 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java @@ -20,7 +20,6 @@ import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,8 +39,6 @@ public void setUp() throws Exception { internalSetup(""); } - @Ignore("HIVE-23221 Ignore flaky test testHouseKeepingThreadExistence in TestMetastoreHousekeepingLeaderEmptyConfig" + - " and TestMetastoreHousekeepingLeader") @Test public void testHouseKeepingThreadExistence() throws Exception { searchHousekeepingThreads(); @@ -54,7 +51,7 @@ public void testHouseKeepingThreadExistence() throws Exception { Assert.assertTrue("No thread with name " + entry.getKey() + " found.", entry.getValue()); } - for (Map.Entry entry : threadClasses.entrySet()) { + for (Map.Entry, Boolean> entry : threadClasses.entrySet()) { if (entry.getValue()) { LOG.info("Found thread for " + entry.getKey().getSimpleName()); } diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java index 0341d3c03b..025f08b0c1 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java @@ -52,7 +52,7 @@ public void testHouseKeepingThreadExistence() throws Exception { Assert.assertFalse("Thread with name " + entry.getKey() + " found.", entry.getValue()); } - for (Map.Entry entry : threadClasses.entrySet()) { + for (Map.Entry, Boolean> entry : threadClasses.entrySet()) { // A non-leader HMS will still run the configured number of Compaction worker threads. if (entry.getKey() == Worker.class) { if (entry.getValue()) { diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 7bba8d6ee6..8a2e926773 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -10083,7 +10083,7 @@ public static void main(String[] args) throws Throwable { Condition startCondition = startLock.newCondition(); AtomicBoolean startedServing = new AtomicBoolean(); startMetaStore(cli.getPort(), HadoopThriftAuthBridge.getBridge(), conf, startLock, - startCondition, startedServing); + startCondition, startedServing, null); } catch (Throwable t) { // Catch the exception, log it and rethrow it. HMSHandler.LOG @@ -10101,7 +10101,7 @@ public static void main(String[] args) throws Throwable { */ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge) throws Throwable { - startMetaStore(port, bridge, MetastoreConf.newMetastoreConf(), null, null, null); + startMetaStore(port, bridge, MetastoreConf.newMetastoreConf(), null, null, null, null); } /** @@ -10113,7 +10113,7 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge) */ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, Configuration conf) throws Throwable { - startMetaStore(port, bridge, conf, null, null, null); + startMetaStore(port, bridge, conf, null, null, null, null); } /** @@ -10127,7 +10127,7 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, */ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, Configuration conf, Lock startLock, Condition startCondition, - AtomicBoolean startedServing) throws Throwable { + AtomicBoolean startedServing, AtomicBoolean startedBackGroundThreads) throws Throwable { isMetaStoreRemote = true; // Server will create new threads up to max as necessary. After an idle // period, it will destroy threads to keep the number of threads in the @@ -10262,7 +10262,7 @@ public void processContext(ServerContext serverContext, TTransport tTransport, T if (startLock != null) { startMetaStoreThreads(conf, startLock, startCondition, startedServing, - isMetastoreHousekeepingLeader(conf, getServerHostName())); + isMetastoreHousekeepingLeader(conf, getServerHostName()), startedBackGroundThreads); signalOtherThreadsToStart(tServer, startLock, startCondition, startedServing); } @@ -10408,8 +10408,8 @@ public void run() { * started only in a leader HMS. */ private static void startMetaStoreThreads(final Configuration conf, final Lock startLock, - final Condition startCondition, final - AtomicBoolean startedServing, boolean isLeader) { + final Condition startCondition, final AtomicBoolean startedServing, boolean isLeader, + final AtomicBoolean startedBackGroundThreads) { // A thread is spun up to start these other threads. That's because we can't start them // until after the TServer has started, but once TServer.serve is called we aren't given back // control. @@ -10463,6 +10463,9 @@ public void run() { if (isLeader) { ReplChangeManager.scheduleCMClearer(conf); } + if (startedBackGroundThreads != null) { + startedBackGroundThreads.set(true); + } } }; t.setDaemon(true); diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java index 2702e69f86..f09d3e6474 100644 --- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java @@ -69,26 +69,27 @@ * @throws Exception */ public static void startMetaStore(final int port, - final HadoopThriftAuthBridge bridge, Configuration conf, boolean withHouseKeepingThreads) + final HadoopThriftAuthBridge bridge, Configuration conf, boolean withHouseKeepingThreads, boolean waitForHouseKeepers) throws Exception{ if (conf == null) { conf = MetastoreConf.newMetastoreConf(); } final Configuration finalConf = conf; + final Lock startLock = withHouseKeepingThreads ? new ReentrantLock() : null; + final AtomicBoolean startedBackGroundThreads = (withHouseKeepingThreads && waitForHouseKeepers) ? new AtomicBoolean() : null; Thread thread = new Thread(new Runnable() { @Override public void run() { try { - Lock startLock = null; + Condition startCondition = null; AtomicBoolean startedServing = null; if (withHouseKeepingThreads) { - startLock = new ReentrantLock(); startCondition = startLock.newCondition(); startedServing = new AtomicBoolean(); } HiveMetaStore.startMetaStore(port, bridge, finalConf, startLock, startCondition, - startedServing); + startedServing, startedBackGroundThreads); } catch (Throwable e) { LOG.error("Metastore Thrift Server threw an exception...", e); } @@ -98,7 +99,7 @@ public void run() { thread.start(); map.put(port,thread); String msHost = MetastoreConf.getVar(conf, ConfVars.THRIFT_BIND_HOST); - MetaStoreTestUtils.loopUntilHMSReady(msHost, port); + MetaStoreTestUtils.loopUntilHMSReady(msHost, port, startedBackGroundThreads); String serviceDiscMode = MetastoreConf.getVar(conf, ConfVars.THRIFT_SERVICE_DISCOVERY_MODE); if (serviceDiscMode != null && serviceDiscMode.equalsIgnoreCase("zookeeper")) { MetaStoreTestUtils.loopUntilZKReady(conf, msHost, port); @@ -124,7 +125,7 @@ public static int startMetaStoreWithRetry(Configuration conf, boolean keepJdbcUr boolean keepWarehousePath) throws Exception { return MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), conf, - keepJdbcUri, keepWarehousePath, false); + keepJdbcUri, keepWarehousePath, false, false); } public static int startMetaStoreWithRetry() throws Exception { @@ -134,13 +135,13 @@ public static int startMetaStoreWithRetry() throws Exception { public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge, Configuration conf) throws Exception { - return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, conf, false, false, false); + return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, conf, false, false, false, false); } public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge, Configuration conf, boolean withHouseKeepingThreads) throws Exception { - return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, conf, false, false, withHouseKeepingThreads); + return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, conf, false, false, withHouseKeepingThreads, false); } /** @@ -159,7 +160,7 @@ public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge, public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge, Configuration conf, boolean keepJdbcUri, boolean keepWarehousePath, - boolean withHouseKeepingThreads) throws Exception { + boolean withHouseKeepingThreads, boolean waitForHouseKeepers) throws Exception { Exception metaStoreException = null; String warehouseDir = MetastoreConf.getVar(conf, ConfVars.WAREHOUSE); @@ -187,7 +188,7 @@ public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge, MetastoreConf.setVar(conf, ConfVars.THRIFT_URIS, "thrift://localhost:" + metaStorePort); } - MetaStoreTestUtils.startMetaStore(metaStorePort, bridge, conf, withHouseKeepingThreads); + MetaStoreTestUtils.startMetaStore(metaStorePort, bridge, conf, withHouseKeepingThreads, waitForHouseKeepers); // Creating warehouse dir, if not exists Warehouse wh = new Warehouse(conf); @@ -213,10 +214,10 @@ public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge, * A simple connect test to make sure that the metastore is up * @throws Exception */ - private static void loopUntilHMSReady(String msHost, int port) throws Exception { + private static void loopUntilHMSReady(String msHost, int port, AtomicBoolean houseKeepingStarted) throws Exception { int retries = 0; Exception exc = null; - while (true) { + while (retries++ < 60) { try { Socket socket = new Socket(); SocketAddress sockAddr; @@ -227,14 +228,18 @@ private static void loopUntilHMSReady(String msHost, int port) throws Exception } socket.connect(sockAddr, 5000); socket.close(); - return; - } catch (Exception e) { - if (retries++ > 60) { //give up - exc = e; - break; + if (houseKeepingStarted == null || houseKeepingStarted.get()) { + return; + } else { + LOG.info("Waiting for housekeeper threads to start"); } - Thread.sleep(1000); + } catch (Exception e) { + exc = e; } + Thread.sleep(1000); + } + if (exc == null) { + exc = new IllegalStateException("Housekeeping threads were not started"); } // something is preventing metastore from starting // print the stack from all threads for debugging purposes