diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 3639ab1f61..69d2648232 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -610,6 +610,10 @@ public void testIncrementalDumpOfWarehouse() throws Throwable { .run("show tables") .verifyResults(new String[] { "t1" }); + assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase("default").getParameters())); + assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(primaryDbName).getParameters())); + assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(dbOne).getParameters())); + replica.load("", incrementalTuple.dumpLocation) .run("show databases") .verifyResults(new String[] { "default", primaryDbName, dbOne, dbTwo }) @@ -620,6 +624,11 @@ public void testIncrementalDumpOfWarehouse() throws Throwable { .run("show tables") .verifyResults(new String[] { "t1", "t2" }); + assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase("default").getParameters())); + assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(primaryDbName).getParameters())); + assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(dbOne).getParameters())); + assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(dbTwo).getParameters())); + /* Start of cleanup */ @@ -1012,7 +1021,7 @@ public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws Throwable { .dump(primaryDbName, null); // Bootstrap Repl A -> B - WarehouseInstance.Tuple tupleReplica = replica.load(replicatedDbName, tuplePrimary.dumpLocation) + replica.load(replicatedDbName, tuplePrimary.dumpLocation) .run("repl status " + replicatedDbName) .verifyResult(tuplePrimary.lastReplicationId) .run("show tblproperties t1('custom.property')") @@ -1020,9 +1029,14 @@ public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws Throwable { .dumpFailure(replicatedDbName, null) .run("alter database " + replicatedDbName + " set dbproperties ('" + SOURCE_OF_REPLICATION + "' = '1, 2, 3')") - .dump(replicatedDbName, null); + .dumpFailure(replicatedDbName, null);//can not dump the db before first successful incremental load is done. + + // do a empty incremental load to allow dump of replicatedDbName + WarehouseInstance.Tuple temp = primary.dump(primaryDbName, tuplePrimary.lastReplicationId); + replica.load(replicatedDbName, temp.dumpLocation); // first successful incremental load. // Bootstrap Repl B -> C + WarehouseInstance.Tuple tupleReplica = replica.dump(replicatedDbName, null); String replDbFromReplica = replicatedDbName + "_dupe"; replica.load(replDbFromReplica, tupleReplica.dumpLocation) .run("use " + replDbFromReplica) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java new file mode 100644 index 0000000000..472c73a83d --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.shims.Utils; +import org.junit.*; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; + +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; +import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable; +import static org.apache.hadoop.hive.ql.io.AcidUtils.isTransactionalTable; +import static org.junit.Assert.*; + +/** + * TestReplicationWithTableMigrationEx - test replication for Hive2 to Hive3 (Strict managed tables) + */ +public class TestReplicationWithTableMigrationEx { + @Rule + public final TestName testName = new TestName(); + + protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationWithTableMigrationEx.class); + private static WarehouseInstance primary, replica; + private String primaryDbName, replicatedDbName; + + @BeforeClass + public static void classLevelSetup() throws Exception { + HashMap overrideProperties = new HashMap<>(); + internalBeforeClassSetup(overrideProperties); + } + + static void internalBeforeClassSetup(Map overrideConfigs) throws Exception { + HiveConf conf = new HiveConf(TestReplicationWithTableMigrationEx.class); + conf.set("dfs.client.use.datanode.hostname", "true"); + conf.set("hadoop.proxyuser." 
+ Utils.getUGI().getShortUserName() + ".hosts", "*"); + MiniDFSCluster miniDFSCluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + final DistributedFileSystem fs = miniDFSCluster.getFileSystem(); + HashMap hiveConfigs = new HashMap() {{ + put("fs.defaultFS", fs.getUri().toString()); + put("hive.support.concurrency", "true"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.strict.managed.tables", "true"); + put("hive.metastore.transactional.event.listeners", ""); + }}; + replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs); + + HashMap configsForPrimary = new HashMap() {{ + put("fs.defaultFS", fs.getUri().toString()); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.support.concurrency", "false"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); + put("hive.strict.managed.tables", "false"); + }}; + configsForPrimary.putAll(overrideConfigs); + primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary); + } + + @AfterClass + public static void classLevelTearDown() throws IOException { + primary.close(); + replica.close(); + } + + @Before + public void setup() throws Throwable { + primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); + replicatedDbName = "replicated_" + primaryDbName; + primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); + } + + @After + public void tearDown() throws Throwable { + primary.run("drop database if exists " + primaryDbName + " cascade"); + replica.run("drop database if exists " + replicatedDbName + " cascade"); + } + + private void prepareData(String primaryDbName) throws Throwable { + primary.run("use " + primaryDbName) + .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ") + .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) " + + "into 3 buckets stored as orc ") + .run("insert into tacid values(1)") + .run("insert into tacid values(2)") + .run("insert into tacid values(3)") + .run("alter table tacidpart add partition(country='france')") + .run("insert into tacidpart partition(country='india') values('mumbai')") + .run("insert into tacidpart partition(country='us') values('sf')") + .run("insert into tacidpart partition(country='france') values('paris')"); + assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacid"))); + assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpart"))); + } + + private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable { + replica.run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"tacid", "tacidpart"}) + .run("repl status " + 
replicatedDbName) + .verifyResult(lastReplId) + .run("select count(*) from tacid") + .verifyResult("3") + .run("select id from tacid order by id") + .verifyResults(new String[]{"1", "2", "3"}) + .run("select count(*) from tacidpart") + .verifyResult("3") + .run("select country from tacidpart order by country") + .verifyResults(new String[] {"france", "india", "us"}); + + assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid"))); + assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpart"))); + } + + private WarehouseInstance.Tuple dumpWithLastEventIdHacked(int eventId) throws Throwable { + BehaviourInjection callerVerifier + = new BehaviourInjection() { + @Override + public CurrentNotificationEventId apply(CurrentNotificationEventId id) { + try { + LOG.warn("GetCurrentNotificationEventIdBehaviour called"); + injectionPathCalled = true; + // keep events to reply during incremental + id.setEventId(eventId); + return id; + } catch (Throwable throwable) { + throwable.printStackTrace(); + return null; + } + } + }; + + InjectableBehaviourObjectStore.setGetCurrentNotificationEventIdBehaviour(callerVerifier); + try { + return primary.dump(primaryDbName, null); + } finally { + InjectableBehaviourObjectStore.resetGetCurrentNotificationEventIdBehaviour(); + callerVerifier.assertInjectionsPerformed(true, false); + } + } + + @Test + public void testConcurrentOpDuringBootStrapDumpCreateTableReplay() throws Throwable { + prepareData(primaryDbName); + + // dump with operation after last repl id is fetched. + WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(2); + replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation); + verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); + assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); + + // next incremental dump + tuple = primary.dump(primaryDbName, tuple.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation); + verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); + assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); + } + + @Test + public void testConcurrentOpDuringBootStrapDumpInsertReplay() throws Throwable { + prepareData(primaryDbName); + + // dump with operation after last repl id is fetched. 
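    // Note on the hacked event id used below: pinning the "current" notification event id to 4 makes the
    // bootstrap dump report an older last-repl-id even though it already captures the inserted rows.
    // The insert events after event 4 are then replayed by the first incremental load, which is exactly the
    // window in which the duplicate-copy check (HIVE-21197) must skip files already present in the
    // bootstrap-created base directory.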
+ WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(4); + replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation); + verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); + assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); + + // next incremental dump + tuple = primary.dump(primaryDbName, tuple.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation); + verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); + assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); + } + + @Test + public void testTableLevelDumpMigration() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (i int, j int)") + .dump(primaryDbName+".t1", null); + replica.run("create database " + replicatedDbName); + replica.loadWithoutExplain(replicatedDbName + ".t1", tuple.dumpLocation); + assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); + assertTrue(ReplUtils.isFirstIncPending(replica.getTable(replicatedDbName, "t1").getParameters())); + + tuple = primary.run("use " + primaryDbName) + .run("insert into t1 values (1, 2)") + .dump(primaryDbName+".t1", tuple.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName + ".t1", tuple.dumpLocation); + assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); + assertFalse(ReplUtils.isFirstIncPending(replica.getTable(replicatedDbName, "t1").getParameters())); + } + + @Test + public void testConcurrentOpDuringBootStrapDumpInsertOverwrite() throws Throwable { + primary.run("use " + primaryDbName) + .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ") + .run("insert into tacid values(1)") + .run("insert into tacid values(2)") + .run("insert into tacid values(3)") + .run("insert overwrite table tacid values(4)") + .run("insert into tacid values(5)") + .run("insert into tacid values(6)") + .run("insert into tacid values(7)"); + + // dump with operation after last repl id is fetched. 
+ WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(2); + replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation); + replica.run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"tacid"}) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("select count(*) from tacid") + .verifyResult("4") + .run("select id from tacid order by id") + .verifyResults(new String[]{"4", "5", "6", "7"}); + assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); + + // next incremental dump + tuple = primary.dump(primaryDbName, tuple.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation); + replica.run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"tacid"}) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("select count(*) from tacid") + .verifyResult("4") + .run("select id from tacid order by id") + .verifyResults(new String[]{"4", "5", "6", "7"}); + assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index c0d416cc21..56eae91546 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -121,9 +121,7 @@ private void initialize(String cmRoot, String externalTableWarehouseRoot, String warehouseRoot, Map overridesForHiveConf) throws Exception { hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class); - for (Map.Entry entry : overridesForHiveConf.entrySet()) { - hiveConf.set(entry.getKey(), entry.getValue()); - } + String metaStoreUri = System.getProperty("test." + HiveConf.ConfVars.METASTOREURIS.varname); if (metaStoreUri != null) { hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri); @@ -153,6 +151,10 @@ private void initialize(String cmRoot, String externalTableWarehouseRoot, String System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); + for (Map.Entry entry : overridesForHiveConf.entrySet()) { + hiveConf.set(entry.getKey(), entry.getValue()); + } + MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf, true); // Add the below mentioned dependency in metastore/pom.xml file. 
For postgres need to copy postgresql-42.2.1.jar to diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index dc7b2877bf..61be5a3a5b 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -1536,6 +1536,62 @@ public void testCompactionInfoHashCode() { Assert.assertEquals("The hash codes must be equal", compactionInfo.hashCode(), compactionInfo1.hashCode()); } + @Test + public void testDisableCompactionDuringReplLoad() throws Exception { + String tblName = "discomp"; + String database = "discomp_db"; + executeStatementOnDriver("drop database if exists " + database + " cascade", driver); + executeStatementOnDriver("create database " + database, driver); + executeStatementOnDriver("CREATE TABLE " + database + "." + tblName + "(a INT, b STRING) " + + " PARTITIONED BY(ds string)" + + " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); + executeStatementOnDriver("insert into " + database + "." + tblName + " partition (ds) values (1, 'fred', " + + "'today'), (2, 'wilma', 'yesterday')", driver); + + executeStatementOnDriver("ALTER TABLE " + database + "." + tblName + + " SET TBLPROPERTIES ( 'hive.repl.first.inc.pending' = 'true')", driver); + List compacts = getCompactionList(); + Assert.assertEquals(0, compacts.size()); + + executeStatementOnDriver("alter database " + database + + " set dbproperties ('hive.repl.first.inc.pending' = 'true')", driver); + executeStatementOnDriver("ALTER TABLE " + database + "." + tblName + + " SET TBLPROPERTIES ( 'hive.repl.first.inc.pending' = 'false')", driver); + compacts = getCompactionList(); + Assert.assertEquals(0, compacts.size()); + + executeStatementOnDriver("alter database " + database + + " set dbproperties ('hive.repl.first.inc.pending' = 'false')", driver); + executeStatementOnDriver("ALTER TABLE " + database + "." 
+ tblName + + " SET TBLPROPERTIES ( 'hive.repl.first.inc.pending' = 'false')", driver); + compacts = getCompactionList(); + Assert.assertEquals(2, compacts.size()); + List partNames = new ArrayList(); + for (int i = 0; i < compacts.size(); i++) { + Assert.assertEquals(database, compacts.get(i).getDbname()); + Assert.assertEquals(tblName, compacts.get(i).getTablename()); + Assert.assertEquals("initiated", compacts.get(i).getState()); + partNames.add(compacts.get(i).getPartitionname()); + } + Assert.assertEquals("ds=today", partNames.get(1)); + Assert.assertEquals("ds=yesterday", partNames.get(0)); + executeStatementOnDriver("drop database if exists " + database + " cascade", driver); + + // Finish the scheduled compaction for ttp2 + runWorker(conf); + runCleaner(conf); + } + + private List getCompactionList() throws Exception { + conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0); + runInitiator(conf); + + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + return rsp.getCompacts(); + } + private void writeBatch(org.apache.hive.hcatalog.streaming.StreamingConnection connection, DelimitedInputWriter writer, boolean closeEarly) throws InterruptedException, org.apache.hive.hcatalog.streaming.StreamingException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 1ab4d62da2..8fa67d5f2c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -225,6 +225,7 @@ import org.apache.hadoop.hive.ql.plan.PrivilegeObjectDesc; import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc; import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc; +import org.apache.hadoop.hive.ql.plan.ReplSetFirstIncLoadFlagDesc; import org.apache.hadoop.hive.ql.plan.RevokeDesc; import org.apache.hadoop.hive.ql.plan.RoleDDLDesc; import org.apache.hadoop.hive.ql.plan.ShowColumnsDesc; @@ -286,6 +287,7 @@ import org.apache.hive.common.util.AnnotationUtils; import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.ReflectionUtil; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.stringtemplate.v4.ST; @@ -661,6 +663,10 @@ public int execute(DriverContext driverContext) { if (work.getAlterMaterializedViewDesc() != null) { return alterMaterializedView(db, work.getAlterMaterializedViewDesc()); } + + if (work.getReplSetFirstIncLoadFlagDesc() != null) { + return updateFirstIncPendingFlag(db, work.getReplSetFirstIncLoadFlagDesc()); + } } catch (Throwable e) { failed(e); return 1; @@ -5199,6 +5205,35 @@ public static boolean doesTableNeedLocation(Table tbl) { return retval; } + private int updateFirstIncPendingFlag(Hive hive, ReplSetFirstIncLoadFlagDesc desc) throws HiveException, TException { + String dbNameOrPattern = desc.getDatabaseName(); + String tableNameOrPattern = desc.getTableName(); + String flag = desc.getIncLoadPendingFlag() ? "true" : "false"; + Map parameters; + // For database level load tableNameOrPattern will be null. Flag is set only in database for db level load. + if (tableNameOrPattern != null && !tableNameOrPattern.isEmpty()) { + // For table level load, dbNameOrPattern is db name and not a pattern. 
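      // The flag is rewritten only while it still reads as pending, so re-running this task (for example on a
      // retried incremental load) leaves already-updated tables and databases untouched.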
+ for (String tableName : Utils.matchesTbl(hive, dbNameOrPattern, tableNameOrPattern)) { + org.apache.hadoop.hive.metastore.api.Table tbl = hive.getMSC().getTable(dbNameOrPattern, tableName); + parameters = tbl.getParameters(); + if (ReplUtils.isFirstIncPending(parameters)) { + parameters.put(ReplUtils.REPL_FIRST_INC_PENDING_FLAG, flag); + hive.getMSC().alter_table(dbNameOrPattern, tableName, tbl); + } + } + } else { + for (String dbName : Utils.matchesDb(hive, dbNameOrPattern)) { + Database database = hive.getMSC().getDatabase(dbName); + parameters = database.getParameters(); + if (ReplUtils.isFirstIncPending(parameters)) { + parameters.put(ReplUtils.REPL_FIRST_INC_PENDING_FLAG, flag); + hive.getMSC().alterDatabase(dbName, database); + } + } + } + return 0; + } + /* uses the authorizer from SessionState will need some more work to get this to run in parallel, however this should not be a bottle neck so might not need to parallelize this. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 179f2917c3..a603e308b3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -35,6 +35,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.ListIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +62,21 @@ public ReplCopyTask(){ super(); } + // If file is already present in base directory, then remove it from the list. + // Check HIVE-21197 for more detail + private void updateSrcFileListForDupCopy(FileSystem dstFs, Path toPath, List srcFiles, + long writeId, int stmtId) throws IOException { + ListIterator iter = srcFiles.listIterator(); + Path basePath = new Path(toPath, AcidUtils.baseOrDeltaSubdir(true, writeId, writeId, stmtId)); + while (iter.hasNext()) { + Path filePath = new Path(basePath, iter.next().getSourcePath().getName()); + if (dstFs.exists(filePath)) { + LOG.debug("File " + filePath + " is already present in base directory. So removing it from the list."); + iter.remove(); + } + } + } + @Override protected int execute(DriverContext driverContext) { LOG.debug("ReplCopyTask.execute()"); @@ -120,6 +136,14 @@ protected int execute(DriverContext driverContext) { } if (work.isCopyToMigratedTxnTable()) { + if (work.isNeedCheckDuplicateCopy()) { + updateSrcFileListForDupCopy(dstFs, toPath, srcFiles, + ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID); + if (srcFiles.isEmpty()) { + LOG.info("All files are already present in the base directory. Skipping copy task."); + return 0; + } + } // If direct (move optimized) copy is triggered for data to a migrated transactional table, then it // should have a write ID allocated by parent ReplTxnTask. Use it to create the base or delta directory. // The toPath received in ReplCopyWork is pointing to table/partition base location. @@ -133,8 +157,12 @@ protected int execute(DriverContext driverContext) { return 6; } long writeId = Long.parseLong(writeIdString); - toPath = new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId, - driverContext.getCtx().getHiveTxnManager().getStmtIdAndIncrement())); + // Set stmt id 0 for bootstrap load as the directory needs to be searched during incremental load to avoid any + // duplicate copy from the source. Check HIVE-21197 for more detail. 
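      // Illustrative note (not part of the patch): with the hard-coded bootstrap constants (write id 1,
      // statement id 0), AcidUtils.baseOrDeltaSubdir(true, 1L, 1L, 0) resolves to a fixed directory name of
      // the form "base_0000001", which is the same path updateSrcFileListForDupCopy() probes on the first
      // incremental load to detect files that were already copied during bootstrap.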
+ int stmtId = (writeId == ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID) ? + ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID : + driverContext.getCtx().getHiveTxnManager().getStmtIdAndIncrement(); + toPath = new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId, stmtId)); } } else { // This flow is usually taken for IMPORT command @@ -271,12 +299,16 @@ public String getName() { LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath); if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false); - if (replicationSpec.isReplace() && conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { + if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION) || copyToMigratedTxnTable)) { rcwork.setDeleteDestIfExist(true); rcwork.setAutoPurge(isAutoPurge); rcwork.setNeedRecycle(needRecycle); } rcwork.setCopyToMigratedTxnTable(copyToMigratedTxnTable); + // For replace case, duplicate check should not be done. The new base directory will automatically make the older + // data invisible. Doing duplicate check and ignoring copy will cause consistency issue if there are multiple + // replace events getting replayed in the first incremental load. + rcwork.setCheckDuplicateCopy(replicationSpec.needDupCopyCheck() && !replicationSpec.isReplace()); LOG.debug("ReplCopyTask:\trcwork"); if (replicationSpec.isLazy()) { LOG.debug("ReplCopyTask:\tlazy"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index acfa354356..31185fc6e3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -255,9 +255,18 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) th Long bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L); assert (bootDumpBeginReplId >= 0L); + LOG.info("Bootstrap Dump for db {} and table {}", work.dbNameOrPattern, work.tableNameOrPattern); + String validTxnList = getValidTxnListForReplDump(hiveDb); for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) { LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName); + Database db = hiveDb.getDatabase(dbName); + if ((db != null) && (ReplUtils.isFirstIncPending(db.getParameters()))) { + // For replicated (target) database, until after first successful incremental load, the database will not be + // in a consistent state. Avoid allowing replicating this database to a new target. 
+ throw new HiveException("Replication dump not allowed for replicated database" + + " with first incremental dump pending : " + dbName); + } replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(), Utils.getAllTables(hiveDb, dbName).size(), hiveDb.getAllFunctions().size()); @@ -274,6 +283,19 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) th for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) { LOG.debug( "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri()); + Table table; + try { + table = hiveDb.getTable(dbName, tblName); + } catch (InvalidTableException e) { + LOG.info("table has been dropped concurrently "+ tblName); + continue; + } + if (table != null && ReplUtils.isFirstIncPending(table.getParameters())) { + // For replicated (target) table, until after first successful incremental load, the table will not be + // in a consistent state. Avoid allowing replicating this table to a new target. + throw new HiveException("Replication dump not allowed for replicated table" + + " with first incremental dump pending : " + tblName); + } try { HiveWrapper.Tuple tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName, conf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 4dc14f47c9..7062eda98d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -129,7 +129,7 @@ a database ( directory ) case Database: DatabaseEvent dbEvent = (DatabaseEvent) next; dbTracker = - new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, loadTaskTracker) + new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, work.tableNameToLoadIn, loadTaskTracker) .tasks(); loadTaskTracker.update(dbTracker); if (work.hasDbState()) { @@ -370,6 +370,9 @@ private int executeIncrementalLoad(DriverContext driverContext) { // If incremental events are already applied, then check and perform if need to bootstrap any tables. if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) { + // No need to set incremental load pending flag for external tables as the files will be copied to the same path + // for external table unlike migrated txn tables. Currently bootstrap during incremental is done only for + // external tables. if (work.hasBootstrapLoadTasks()) { LOG.debug("Current incremental dump have tables to be bootstrapped. 
Switching to bootstrap " + "mode after applying all events."); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java index 0fd305a0f9..a1b246c4c8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java @@ -48,13 +48,15 @@ private final DatabaseEvent event; private final String dbNameToLoadIn; + private final boolean isTableLevelLoad; - public LoadDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn, + public LoadDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn, String tblNameToLoadIn, TaskTracker loadTaskTracker) { this.context = context; this.event = event; this.dbNameToLoadIn = dbNameToLoadIn; this.tracker = new TaskTracker(loadTaskTracker); + isTableLevelLoad = tblNameToLoadIn != null && !tblNameToLoadIn.isEmpty(); } public TaskTracker tasks() throws SemanticException { @@ -123,7 +125,7 @@ private boolean isDbEmpty(String dbName) throws HiveException { CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc(); createDbDesc.setName(dbObj.getName()); createDbDesc.setComment(dbObj.getDescription()); - createDbDesc.setDatabaseProperties(updateDbProps(dbObj, context.dumpDirectory)); + createDbDesc.setDatabaseProperties(updateDbProps(dbObj, context.dumpDirectory, true)); // note that we do not set location - for repl load, we want that auto-created. createDbDesc.setIfNotExists(false); @@ -135,7 +137,8 @@ private boolean isDbEmpty(String dbName) throws HiveException { } private Task alterDbTask(Database dbObj) { - return alterDbTask(dbObj.getName(), updateDbProps(dbObj, context.dumpDirectory), context.hiveConf); + return alterDbTask(dbObj.getName(), updateDbProps(dbObj, context.dumpDirectory, !isTableLevelLoad), + context.hiveConf); } private Task setOwnerInfoTask(Database dbObj) { @@ -146,7 +149,7 @@ private boolean isDbEmpty(String dbName) throws HiveException { return TaskFactory.get(work, context.hiveConf); } - private static Map updateDbProps(Database dbObj, String dumpDirectory) { + private static Map updateDbProps(Database dbObj, String dumpDirectory, boolean needSetIncFlag) { /* explicitly remove the setting of last.repl.id from the db object parameters as loadTask is going to run multiple times and explicit logic is in place which prevents updates to tables when db level @@ -158,6 +161,15 @@ private boolean isDbEmpty(String dbName) throws HiveException { // Add the checkpoint key to the Database binding it to current dump directory. // So, if retry using same dump, we shall skip Database object update. parameters.put(ReplUtils.REPL_CHECKPOINT_KEY, dumpDirectory); + + if (needSetIncFlag) { + // This flag will be set to false after first incremental load is done. This flag is used by repl copy task to + // check if duplicate file check is required or not. This flag is used by compaction to check if compaction can be + // done for this database or not. If compaction is done before first incremental then duplicate check will fail as + // compaction may change the directory structure. 
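      // Net effect (illustrative values): right after a bootstrap load the target database parameters contain
      //   hive.repl.ckpt.key          = <bootstrap dump directory>   (used to skip re-creation on a retried load)
      //   hive.repl.first.inc.pending = true                         (checked by REPL DUMP and by the compactor)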
+ parameters.put(ReplUtils.REPL_FIRST_INC_PENDING_FLAG, "true"); + } + return parameters; } @@ -173,7 +185,7 @@ private boolean isDbEmpty(String dbName) throws HiveException { public AlterDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn, TaskTracker loadTaskTracker) { - super(context, event, dbNameToLoadIn, loadTaskTracker); + super(context, event, dbNameToLoadIn, null, loadTaskTracker); } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java index 8e01fb1e6b..dbda41d827 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; public class TableContext { final String dbNameToLoadIn; @@ -43,6 +44,14 @@ ImportTableDesc overrideProperties(ImportTableDesc importTableDesc) throws SemanticException { if (StringUtils.isNotBlank(tableNameToLoadIn)) { importTableDesc.setTableName(tableNameToLoadIn); + + //For table level load, add this property to avoid duplicate copy. + // This flag will be set to false after first incremental load is done. This flag is used by + // repl copy task to check if duplicate file check is required or not. This flag is used by + // compaction to check if compaction can be done for this database or not. If compaction is + // done before first incremental then duplicate check will fail as compaction may change + // the directory structure. 
+ importTableDesc.getTblProps().put(ReplUtils.REPL_FIRST_INC_PENDING_FLAG, "true"); } return importTableDesc; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java index 3e0c969d4d..601de8abc3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.ReplTxnWork; +import org.apache.hadoop.hive.ql.plan.ReplSetFirstIncLoadFlagDesc; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.slf4j.Logger; @@ -164,6 +165,12 @@ public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadP lastEventid); } } + + ReplSetFirstIncLoadFlagDesc desc = new ReplSetFirstIncLoadFlagDesc(dbName, tableName, false); + Task updateIncPendTask = TaskFactory.get(new DDLWork(inputs, outputs, desc), conf); + taskChainTail.addDependentTask(updateIncPendTask); + taskChainTail = updateIncPendTask; + Map dbProps = new HashMap<>(); dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent)); ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index 91eeb13c84..b7ccddcdea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -60,6 +60,7 @@ public static final String LAST_REPL_ID_KEY = "hive.repl.last.repl.id"; public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key"; + public static final String REPL_FIRST_INC_PENDING_FLAG = "hive.repl.first.inc.pending"; // write id allocated in the current execution context which will be passed through config to be used by different // tasks. @@ -75,6 +76,10 @@ // It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1. public static final Long REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID = 1L; + // we keep the statement id as 0 so that the base directory is created with 0 and is easy to find out during + // duplicate check. Note : Stmt id is not used for base directory now, but to avoid misuse later, its maintained. + public static final int REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID = 0; + /** * Bootstrap REPL LOAD operation type on the examined object based on ckpt state. */ @@ -187,4 +192,14 @@ public static PathFilter getEventsDirectoryFilter(final FileSystem fs) { } }; } + + public static boolean isFirstIncPending(Map parameter) { + if (parameter == null) { + return false; + } + String firstIncPendFlag = parameter.get(ReplUtils.REPL_FIRST_INC_PENDING_FLAG); + // If flag is not set, then we assume first incremental load is done as the database/table may be created by user + // and not through replication. 
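    // Minimal usage sketch (illustrative only, not part of the patch):
    //   Map<String, String> params = new HashMap<>();
    //   ReplUtils.isFirstIncPending(params);                            // false - flag never set
    //   params.put(ReplUtils.REPL_FIRST_INC_PENDING_FLAG, "true");
    //   ReplUtils.isFirstIncPending(params);                            // true  - bootstrap done, first incremental pending
    //   params.put(ReplUtils.REPL_FIRST_INC_PENDING_FLAG, "false");
    //   ReplUtils.isFirstIncPending(params);                            // false - first incremental load completed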
+ return firstIncPendFlag != null && !firstIncPendFlag.isEmpty() && "true".equalsIgnoreCase(firstIncPendFlag); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index d4fb1917b0..6584cb505d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -71,6 +71,7 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import java.io.IOException; import java.io.Serializable; @@ -472,10 +473,10 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, LoadFileType lft; boolean isAutoPurge = false; boolean needRecycle = false; - boolean copyToMigratedTxnTable = false; + boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable(); - if (replicationSpec.isInReplicationScope() && - x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false)) { + if (replicationSpec.isInReplicationScope() && (copyToMigratedTxnTable || + x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) { lft = LoadFileType.IGNORE; destPath = loadPath = tgtPath; isAutoPurge = "true".equalsIgnoreCase(table.getProperty("auto.purge")); @@ -485,7 +486,6 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName()); needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db); } - copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable(); } else { if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) { String mmSubdir = replace ? 
AcidUtils.baseDir(writeId) @@ -531,8 +531,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(), null, null, false); - if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(table) && - !replicationSpec.isMigratingToTxnTable()) { + if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(table)) { LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( Collections.singletonList(destPath), Collections.singletonList(tgtPath), @@ -603,7 +602,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0); boolean isAutoPurge = false; boolean needRecycle = false; - boolean copyToMigratedTxnTable = false; + boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable(); if (shouldSkipDataCopyInReplScope(tblDesc, replicationSpec) || (tblDesc.isExternal() && tblDesc.getLocation() == null)) { @@ -624,8 +623,8 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, LoadFileType loadFileType; Path destPath; - if (replicationSpec.isInReplicationScope() && - x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false)) { + if (replicationSpec.isInReplicationScope() && (copyToMigratedTxnTable || + x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) { loadFileType = LoadFileType.IGNORE; destPath = tgtLocation; isAutoPurge = "true".equalsIgnoreCase(table.getProperty("auto.purge")); @@ -635,7 +634,6 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName()); needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db); } - copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable(); } else { loadFileType = replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : @@ -675,8 +673,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, // Note: this sets LoadFileType incorrectly for ACID; is that relevant for import? // See setLoadFileType and setIsAcidIow calls elsewhere for an example. - if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(tblDesc.getTblProps()) && - !replicationSpec.isMigratingToTxnTable()) { + if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(tblDesc.getTblProps())) { LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( Collections.singletonList(destPath), Collections.singletonList(tgtLocation), @@ -1136,6 +1133,7 @@ private static void createReplImportTasks( Task dropTblTask = null; WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK; + boolean firstIncPending; // Normally, on import, trying to create a table or a partition in a db that does not yet exist // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying @@ -1147,6 +1145,12 @@ private static void createReplImportTasks( if (!waitOnPrecursor){ throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tblDesc.getDatabaseName())); } + // For warehouse level replication, if the database itself is getting created in this load, then no need to + // check for duplicate copy. Check HIVE-21197 for more detail. + firstIncPending = false; + } else { + // For database replication, get the flag from database parameter. Check HIVE-21197 for more detail. 
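      // Resolution order for the duplicate-copy decision: a database that is being created by this very load
      // cannot contain bootstrap-copied files, so the check stays off; otherwise the database-level flag is
      // taken, and if it is still false and the table already exists, the table-level flag (set for
      // table-level replication) is consulted as well. The final value is handed to
      // replicationSpec.setNeedDupCopyCheck(...) below.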
+ firstIncPending = ReplUtils.isFirstIncPending(parentDb.getParameters()); } if (table != null) { @@ -1164,6 +1168,9 @@ private static void createReplImportTasks( if (x.getEventType() == DumpType.EVENT_CREATE_TABLE) { dropTblTask = dropTableTask(table, x, replicationSpec); table = null; + } else if (!firstIncPending) { + // For table level replication, get the flag from table parameter. Check HIVE-21197 for more detail. + firstIncPending = ReplUtils.isFirstIncPending(table.getParameters()); } } else { // If table doesn't exist, allow creating a new one only if the database state is older than the update. @@ -1175,6 +1182,10 @@ private static void createReplImportTasks( } } + // For first incremental load just after bootstrap, we need to check for duplicate copy. + // Check HIVE-21197 for more detail. + replicationSpec.setNeedDupCopyCheck(firstIncPending); + if (updatedMetadata != null) { updatedMetadata.set(replicationSpec.getReplicationState(), tblDesc.getDatabaseName(), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index d55ee208cc..055d454b21 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -49,6 +49,7 @@ private Type specType = Type.DEFAULT; // DEFAULT means REPL_LOAD or BOOTSTRAP_DUMP or EXPORT private boolean isMigratingToTxnTable = false; private boolean isMigratingToExternalTable = false; + private boolean needDupCopyCheck = false; // Key definitions related to replication public enum KEY { @@ -426,4 +427,14 @@ public static void copyLastReplId(Map srcParameter, Map inputs, HashSet outputs, this.triggerToPoolMappingDesc = triggerToPoolMappingDesc; } + public DDLWork(HashSet inputs, HashSet outputs, + ReplSetFirstIncLoadFlagDesc replSetFirstIncLoadFlagDesc) { + this(inputs, outputs); + this.replSetFirstIncLoadFlagDesc = replSetFirstIncLoadFlagDesc; + } + /** * @return Create Database descriptor */ @@ -1356,4 +1364,12 @@ public CreateOrDropTriggerToPoolMappingDesc getTriggerToPoolMappingDesc() { public void setTriggerToPoolMappingDesc(CreateOrDropTriggerToPoolMappingDesc triggerToPoolMappingDesc) { this.triggerToPoolMappingDesc = triggerToPoolMappingDesc; } + + public ReplSetFirstIncLoadFlagDesc getReplSetFirstIncLoadFlagDesc() { + return replSetFirstIncLoadFlagDesc; + } + + public void setReplSetFirstIncLoadFlagDesc(ReplSetFirstIncLoadFlagDesc replSetFirstIncLoadFlagDesc) { + this.replSetFirstIncLoadFlagDesc = replSetFirstIncLoadFlagDesc; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java index 4d34f8dbea..c631f3d6e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java @@ -59,6 +59,8 @@ private boolean copyToMigratedTxnTable; + private boolean checkDuplicateCopy = false; + public ReplCopyWork(final Path srcPath, final Path destPath, boolean errorOnSrcEmpty) { super(srcPath, destPath, errorOnSrcEmpty); } @@ -110,4 +112,12 @@ public boolean isCopyToMigratedTxnTable() { public void setCopyToMigratedTxnTable(boolean copyToMigratedTxnTable) { this.copyToMigratedTxnTable = copyToMigratedTxnTable; } + + public boolean isNeedCheckDuplicateCopy() { + return checkDuplicateCopy; + } + + public void setCheckDuplicateCopy(boolean flag) { + checkDuplicateCopy = flag; + } } diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplSetFirstIncLoadFlagDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplSetFirstIncLoadFlagDesc.java new file mode 100644 index 0000000000..7ee8da9356 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplSetFirstIncLoadFlagDesc.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; +import org.apache.hadoop.hive.ql.plan.Explain.Level; + +import java.io.Serializable; + +/** + * ReplSetFirstIncLoadFlagDesc. + * + */ +@Explain(displayName = "Set First Incr Load Flag", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) +public class ReplSetFirstIncLoadFlagDesc extends DDLDesc implements Serializable { + + private static final long serialVersionUID = 1L; + String databaseName; + String tableName; + boolean incLoadPendingFlag; + + /** + * For serialization only. + */ + public ReplSetFirstIncLoadFlagDesc() { + } + + public ReplSetFirstIncLoadFlagDesc(String databaseName, String tableName, boolean incLoadPendingFlag) { + super(); + this.databaseName = databaseName; + this.tableName = tableName; + this.incLoadPendingFlag = incLoadPendingFlag; + } + + @Explain(displayName="db_name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public String getDatabaseName() { + return databaseName; + } + + public void setDatabaseName(String databaseName) { + this.databaseName = databaseName; + } + + @Explain(displayName="table_name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + @Explain(displayName="inc load pending flag", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public boolean getIncLoadPendingFlag() { + return incLoadPendingFlag; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index f45140d0d7..94f0031662 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -32,8 +32,10 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import java.io.IOException; import java.security.PrivilegedExceptionAction; @@ -91,6 +93,8 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { */ abstract Table 
resolveTable(CompactionInfo ci) throws MetaException; + abstract boolean replIsCompactionDisabledForDatabase(String dbName) throws TException; + /** * Get list of partitions by name. * @param ci compaction info. @@ -217,4 +221,9 @@ public static void initializeAndStartThread(CompactorThread thread, thread.init(new AtomicBoolean(), new AtomicBoolean()); thread.start(); } + + protected boolean replIsCompactionDisabledForTable(Table tbl) { + // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail. + return ReplUtils.isFirstIncPending(tbl.getParameters()); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index a37c983f3c..32c162c16c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -104,6 +104,12 @@ public void run() { } LOG.info("Checking to see if we should compact " + ci.getFullPartitionName()); try { + if (replIsCompactionDisabledForDatabase(ci.dbname)) { + // Compaction is disabled for replicated database until after first successful incremental load. + LOG.info("Compaction is disabled for database " + ci.dbname); + continue; + } + Table t = resolveTable(ci); if (t == null) { // Most likely this means it's a temp table @@ -112,6 +118,12 @@ public void run() { continue; } + if (replIsCompactionDisabledForTable(t)) { + // Compaction is disabled for replicated table until after first successful incremental load. + LOG.info("Compaction is disabled for table " + ci.getFullTableName()); + continue; + } + // check if no compaction set for this table if (noAutoCompactSet(t)) { LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT + "=true so we will not compact it."); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java index 1ddc54d68a..a6dd4fa003 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java @@ -24,10 +24,13 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.thrift.TException; import java.util.Collections; import java.util.List; @@ -71,6 +74,17 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { } } + @Override boolean replIsCompactionDisabledForDatabase(String dbName) throws TException { + try { + Database database = rs.getDatabase(getDefaultCatalog(conf), dbName); + // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail. 
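    // A database that cannot be resolved is treated conservatively below: the lookup failure disables
    // compaction for it rather than risking a compaction run against a half-loaded replica.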
+ return ReplUtils.isFirstIncPending(database.getParameters()); + } catch (NoSuchObjectException e) { + LOG.info("Unable to find database " + dbName); + return true; + } + } + @Override List getPartitionsByNames(CompactionInfo ci) throws MetaException { try { return rs.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java index 9678786612..4235184fec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java @@ -19,10 +19,13 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.thrift.TException; import java.util.Collections; @@ -53,6 +56,17 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { } } + @Override boolean replIsCompactionDisabledForDatabase(String dbName) throws TException { + try { + Database database = msc.getDatabase(getDefaultCatalog(conf), dbName); + // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail. + return ReplUtils.isFirstIncPending(database.getParameters()); + } catch (NoSuchObjectException e) { + LOG.info("Unable to find database " + dbName); + return true; + } + } + @Override List getPartitionsByNames(CompactionInfo ci) throws MetaException { try { return msc.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName, diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java index 9daff370ef..6c7fe116cc 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import static org.junit.Assert.assertEquals; @@ -87,6 +88,9 @@ public CallerArguments(String dbName) { private static com.google.common.base.Function alterTableModifier = null; + private static com.google.common.base.Function + getCurrNotiEventIdModifier = null; + // Methods to set/reset getTable modifier public static void setGetTableBehaviour(com.google.common.base.Function modifier){ getTableModifier = (modifier == null) ? 
com.google.common.base.Functions.identity() : modifier; @@ -270,4 +274,25 @@ public boolean alterDatabase(String catalogName, String dbname, Database db) } return super.alterDatabase(catalogName, dbname, db); } + + // Methods to set/reset getCurrentNotificationEventId modifier + public static void setGetCurrentNotificationEventIdBehaviour( + com.google.common.base.Function modifier){ + getCurrNotiEventIdModifier = modifier; + } + public static void resetGetCurrentNotificationEventIdBehaviour(){ + setGetCurrentNotificationEventIdBehaviour(null); + } + + @Override + public CurrentNotificationEventId getCurrentNotificationEventId() { + CurrentNotificationEventId id = super.getCurrentNotificationEventId(); + if (getCurrNotiEventIdModifier != null) { + id = getCurrNotiEventIdModifier.apply(id); + if (id == null) { + throw new RuntimeException("InjectableBehaviourObjectStore: Invalid getCurrentNotificationEventId"); + } + } + return id; + } }
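Taken together, the changes above give the hive.repl.first.inc.pending flag a simple lifecycle. The sketch below is illustrative only and not part of the patch; it reuses the WarehouseInstance test helpers and ReplUtils calls that appear in the tests above, and names such as primary, replica, primaryDbName and replicatedDbName are assumed to be set up as in those tests.

    // Illustrative lifecycle of the first-incremental-pending flag, using the test helpers from above.
    WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null);
    replica.load(replicatedDbName, bootstrapTuple.dumpLocation);

    // Bootstrap load stamps the flag on the target database; while it is set, the compactor skips the
    // database and a REPL DUMP of the replica is rejected.
    assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));

    // The first incremental load replays the pending events (skipping files already present in the
    // bootstrap base directories) and appends a ReplSetFirstIncLoadFlagDesc task that clears the flag.
    WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, bootstrapTuple.lastReplicationId);
    replica.load(replicatedDbName, incTuple.dumpLocation);
    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));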