From cf17c08f6dfa79627b86e37341ce7183658fb977 Mon Sep 17 00:00:00 2001 From: hmangla98 Date: Sun, 23 May 2021 06:18:03 +0530 Subject: [PATCH 1/7] HIVE-25154: Disable StatsUpdaterThread and PartitionManagementTask --- .../hive/ql/stats/StatsUpdaterThread.java | 48 ++++++++++++---- .../hive/ql/stats/TestStatsUpdaterThread.java | 57 +++++++++++++++++++ .../hadoop/hive/common/repl/ReplConst.java | 5 ++ .../hive/metastore/utils/MetaStoreUtils.java | 11 ++++ .../metastore/PartitionManagementTask.java | 37 ++++++++++-- .../metastore/TestPartitionManagement.java | 22 +++++++ 6 files changed, 164 insertions(+), 16 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java index 52dd40b68efe..df2b30494599 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.HashSet; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -50,12 +52,14 @@ import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.StatsUpdateMode; import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.DriverUtils; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -94,6 +98,8 @@ private BlockingQueue<AnalyzeWork> workQueue; private Thread[] workers; + private Set<String> dbsBeingFailedOver; + @Override public void setConf(Configuration conf) { StatsUpdateMode mode = StatsUpdateMode.valueOf( @@ -143,6 +149,7 @@ public void init(AtomicBoolean stop) throws MetaException { txnHandler = TxnUtils.getTxnStore(conf); rs = RawStoreProxy.getProxy(conf, conf, MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL), threadId); + dbsBeingFailedOver = new HashSet<>(); for (int i = 0; i < workers.length; ++i) { workers[i] = new Thread(new WorkerRunnable(conf, user)); workers[i].setDaemon(true); @@ -155,6 +162,7 @@ public void run() { LOG.info("Stats updater thread started"); startWorkers(); while (!stop.get()) { + dbsBeingFailedOver.clear(); boolean hadUpdates = runOneIteration(); try { Thread.sleep(hadUpdates ? 0 : noUpdatesWaitMs);
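+ // The failover set is rebuilt on every iteration, so a database whose repl.failover.enabled flag is cleared resumes stats collection on the next pass.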
@@ -187,9 +195,10 @@ public boolean runOneIteration() { } LOG.debug("Processing {}", fullTableNames); boolean hadUpdates = false; + Map<String, Boolean> dbsToSkip = new HashMap<>(); for (TableName fullTableName : fullTableNames) { try { - List<AnalyzeWork> commands = processOneTable(fullTableName); + List<AnalyzeWork> commands = processOneTable(fullTableName, dbsToSkip); hadUpdates = hadUpdates || commands != null; if (commands != null) { for (AnalyzeWork req : commands) { @@ -210,10 +219,25 @@ private void stopWorkers() { } } - private List<AnalyzeWork> processOneTable(TableName fullTableName) + private List<AnalyzeWork> processOneTable(TableName fullTableName, Map<String, Boolean> dbsToSkip) throws MetaException, NoSuchTxnException, NoSuchObjectException { if (isAnalyzeTableInProgress(fullTableName)) return null; String cat = fullTableName.getCat(), db = fullTableName.getDb(), tbl = fullTableName.getTable(); + String dbName = MetaStoreUtils.prependCatalogToDbName(cat,db, conf); + if (!dbsToSkip.containsKey(dbName)) { + Database database = rs.getDatabase(cat, db); + dbsToSkip.put(dbName, ReplUtils.isTargetOfReplication(database) + || MetaStoreUtils.isDbBeingFailedOver(database)); + } + // If the table is being replicated into, + // 1. the stats are also replicated from the source, so we don't need those to be calculated on the target again + // 2. updating stats requires a writeId to be created. Hence writeIds on source and target can get out of sync + // when stats are updated. That can cause consistency issues. + if (dbsToSkip.get(dbName)) { + LOG.debug("Skipping table {}", tbl); + return null; + } Table table = rs.getTable(cat, db, tbl); LOG.debug("Processing table {}", table); @@ -221,16 +245,6 @@ private void stopWorkers() { String skipParam = table.getParameters().get(SKIP_STATS_AUTOUPDATE_PROPERTY); if ("true".equalsIgnoreCase(skipParam)) return null; - // If the table is being replicated into, - // 1. the stats are also replicated from the source, so we don't need those to be calculated - // on the target again - // 2. updating stats requires a writeId to be created. Hence writeIds on source and target - // can get out of sync when stats are updated. That can cause consistency issues. - if (ReplUtils.isTargetOfReplication(rs.getDatabase(cat, db))) { - LOG.debug("Skipping table {} since it is being replicated into", table); - return null; - } - // Note: ideally we should take a lock here to pretend to be a real reader. // For now, this check is going to have race potential; it may run a spurious analyze.
String writeIdString = null; @@ -625,6 +639,16 @@ public boolean runOneWorkerIteration( } String cmd = null; try { + TableName tb = req.tableName; + String dbName = MetaStoreUtils.prependCatalogToDbName(tb.getCat(),tb.getDb(), conf); + if (dbsBeingFailedOver.contains(dbName) + || MetaStoreUtils.isDbBeingFailedOver(rs.getDatabase(tb.getCat(), tb.getDb()))) { + if (!dbsBeingFailedOver.contains(dbName)) { + dbsBeingFailedOver.add(dbName); + } + LOG.debug("Skipping table {}", tb.getTable()); + return true; + } cmd = req.buildCommand(); LOG.debug("Running {} based on {}", cmd, req); if (doWait) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java index 12dffb422f28..61206d8f58ae 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java @@ -608,6 +608,63 @@ private void testNoStatsUpdateForReplTable(String tblNamePrefix, String txnPrope msClient.close(); } + @Test(timeout=80000) + public void testNoStatsUpdateForSimpleFailoverDb() throws Exception { + testNoStatsUpdateForFailoverDb("simple", ""); + } + + @Test(timeout=80000) + public void testNoStatsUpdateForTxnFailoverDb() throws Exception { + testNoStatsUpdateForFailoverDb("txn", + "TBLPROPERTIES (\"transactional\"=\"true\",\"transactional_properties\"=\"insert_only\")"); + } + + private void testNoStatsUpdateForFailoverDb(String tblNamePrefix, String txnProperty) throws Exception { + // Set high worker count so we get a longer queue. + hiveConf.setInt(MetastoreConf.ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT.getVarname(), 4); + String tblWOStats = tblNamePrefix + "_repl_failover_nostats"; + String ptnTblWOStats = tblNamePrefix + "_ptn_repl_failover_nostats"; + String dbName = ss.getCurrentDatabase(); + StatsUpdaterThread su = createUpdater(); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); + + executeQuery("create table " + tblWOStats + "(i int, s string) " + txnProperty); + executeQuery("insert into " + tblWOStats + "(i, s) values (1, 'test')"); + verifyStatsUpToDate(tblWOStats, Lists.newArrayList("i"), msClient, false); + + executeQuery("create table " + ptnTblWOStats + "(s string) partitioned by (i int) " + txnProperty); + executeQuery("insert into " + ptnTblWOStats + "(i, s) values (1, 'test')"); + executeQuery("insert into " + ptnTblWOStats + "(i, s) values (2, 'test2')"); + executeQuery("insert into " + ptnTblWOStats + "(i, s) values (3, 'test3')"); + verifyPartStatsUpToDate(3, 1, msClient, ptnTblWOStats, false); + + assertTrue(su.runOneIteration()); + Assert.assertEquals(2, su.getQueueLength()); + executeQuery("alter database " + dbName + " set dbproperties('" + ReplConst.REPL_FAILOVER_ENABLED + "'='true')"); + //StatsUpdaterThread would not run analyze commands for the tables which were inserted before + //failover property was enabled for that database + drainWorkQueue(su, 2); + verifyStatsUpToDate(tblWOStats, Lists.newArrayList("i"), msClient, false); + verifyPartStatsUpToDate(3, 1, msClient, ptnTblWOStats, false); + Assert.assertEquals(0, su.getQueueLength()); + + executeQuery("create table new_table(s string) partitioned by (i int) " + txnProperty); + executeQuery("insert into new_table(i, s) values (4, 'test4')"); + + assertFalse(su.runOneIteration()); + 
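+ // With failover still enabled, the iteration queues no analyze work, even for the newly created table.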
Assert.assertEquals(0, su.getQueueLength()); + verifyStatsUpToDate(tblWOStats, Lists.newArrayList("i"), msClient, false); + verifyPartStatsUpToDate(3, 1, msClient, ptnTblWOStats, false); + + executeQuery("alter database " + dbName + " set dbproperties('" + ReplConst.REPL_FAILOVER_ENABLED + "'='')"); + executeQuery("drop table " + tblWOStats); + executeQuery("drop table " + ptnTblWOStats); + executeQuery("drop table new_table"); + msClient.close(); + } + private void verifyPartStatsUpToDate(int partCount, int skip, IMetaStoreClient msClient, String tbl, boolean isUpToDate) throws Exception { for (int i = skip; i < partCount; ++i) { diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java index 37a3b51d3aad..4e7f66933580 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java @@ -45,5 +45,10 @@ */ public static final String REPL_TARGET_TABLE_PROPERTY = "repl.last.id"; + /** + * Database-level property that identifies a database which is being failed over. + */ + public static final String REPL_FAILOVER_ENABLED = "repl.failover.enabled"; + public static final String TARGET_OF_REPLICATION = "repl.target.for"; } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java index 73005096a838..ad7be17b1277 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java @@ -48,6 +48,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.common.repl.ReplConst; import org.apache.hadoop.hive.metastore.ColumnType; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; @@ -58,6 +59,7 @@ import org.apache.hadoop.hive.metastore.api.PartitionsSpecByExprResult; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.WMPoolSchedulingPolicy; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; @@ -229,6 +231,15 @@ public static boolean isExternalTable(Table table) { return isExternal(params); } + public static boolean isDbBeingFailedOver(Database db) { + assert (db != null); + Map<String, String> dbParameters = db.getParameters(); + if ((dbParameters != null) && (dbParameters.containsKey(ReplConst.REPL_FAILOVER_ENABLED))) { + return ReplConst.TRUE.equals(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED)); + } + return false; + } + /** * Determines whether a table needs to be purged or not.
* diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java index 390b15ff03a3..72e64712cfa4 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -39,6 +40,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.TimeValidator; import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,11 +135,21 @@ public void run() { LOG.info("Looking for tables using catalog: {} dbPattern: {} tablePattern: {} found: {}", catalogName, dbPattern, tablePattern, foundTableMetas.size()); + Map<String, Boolean> databasesToSkip = new HashMap<>(); + for (TableMeta tableMeta : foundTableMetas) { try { + String dbName = MetaStoreUtils.prependCatalogToDbName(tableMeta.getCatName(), tableMeta.getDbName(), conf); + if (!databasesToSkip.containsKey(dbName)) { + Database db = msc.getDatabase(tableMeta.getCatName(), tableMeta.getDbName()); + databasesToSkip.put(dbName, isTargetOfReplication(db) || MetaStoreUtils.isDbBeingFailedOver(db)); + } + if (databasesToSkip.get(dbName)) { + LOG.info("Skipping table: {}", tableMeta.getTableName()); + continue; + } Table table = msc.getTable(tableMeta.getCatName(), tableMeta.getDbName(), tableMeta.getTableName()); - Database db = msc.getDatabase(table.getCatName(), table.getDbName()); - if (partitionDiscoveryEnabled(table.getParameters()) && !isTargetOfReplication(db)) { + if (partitionDiscoveryEnabled(table.getParameters())) { candidateTables.add(table); } } catch (NoSuchObjectException e) { @@ -161,6 +173,8 @@ public void run() { LOG.info("Found {} candidate tables for partition discovery", candidateTables.size()); setupMsckPathInvalidation(); Configuration msckConf = Msck.getMsckConf(conf); + databasesToSkip.clear(); + Set<String> dbsBeingFailedOver = new HashSet<>(); for (Table table : candidateTables) { qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table); long retentionSeconds = getRetentionPeriodInSeconds(table); @@ -169,7 +183,8 @@ public void run() { // this always runs in 'sync' mode where partitions can be added and dropped MsckInfo msckInfo = new MsckInfo(table.getCatName(), table.getDbName(), table.getTableName(), null, null, true, true, true, retentionSeconds); - executorService.submit(new MsckThread(msckInfo, msckConf, qualifiedTableName, countDownLatch)); + executorService.submit(new MsckThread(msckInfo, msckConf, qualifiedTableName, + countDownLatch, dbsBeingFailedOver, msc)); } countDownLatch.await(); executorService.shutdownNow(); @@ -222,17 +237,31 @@ private void setupMsckPathInvalidation() { private Configuration conf; private String qualifiedTableName; private CountDownLatch countDownLatch; + private Set<String> dbsBeingFailedOver; + private IMetaStoreClient msc; - MsckThread(MsckInfo msckInfo, Configuration conf, String qualifiedTableName, CountDownLatch countDownLatch) { + MsckThread(MsckInfo msckInfo, Configuration conf, String qualifiedTableName, + CountDownLatch countDownLatch, Set<String> dbsBeingFailedOver, IMetaStoreClient msc) { this.msckInfo = msckInfo; this.conf = conf; this.qualifiedTableName = qualifiedTableName; this.countDownLatch = countDownLatch; + this.dbsBeingFailedOver = dbsBeingFailedOver; + this.msc = msc; } @Override public void run() { try { + String dbName = MetaStoreUtils.prependCatalogToDbName(msckInfo.getCatalogName(), msckInfo.getDbName(), conf); + if (dbsBeingFailedOver.contains(dbName) || + MetaStoreUtils.isDbBeingFailedOver(msc.getDatabase(msckInfo.getCatalogName(), msckInfo.getDbName()))) { + if (!dbsBeingFailedOver.contains(dbName)) { + dbsBeingFailedOver.add(dbName); + } + LOG.info("Skipping table: {} " + msckInfo.getTableName()); + return; + } Msck msck = new Msck( true, true); msck.init(conf); msck.repair(msckInfo); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java index 9d0727495f41..829fe588520f 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java @@ -380,6 +380,16 @@ public void testPartitionDiscoveryDBPattern() throws TException, IOException { runPartitionManagementTask(conf); partitions = client.listPartitions(dbName, tableName, (short) -1); assertEquals(5, partitions.size()); + + fs.mkdirs(new Path(tablePath, "state=MG/dt=2021-28-05")); + assertEquals(6, fs.listStatus(tablePath).length); + Database db = client.getDatabase(table.getDbName()); + //PartitionManagementTask would not run for the database which is being failed over. + db.putToParameters(ReplConst.REPL_FAILOVER_ENABLED, ReplConst.TRUE); + client.alterDatabase(dbName, db); + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(5, partitions.size()); } @Test @@ -525,6 +535,18 @@ public void testPartitionRetention() throws TException, IOException, Interrupted partitions = client.listPartitions(dbName, tableName, (short) -1); assertEquals(5, partitions.size()); + Database db = client.getDatabase(table.getDbName()); + db.putToParameters(ReplConst.REPL_FAILOVER_ENABLED, ReplConst.TRUE); + client.alterDatabase(table.getDbName(), db); + // PartitionManagementTask would not do anything because the db is being failed over. + Thread.sleep(30 * 1000); + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(5, partitions.size()); + + db.putToParameters(ReplConst.REPL_FAILOVER_ENABLED, ""); + client.alterDatabase(table.getDbName(), db); + // after 30s all partitions should have been gone Thread.sleep(30 * 1000); runPartitionManagementTask(conf); From bdf2bc288d6d1751f99d20446c5f535545f0ed0b Mon Sep 17 00:00:00 2001 From: hmangla98 Date: Wed, 2 Jun 2021 11:56:37 +0530 Subject: [PATCH 2/7] Cache failover status per database and share it with MSCK worker threads
--- .../hive/ql/stats/StatsUpdaterThread.java | 22 +++++++++-------- .../metastore/PartitionManagementTask.java | 24 +++++++++++-------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java index df2b30494599..161bc208d69b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java @@ -23,8 +23,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.HashSet; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -98,7 +96,7 @@ private BlockingQueue<AnalyzeWork> workQueue; private Thread[] workers; - private Set<String> dbsBeingFailedOver; + private Map<String, Boolean> dbsBeingFailedOver; @Override public void setConf(Configuration conf) { @@ -149,7 +147,7 @@ public void init(AtomicBoolean stop) throws MetaException { txnHandler = TxnUtils.getTxnStore(conf); rs = RawStoreProxy.getProxy(conf, conf, MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL), threadId); - dbsBeingFailedOver = new HashSet<>(); + dbsBeingFailedOver = new ConcurrentHashMap<>(); for (int i = 0; i < workers.length; ++i) { workers[i] = new Thread(new WorkerRunnable(conf, user)); workers[i].setDaemon(true); @@ -641,12 +639,12 @@ public boolean runOneWorkerIteration( try { TableName tb = req.tableName; String dbName = MetaStoreUtils.prependCatalogToDbName(tb.getCat(),tb.getDb(), conf); - if (dbsBeingFailedOver.contains(dbName) - || MetaStoreUtils.isDbBeingFailedOver(rs.getDatabase(tb.getCat(), tb.getDb()))) { - if (!dbsBeingFailedOver.contains(dbName)) { - dbsBeingFailedOver.add(dbName); - } - LOG.debug("Skipping table {}", tb.getTable()); + if (isDbPresentInFailoverSet(dbName)) { + LOG.info("Skipping table: {} " + tb.getTable()); + return true; + } else if (MetaStoreUtils.isDbBeingFailedOver(rs.getDatabase(tb.getCat(), tb.getDb()))) { + dbsBeingFailedOver.put(dbName, true); + LOG.info("Skipping table: {} " + tb.getTable()); return true; } cmd = req.buildCommand(); @@ -670,6 +668,10 @@ public boolean runOneWorkerIteration( return true; } + private synchronized boolean isDbPresentInFailoverSet(String dbName) { + return dbsBeingFailedOver.containsKey(dbName) && dbsBeingFailedOver.get(dbName); + } + public class WorkerRunnable implements Runnable { private final HiveConf conf; private final String user; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java index 72e64712cfa4..657048351d0d 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java @@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -160,7 +161,6 @@ public void run() { if (candidateTables.isEmpty()) { return; } - // TODO: Msck creates MetastoreClient (MSC) on its own. MSC creation is expensive.
Sharing MSC also // will not be safe unless synchronized MSC is used. Using synchronized MSC in multi-threaded context also // defeats the purpose of thread pooled msck repair. @@ -173,8 +173,7 @@ public void run() { LOG.info("Found {} candidate tables for partition discovery", candidateTables.size()); setupMsckPathInvalidation(); Configuration msckConf = Msck.getMsckConf(conf); - databasesToSkip.clear(); - Set<String> dbsBeingFailedOver = new HashSet<>(); + Map<String, Boolean> dbsBeingFailedOver = new ConcurrentHashMap<>(); for (Table table : candidateTables) { qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table); long retentionSeconds = getRetentionPeriodInSeconds(table); @@ -237,11 +236,11 @@ private void setupMsckPathInvalidation() { private Configuration conf; private String qualifiedTableName; private CountDownLatch countDownLatch; - private Set<String> dbsBeingFailedOver; + private Map<String, Boolean> dbsBeingFailedOver; private IMetaStoreClient msc; MsckThread(MsckInfo msckInfo, Configuration conf, String qualifiedTableName, - CountDownLatch countDownLatch, Set<String> dbsBeingFailedOver, IMetaStoreClient msc) { + CountDownLatch countDownLatch, Map<String, Boolean> dbsBeingFailedOver, IMetaStoreClient msc) { this.msckInfo = msckInfo; this.conf = conf; this.qualifiedTableName = qualifiedTableName; @@ -250,15 +249,20 @@ private void setupMsckPathInvalidation() { this.msc = msc; } + private synchronized boolean isDbPresentInFailoverSet(String dbName) { + return dbsBeingFailedOver.containsKey(dbName) && dbsBeingFailedOver.get(dbName); + } + @Override public void run() { try { String dbName = MetaStoreUtils.prependCatalogToDbName(msckInfo.getCatalogName(), msckInfo.getDbName(), conf); - if (dbsBeingFailedOver.contains(dbName) || - MetaStoreUtils.isDbBeingFailedOver(msc.getDatabase(msckInfo.getCatalogName(), msckInfo.getDbName()))) { - if (!dbsBeingFailedOver.contains(dbName)) { - dbsBeingFailedOver.add(dbName); - } + if (isDbPresentInFailoverSet(dbName)) { + LOG.info("Skipping table: {} " + msckInfo.getTableName()); + return; + } else if (MetaStoreUtils.isDbBeingFailedOver + (msc.getDatabase(msckInfo.getCatalogName(), msckInfo.getDbName()))) { + dbsBeingFailedOver.put(dbName, true); LOG.info("Skipping table: {} " + msckInfo.getTableName()); return; } From a8d938a186bb469f66d19bc2277a9261928628dd Mon Sep 17 00:00:00 2001 From: hmangla98 Date: Wed, 2 Jun 2021 16:53:21 +0530 Subject: [PATCH 3/7] Remove the failover cache; check database state directly
--- .../hive/ql/stats/StatsUpdaterThread.java | 15 +------------ .../metastore/PartitionManagementTask.java | 22 ++++--------------- 2 files changed, 5 insertions(+), 32 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java index 161bc208d69b..fedd67a9655c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java @@ -96,8 +96,6 @@ private BlockingQueue<AnalyzeWork> workQueue; private Thread[] workers; - private Map<String, Boolean> dbsBeingFailedOver; - @Override public void setConf(Configuration conf) { StatsUpdateMode mode = StatsUpdateMode.valueOf( @@ -147,7 +145,6 @@ public void init(AtomicBoolean stop) throws MetaException { txnHandler = TxnUtils.getTxnStore(conf); rs = RawStoreProxy.getProxy(conf, conf, MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL), threadId); - dbsBeingFailedOver = new ConcurrentHashMap<>(); for (int i = 0; i < workers.length; ++i) { workers[i] = new Thread(new WorkerRunnable(conf, user)); workers[i].setDaemon(true); @@ -160,7 +157,6 @@ public void run() { LOG.info("Stats updater thread started"); startWorkers(); while (!stop.get()) { - dbsBeingFailedOver.clear(); boolean hadUpdates = runOneIteration(); try { Thread.sleep(hadUpdates ? 0 : noUpdatesWaitMs); @@ -638,12 +634,7 @@ public boolean runOneWorkerIteration( String cmd = null; try { TableName tb = req.tableName; - String dbName = MetaStoreUtils.prependCatalogToDbName(tb.getCat(),tb.getDb(), conf); - if (isDbPresentInFailoverSet(dbName)) { - LOG.info("Skipping table: {} " + tb.getTable()); - return true; - } else if (MetaStoreUtils.isDbBeingFailedOver(rs.getDatabase(tb.getCat(), tb.getDb()))) { - dbsBeingFailedOver.put(dbName, true); + if (MetaStoreUtils.isDbBeingFailedOver(rs.getDatabase(tb.getCat(), tb.getDb()))) { LOG.info("Skipping table: {} " + tb.getTable()); return true; } @@ -668,10 +659,6 @@ public boolean runOneWorkerIteration( return true; } - private synchronized boolean isDbPresentInFailoverSet(String dbName) { - return dbsBeingFailedOver.containsKey(dbName) && dbsBeingFailedOver.get(dbName); - } - public class WorkerRunnable implements Runnable { private final HiveConf conf; private final String user; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java index 657048351d0d..b48f64116f5f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java @@ -173,7 +173,6 @@ public void run() { LOG.info("Found {} candidate tables for partition discovery", candidateTables.size()); setupMsckPathInvalidation(); Configuration msckConf = Msck.getMsckConf(conf); - Map<String, Boolean> dbsBeingFailedOver = new ConcurrentHashMap<>(); for (Table table : candidateTables) { qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table); long retentionSeconds = getRetentionPeriodInSeconds(table); @@ -182,8 +181,7 @@ public void run() { // this always runs in 'sync' mode where partitions can be added and dropped MsckInfo msckInfo = new MsckInfo(table.getCatName(), table.getDbName(), table.getTableName(), null, null, true, true, true, retentionSeconds);
- executorService.submit(new MsckThread(msckInfo, msckConf, qualifiedTableName, - countDownLatch, dbsBeingFailedOver, msc)); + executorService.submit(new MsckThread(msckInfo, msckConf, qualifiedTableName, countDownLatch, msc)); } countDownLatch.await(); executorService.shutdownNow(); @@ -236,33 +234,21 @@ private void setupMsckPathInvalidation() { private Configuration conf; private String qualifiedTableName; private CountDownLatch countDownLatch; - private Map<String, Boolean> dbsBeingFailedOver; - private IMetaStoreClient msc; + IMetaStoreClient msc; MsckThread(MsckInfo msckInfo, Configuration conf, String qualifiedTableName, - CountDownLatch countDownLatch, Map<String, Boolean> dbsBeingFailedOver, IMetaStoreClient msc) { + CountDownLatch countDownLatch, IMetaStoreClient msc) { this.msckInfo = msckInfo; this.conf = conf; this.qualifiedTableName = qualifiedTableName; this.countDownLatch = countDownLatch; - this.dbsBeingFailedOver = dbsBeingFailedOver; this.msc = msc; } - private synchronized boolean isDbPresentInFailoverSet(String dbName) { - return dbsBeingFailedOver.containsKey(dbName) && dbsBeingFailedOver.get(dbName); - } - @Override public void run() { try { - String dbName = MetaStoreUtils.prependCatalogToDbName(msckInfo.getCatalogName(), msckInfo.getDbName(), conf); - if (isDbPresentInFailoverSet(dbName)) { - LOG.info("Skipping table: {} " + msckInfo.getTableName()); - return; - } else if (MetaStoreUtils.isDbBeingFailedOver - (msc.getDatabase(msckInfo.getCatalogName(), msckInfo.getDbName()))) { - dbsBeingFailedOver.put(dbName, true); + if (MetaStoreUtils.isDbBeingFailedOver((msc.getDatabase(msckInfo.getCatalogName(), msckInfo.getDbName())))) { LOG.info("Skipping table: {} " + msckInfo.getTableName()); return; } From 3b4b0a3ef0fcdcce74b50a3fb38e7a4fe4a2845d Mon Sep 17 00:00:00 2001 From: hmangla98 Date: Wed, 2 Jun 2021 21:02:01 +0530 Subject: [PATCH 4/7] Remove unused import --- .../apache/hadoop/hive/metastore/PartitionManagementTask.java | 1 - 1 file changed, 1 deletion(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java index b48f64116f5f..729a9d33fa48 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java @@ -28,7 +28,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; From b16c28f6ea61dd45665e3f7e21d86495bffb024c Mon Sep 17 00:00:00 2001 From: hmangla98 Date: Thu, 3 Jun 2021 12:26:22 +0530 Subject: [PATCH 5/7] Refactor database skip checks and logging --- .../hive/ql/stats/StatsUpdaterThread.java | 24 ++++++++++--------- .../hive/ql/stats/TestStatsUpdaterThread.java | 1 - .../hive/metastore/utils/MetaStoreUtils.java | 5 +--- .../metastore/PartitionManagementTask.java | 15 +++++++++--- 4 files changed, 26 insertions(+), 19 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java index fedd67a9655c..34680181bd08 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java @@ -220,14 +220,16 @@
private void stopWorkers() { String dbName = MetaStoreUtils.prependCatalogToDbName(cat,db, conf); if (!dbsToSkip.containsKey(dbName)) { Database database = rs.getDatabase(cat, db); - dbsToSkip.put(dbName, ReplUtils.isTargetOfReplication(database) - || MetaStoreUtils.isDbBeingFailedOver(database)); + boolean skipDb = false; + if (MetaStoreUtils.isDbBeingFailedOver(database)) { + skipDb = true; + LOG.info("Skipping all the tables which belong to database: {} as it is being failed over", db); + } else if (ReplUtils.isTargetOfReplication(database)) { + skipDb = true; + LOG.info("Skipping all the tables which belong to replicated database: {}", db); + } + dbsToSkip.put(dbName, skipDb); } - // If the table is being replicated into, - // 1. the stats are also replicated from the source, so we don't need those to be calculated on the target again - // 2. updating stats requires a writeId to be created. Hence writeIds on source and target can get out of sync - // when stats are updated. That can cause consistency issues. - if (dbsToSkip.get(dbName)) { LOG.debug("Skipping table {}", tbl); return null; @@ -633,16 +635,16 @@ public boolean runOneWorkerIteration( } String cmd = null; try { + if (doWait) { + SessionState.start(ss); // This is the first call, open the session + } TableName tb = req.tableName; if (MetaStoreUtils.isDbBeingFailedOver(rs.getDatabase(tb.getCat(), tb.getDb()))) { - LOG.info("Skipping table: {} " + tb.getTable()); + LOG.info("Skipping table: {} as it belongs to a database which is being failed over.", tb.getTable()); return true; } cmd = req.buildCommand(); LOG.debug("Running {} based on {}", cmd, req); - if (doWait) { - SessionState.start(ss); // This is the first call, open the session - } DriverUtils.runOnDriver(conf, user, ss, cmd); } catch (Exception e) { LOG.error("Analyze command failed: " + cmd, e); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java index 61206d8f58ae..a68387bde8c0 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java @@ -227,7 +227,6 @@ public void testTxnTable() throws Exception { msClient.close(); } - @Test public void testTxnPartitions() throws Exception { StatsUpdaterThread su = createUpdater(); diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java index ad7be17b1277..4dd95f188464 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java @@ -234,10 +234,7 @@ public static boolean isExternalTable(Table table) { public static boolean isDbBeingFailedOver(Database db) { assert (db != null); Map<String, String> dbParameters = db.getParameters(); - if ((dbParameters != null) && (dbParameters.containsKey(ReplConst.REPL_FAILOVER_ENABLED))) { - return ReplConst.TRUE.equals(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED)); - } - return false; + return dbParameters != null && ReplConst.TRUE.equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED)); } /** * Determines whether a table needs to be purged or not. * diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java index 729a9d33fa48..9d476824fd25 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java @@ -142,10 +142,19 @@ public void run() { String dbName = MetaStoreUtils.prependCatalogToDbName(tableMeta.getCatName(), tableMeta.getDbName(), conf); if (!databasesToSkip.containsKey(dbName)) { Database db = msc.getDatabase(tableMeta.getCatName(), tableMeta.getDbName()); - databasesToSkip.put(dbName, isTargetOfReplication(db) || MetaStoreUtils.isDbBeingFailedOver(db)); + boolean skipDb = false; + if (MetaStoreUtils.isDbBeingFailedOver(db)) { + skipDb = true; + LOG.info("Skipping all the tables which belong to database: {} as it is being failed over", + tableMeta.getDbName()); + } else if (isTargetOfReplication(db)) { + skipDb = true; + LOG.info("Skipping all the tables which belong to replicated database: {}", tableMeta.getDbName()); + } + databasesToSkip.put(dbName, skipDb); } if (databasesToSkip.get(dbName)) { - LOG.info("Skipping table: {}", tableMeta.getTableName()); + LOG.debug("Skipping table: {}", tableMeta.getTableName()); continue; } Table table = msc.getTable(tableMeta.getCatName(), tableMeta.getDbName(), tableMeta.getTableName()); @@ -248,7 +257,7 @@ private void setupMsckPathInvalidation() { public void run() { try { if (MetaStoreUtils.isDbBeingFailedOver((msc.getDatabase(msckInfo.getCatalogName(), msckInfo.getDbName())))) { - LOG.info("Skipping table: {} " + msckInfo.getTableName()); + LOG.info("Skipping table: {} as it belongs to a database being failed over.", msckInfo.getTableName()); return; } Msck msck = new Msck( true, true); From 35fbb36a3276dd01b1b5aa20bd54f61f58e69ac4 Mon Sep 17 00:00:00 2001 From: hmangla98 Date: Fri, 4 Jun 2021 07:13:31 +0530 Subject: [PATCH 6/7] Functions moved to MetaStoreUtils --- ...stReplicationScenariosAcrossInstances.java | 7 +++--- .../hive/ql/exec/repl/ReplLoadTask.java | 3 ++- .../hive/ql/exec/repl/util/ReplUtils.java | 9 ------- .../ql/parse/ReplicationSemanticAnalyzer.java | 5 ++-- .../hive/ql/stats/StatsUpdaterThread.java | 13 +--------- .../hive/metastore/utils/MetaStoreUtils.java | 18 +++++++++++++ .../metastore/PartitionManagementTask.java | 25 ++----------------- 7 files changed, 29 insertions(+), 51 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 432de7c52cf1..034bf2121898 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.parse.WarehouseInstance.Tuple; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; @@ -1167,12 +1168,12 @@ public void testIfReplTargetSetInIncremental() throws Throwable { //Perform empty dump and load primary.dump(primaryDbName); replica.load(replicatedDbName, primaryDbName); - assertTrue(ReplUtils.isTargetOfReplication(replica.getDatabase(replicatedDbName))); + assertTrue(MetaStoreUtils.isTargetOfReplication(replica.getDatabase(replicatedDbName))); replica.dumpFailure(replicatedDbName); //can not dump db which is target of replication replica.run("ALTER DATABASE " + replicatedDbName + " Set DBPROPERTIES('repl.target.for' = '')"); - assertFalse(ReplUtils.isTargetOfReplication(replica.getDatabase(replicatedDbName))); + assertFalse(MetaStoreUtils.isTargetOfReplication(replica.getDatabase(replicatedDbName))); replica.dump(replicatedDbName); // do an empty incremental load to allow dump of replicatedDbName @@ -1181,7 +1182,7 @@ public void testIfReplTargetSetInIncremental() throws Throwable { replica.load(replicatedDbName, primaryDbName); compareDbProperties(primary.getDatabase(primaryDbName).getParameters(), replica.getDatabase(replicatedDbName).getParameters()); - assertTrue(ReplUtils.isTargetOfReplication(replica.getDatabase(replicatedDbName))); + assertTrue(MetaStoreUtils.isTargetOfReplication(replica.getDatabase(replicatedDbName))); replica.dumpFailure(replicatedDbName); //Cannot dump database which is target of replication.
} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index a070930e1dcc..b30b65aadbd2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.common.repl.ReplConst; import org.apache.hadoop.fs.Options; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils; import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger; import org.apache.thrift.TException; @@ -631,7 +632,7 @@ private int executeIncrementalLoad(long loadStartTime) throws Exception { if (work.replScopeModified) { dropTablesExcludedInReplScope(work.currentReplScope); } - if (!ReplUtils.isTargetOfReplication(getHive().getDatabase(work.dbNameToLoadIn))) { + if (!MetaStoreUtils.isTargetOfReplication(getHive().getDatabase(work.dbNameToLoadIn))) { Map<String, String> props = new HashMap<>(); props.put(ReplConst.TARGET_OF_REPLICATION, "true"); AlterDatabaseSetPropertiesDesc setTargetDesc = new AlterDatabaseSetPropertiesDesc(work.dbNameToLoadIn, props, null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index 6ea21813341c..a0ee320414cd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -259,15 +259,6 @@ public static boolean replCkptStatus(String dbName, Map<String, String> props, S return false; } - public static boolean isTargetOfReplication(Database db) { - assert (db != null); - Map<String, String> m = db.getParameters(); - if ((m != null) && (m.containsKey(ReplConst.TARGET_OF_REPLICATION))) { - return !StringUtils.isEmpty(m.get(ReplConst.TARGET_OF_REPLICATION)); - } - return false; - } - public static String getNonEmpty(String configParam, HiveConf hiveConf, String errorMsgFormat) throws SemanticException { String val = hiveConf.get(configParam); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 5562f5a5491c..da1fa0b64a3e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -25,8 +25,8 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; @@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import java.io.IOException; -import java.net.URI; import java.util.Map; import java.util.List; import java.util.Collections; @@ -177,7 +176,7 @@ private void initReplDump(ASTNode ast) throws HiveException { for (String dbName : Utils.matchesDb(db, dbNameOrPattern)) { Database database = db.getDatabase(dbName); if (database != null) { - if (ReplUtils.isTargetOfReplication(database)) { + if (MetaStoreUtils.isTargetOfReplication(database)) { LOG.error("Cannot dump database " + dbNameOrPattern + " as it is a
target of replication (repl.target.for)"); throw new SemanticException(ErrorMsg.REPL_DATABASE_IS_TARGET_OF_REPLICATION.getMsg()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java index 34680181bd08..f30cd8fd920a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java @@ -50,7 +50,6 @@ import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.StatsUpdateMode; @@ -59,7 +58,6 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.DriverUtils; -import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.security.UserGroupInformation; @@ -219,16 +217,7 @@ private void stopWorkers() { String cat = fullTableName.getCat(), db = fullTableName.getDb(), tbl = fullTableName.getTable(); String dbName = MetaStoreUtils.prependCatalogToDbName(cat,db, conf); if (!dbsToSkip.containsKey(dbName)) { - Database database = rs.getDatabase(cat, db); - boolean skipDb = false; - if (MetaStoreUtils.isDbBeingFailedOver(database)) { - skipDb = true; - LOG.info("Skipping all the tables which belong to database: {} as it is being failed over", db); - } else if (ReplUtils.isTargetOfReplication(database)) { - skipDb = true; - LOG.info("Skipping all the tables which belong to replicated database: {}", db); - } - dbsToSkip.put(dbName, skipDb); + dbsToSkip.put(dbName, MetaStoreUtils.checkIfDbNeedsToBeSkipped(rs.getDatabase(cat, db))); } if (dbsToSkip.get(dbName)) { LOG.debug("Skipping table {}", tbl); diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java index 4dd95f188464..dde731e0ab37 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java @@ -237,6 +237,24 @@ public static boolean isDbBeingFailedOver(Database db) { return dbParameters != null && ReplConst.TRUE.equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED)); } + public static boolean isTargetOfReplication(Database db) { + assert (db != null); + Map dbParameters = db.getParameters(); + return dbParameters != null && !StringUtils.isEmpty(dbParameters.get(ReplConst.TARGET_OF_REPLICATION)); + } + + public static boolean checkIfDbNeedsToBeSkipped(Database db) { + assert (db != null); + if (isDbBeingFailedOver(db)) { + LOG.info("Skipping all the tables which belong to database: {} as it is being failed over", db.getName()); + return true; + } else if (isTargetOfReplication(db)) { + LOG.info("Skipping all the tables which belong to replicated database: {}", db.getName()); + return true; + } + return false; + } + /** * Determines 
whether an table needs to be purged or not. * diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java index 9d476824fd25..c63fba4e8fa5 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java @@ -32,14 +32,11 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.repl.ReplConst; -import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableMeta; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.TimeValidator; -import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,15 +89,6 @@ private static boolean partitionDiscoveryEnabled(Map params) { params.get(DISCOVER_PARTITIONS_TBLPROPERTY).equalsIgnoreCase("true"); } - public boolean isTargetOfReplication(Database db) { - assert (db != null); - Map params = db.getParameters(); - if ((params != null) && (params.containsKey(ReplConst.TARGET_OF_REPLICATION))) { - return !StringUtils.isEmpty(params.get(ReplConst.TARGET_OF_REPLICATION)); - } - return false; - } - @Override public void run() { if (lock.tryLock()) { @@ -141,17 +129,8 @@ public void run() { try { String dbName = MetaStoreUtils.prependCatalogToDbName(tableMeta.getCatName(), tableMeta.getDbName(), conf); if (!databasesToSkip.containsKey(dbName)) { - Database db = msc.getDatabase(tableMeta.getCatName(), tableMeta.getDbName()); - boolean skipDb = false; - if (MetaStoreUtils.isDbBeingFailedOver(db)) { - skipDb = true; - LOG.info("Skipping all the tables which belong to database: {} as it is being failed over", - tableMeta.getDbName()); - } else if (isTargetOfReplication(db)) { - skipDb = true; - LOG.info("Skipping all the tables which belong to replicated database: {}", tableMeta.getDbName()); - } - databasesToSkip.put(dbName, skipDb); + databasesToSkip.put(dbName, MetaStoreUtils.checkIfDbNeedsToBeSkipped( + msc.getDatabase(tableMeta.getCatName(), tableMeta.getDbName()))); } if (databasesToSkip.get(dbName)) { LOG.debug("Skipping table : {}", tableMeta.getTableName()); From f99be969672367a2e6484b2c1c154ac9d5fdb693 Mon Sep 17 00:00:00 2001 From: hmangla98 Date: Tue, 8 Jun 2021 08:53:46 +0530 Subject: [PATCH 7/7] Review Comments --- .../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 1 - .../hive/ql/stats/TestStatsUpdaterThread.java | 13 ++++++++++--- .../hive/metastore/PartitionManagementTask.java | 12 +++++++----- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index a0ee320414cd..237988c41e60 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; 
import org.apache.hadoop.hive.metastore.api.ColumnStatistics; -import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.utils.StringUtils; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java index a68387bde8c0..30f134103ad5 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java @@ -623,6 +623,7 @@ private void testNoStatsUpdateForFailoverDb(String tblNamePrefix, String txnProp hiveConf.setInt(MetastoreConf.ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT.getVarname(), 4); String tblWOStats = tblNamePrefix + "_repl_failover_nostats"; String ptnTblWOStats = tblNamePrefix + "_ptn_repl_failover_nostats"; + String newTable = "new_table"; String dbName = ss.getCurrentDatabase(); StatsUpdaterThread su = createUpdater(); IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); @@ -649,8 +650,8 @@ private void testNoStatsUpdateForFailoverDb(String tblNamePrefix, String txnProp verifyPartStatsUpToDate(3, 1, msClient, ptnTblWOStats, false); Assert.assertEquals(0, su.getQueueLength()); - executeQuery("create table new_table(s string) partitioned by (i int) " + txnProperty); - executeQuery("insert into new_table(i, s) values (4, 'test4')"); + executeQuery("create table " + newTable + "(i int, s string) " + txnProperty); + executeQuery("insert into "+ newTable + "(i, s) values (4, 'test4')"); assertFalse(su.runOneIteration()); Assert.assertEquals(0, su.getQueueLength()); @@ -658,9 +659,15 @@ private void testNoStatsUpdateForFailoverDb(String tblNamePrefix, String txnProp verifyPartStatsUpToDate(3, 1, msClient, ptnTblWOStats, false); executeQuery("alter database " + dbName + " set dbproperties('" + ReplConst.REPL_FAILOVER_ENABLED + "'='')"); + assertTrue(su.runOneIteration()); + Assert.assertEquals(3, su.getQueueLength()); + drainWorkQueue(su, 3); + verifyStatsUpToDate(newTable, Lists.newArrayList("i"), msClient, true); + verifyStatsUpToDate(tblWOStats, Lists.newArrayList("i"), msClient, true); + verifyPartStatsUpToDate(3, 1, msClient, ptnTblWOStats, true); executeQuery("drop table " + tblWOStats); executeQuery("drop table " + ptnTblWOStats); - executeQuery("drop table new_table"); + executeQuery("drop table " + newTable); msClient.close(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java index c63fba4e8fa5..bd87483a6c7e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java @@ -168,7 +168,7 @@ public void run() { // this always runs in 'sync' mode where partitions can be added and dropped MsckInfo msckInfo = new MsckInfo(table.getCatName(), table.getDbName(), table.getTableName(), null, null, true, true, true, retentionSeconds); - executorService.submit(new MsckThread(msckInfo, msckConf, qualifiedTableName, countDownLatch, msc)); + executorService.submit(new MsckThread(msckInfo, msckConf, qualifiedTableName, countDownLatch)); } 
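// MsckThread no longer shares the task's IMetaStoreClient; each worker now builds its own HiveMetaStoreClient inside run() and closes it in the finally block (see below).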
countDownLatch.await(); executorService.shutdownNow(); @@ -221,20 +221,19 @@ private void setupMsckPathInvalidation() { private Configuration conf; private String qualifiedTableName; private CountDownLatch countDownLatch; - IMetaStoreClient msc; - MsckThread(MsckInfo msckInfo, Configuration conf, String qualifiedTableName, - CountDownLatch countDownLatch, IMetaStoreClient msc) { + MsckThread(MsckInfo msckInfo, Configuration conf, String qualifiedTableName, CountDownLatch countDownLatch) { this.msckInfo = msckInfo; this.conf = conf; this.qualifiedTableName = qualifiedTableName; this.countDownLatch = countDownLatch; - this.msc = msc; } @Override public void run() { + IMetaStoreClient msc = null; try { + msc = new HiveMetaStoreClient(conf); if (MetaStoreUtils.isDbBeingFailedOver((msc.getDatabase(msckInfo.getCatalogName(), msckInfo.getDbName())))) { LOG.info("Skipping table: {} as it belongs to a database being failed over.", msckInfo.getTableName()); return; @@ -247,6 +246,9 @@ public void run() { } finally { // there is no recovery from exception, so we always count down and retry in next attempt countDownLatch.countDown(); + if (msc != null) { + msc.close(); + } } } }
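The series converges on two MetaStoreUtils predicates that both background tasks consult before touching a table: isDbBeingFailedOver (repl.failover.enabled set to "true") and isTargetOfReplication (repl.target.for non-empty), combined by checkIfDbNeedsToBeSkipped. The following standalone Java sketch restates that final logic for reference; the DbSkipGate class, its nested Database stand-in, and the main driver are illustrative assumptions, not Hive's actual API.

import java.util.HashMap;
import java.util.Map;

public class DbSkipGate {
  // Property names mirror ReplConst.REPL_FAILOVER_ENABLED and ReplConst.TARGET_OF_REPLICATION.
  static final String REPL_FAILOVER_ENABLED = "repl.failover.enabled";
  static final String TARGET_OF_REPLICATION = "repl.target.for";

  // Stand-in for org.apache.hadoop.hive.metastore.api.Database (illustration only).
  static class Database {
    final Map<String, String> parameters = new HashMap<>();
    Map<String, String> getParameters() { return parameters; }
  }

  // Mirrors MetaStoreUtils.isDbBeingFailedOver after PATCH 5: a null-safe,
  // case-insensitive check of the failover marker.
  static boolean isDbBeingFailedOver(Database db) {
    Map<String, String> p = db.getParameters();
    return p != null && "true".equalsIgnoreCase(p.get(REPL_FAILOVER_ENABLED));
  }

  // Mirrors MetaStoreUtils.isTargetOfReplication after PATCH 6: any non-empty
  // repl.target.for value marks the database as a replication target.
  static boolean isTargetOfReplication(Database db) {
    Map<String, String> p = db.getParameters();
    String v = p == null ? null : p.get(TARGET_OF_REPLICATION);
    return v != null && !v.isEmpty();
  }

  // Mirrors MetaStoreUtils.checkIfDbNeedsToBeSkipped: StatsUpdaterThread and
  // PartitionManagementTask skip every table of a database matching either predicate.
  static boolean checkIfDbNeedsToBeSkipped(Database db) {
    return isDbBeingFailedOver(db) || isTargetOfReplication(db);
  }

  public static void main(String[] args) {
    Database db = new Database();
    System.out.println(checkIfDbNeedsToBeSkipped(db)); // false: no markers set
    db.getParameters().put(REPL_FAILOVER_ENABLED, "true");
    System.out.println(checkIfDbNeedsToBeSkipped(db)); // true: failover in progress
    db.getParameters().put(REPL_FAILOVER_ENABLED, ""); // clearing the flag, as the tests do
    System.out.println(checkIfDbNeedsToBeSkipped(db)); // false: both tasks resume
  }
}

Setting the property to an empty string, as both test suites do, re-enables stats updates and partition discovery on the next run.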