diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 742dddf6ee..61498b587a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -531,7 +531,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         + "task increment that would cross the specified limit."),
     REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",100,
         "Number of threads that will be used to dump partition data information during repl dump."),
-    REPL_DATA_COPY_LAZY("hive.repl.data.copy.lazy", false,
+    REPL_DATA_COPY_LAZY("hive.repl.data.copy.lazy", true,
         "Indicates whether replication should run data copy tasks during repl load operation."),
     REPL_FILE_LIST_CACHE_SIZE("hive.repl.file.list.cache.size", 10000,
         "This config indicates threshold for the maximum number of data copy locations to be kept in memory. \n"
diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestCommands.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestCommands.java
index ec19c5e1f6..f803bc210f 100644
--- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestCommands.java
+++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestCommands.java
@@ -77,6 +77,7 @@ public static void setUpBeforeClass() throws Exception {
     TestHCatClient.startMetaStoreServer();
     hconf = TestHCatClient.getConf();
     hconf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,"");
+    hconf.set(HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname, "false");
     hconf
         .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
             "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
index 58b3ab4dd6..a7cd3a64af 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
@@ -98,6 +98,7 @@ static void internalBeforeClassSetup(Map overrides, Class clazz)
       put("hive.metastore.disallow.incompatible.col.type.changes", "false");
       put("hive.in.repl.test", "true");
       put("metastore.warehouse.tenant.colocation", "true");
+      put(HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname, "false");
     }};

     acidEnableConf.putAll(overrides);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
index 3f4cacfe58..24104c709b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
@@ -55,6 +55,7 @@ public static void classLevelSetup() throws Exception {
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("hadoop.proxyuser."
+ Utils.getUGI().getShortUserName() + ".hosts", "*"); conf.set("hive.repl.include.external.tables", "false"); + conf.set(HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname, "false"); MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); HashMap overridesForHiveConf = new HashMap() {{ diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java deleted file mode 100644 index 5e7bf7e885..0000000000 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.parse; - -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder; -import org.junit.BeforeClass; - -import java.util.HashMap; -import java.util.Map; - -public class TestReplTableMigrationWithJsonFormat extends TestReplicationWithTableMigration { - @BeforeClass - public static void classLevelSetup() throws Exception { - Map overrides = new HashMap<>(); - overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), - JSONMessageEncoder.class.getCanonicalName()); - internalBeforeClassSetup(overrides); - } -} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index b8e91dda2f..dda495dece 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -183,6 +183,7 @@ static void internalBeforeClassSetup(Map additionalProperties, b hconf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, true); hconf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, true); hconf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE, true); + hconf.setBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY, false); System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index cd48d4bfdd..ff78cf449c 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -103,6 +103,7 @@ static void internalBeforeClassSetup(Map overrides, put("hive.metastore.disallow.incompatible.col.type.changes", "false"); put("metastore.warehouse.tenant.colocation", "true"); put("hive.in.repl.test", "true"); + put(HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname, "false"); }}; acidEnableConf.putAll(overrides); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index cc1701fe69..a5678f26e8 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -77,6 +77,7 @@ public static void classLevelSetup() throws Exception { overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true"); overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, UserGroupInformation.getCurrentUser().getUserName()); + overrides.put(HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname, "false"); internalBeforeClassSetup(overrides, TestReplicationScenarios.class); } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java deleted file mode 100644 index 0645cef15e..0000000000 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java +++ /dev/null @@ -1,577 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.parse; - -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; -import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments; -import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; -import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; -import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; - -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.BeforeClass; -import org.junit.AfterClass; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import javax.annotation.Nullable; -import org.junit.rules.TestName; -import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable; -import static org.apache.hadoop.hive.ql.io.AcidUtils.isTransactionalTable; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -/** - * TestReplicationWithTableMigration - test replication for Hive2 to Hive3 (Strict managed tables) - */ -public class TestReplicationWithTableMigration { - private final static String AVRO_SCHEMA_FILE_NAME = "avro_table.avsc"; - private static String fullyQualifiedReplicaExternalBase; - - @Rule - public final TestName testName = new TestName(); - - protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationWithTableMigration.class); - private static WarehouseInstance primary, replica; - private String primaryDbName, replicatedDbName; - private Path avroSchemaFile = null; - - @BeforeClass - public static void classLevelSetup() throws Exception { - HashMap overrideProperties = new HashMap<>(); - overrideProperties.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), - GzipJSONMessageEncoder.class.getCanonicalName()); - internalBeforeClassSetup(overrideProperties); - } - - static void internalBeforeClassSetup(Map overrideConfigs) throws Exception { - HiveConf conf = new HiveConf(TestReplicationWithTableMigration.class); - conf.set("dfs.client.use.datanode.hostname", "true"); - conf.set("hadoop.proxyuser." 
+ Utils.getUGI().getShortUserName() + ".hosts", "*"); - MiniDFSCluster miniDFSCluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); - final DistributedFileSystem fs = miniDFSCluster.getFileSystem(); - HashMap hiveConfigs = new HashMap() {{ - put("fs.defaultFS", fs.getUri().toString()); - put("hive.support.concurrency", "true"); - put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); - put("hive.metastore.client.capability.check", "false"); - put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); - put("hive.strict.checks.bucketing", "false"); - put("hive.mapred.mode", "nonstrict"); - put("mapred.input.dir.recursive", "true"); - put("hive.metastore.disallow.incompatible.col.type.changes", "false"); - put("hive.strict.managed.tables", "true"); - }}; - - HashMap configsForPrimary = new HashMap() {{ - put("fs.defaultFS", fs.getUri().toString()); - put("hive.metastore.client.capability.check", "false"); - put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); - put("hive.strict.checks.bucketing", "false"); - put("hive.mapred.mode", "nonstrict"); - put("mapred.input.dir.recursive", "true"); - put("hive.metastore.disallow.incompatible.col.type.changes", "false"); - put("hive.support.concurrency", "false"); - put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); - put("hive.strict.managed.tables", "false"); - put("hive.stats.autogather", "true"); - put("hive.stats.column.autogather", "true"); - }}; - configsForPrimary.putAll(overrideConfigs); - primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary); - hiveConfigs.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir); - replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs); - fullyQualifiedReplicaExternalBase = miniDFSCluster.getFileSystem().getFileStatus( - new Path("/")).getPath().toString(); - } - - private static Path createAvroSchemaFile(FileSystem fs, Path testPath) throws IOException { - Path schemaFile = new Path(testPath, AVRO_SCHEMA_FILE_NAME); - String[] schemaVals = new String[] { "{", - " \"type\" : \"record\",", - " \"name\" : \"table1\",", - " \"doc\" : \"Sqoop import of table1\",", - " \"fields\" : [ {", - " \"name\" : \"col1\",", - " \"type\" : [ \"null\", \"string\" ],", - " \"default\" : null,", - " \"columnName\" : \"col1\",", - " \"sqlType\" : \"12\"", - " }, {", - " \"name\" : \"col2\",", - " \"type\" : [ \"null\", \"long\" ],", - " \"default\" : null,", - " \"columnName\" : \"col2\",", - " \"sqlType\" : \"13\"", - " } ],", - " \"tableName\" : \"table1\"", - "}" - }; - - try (FSDataOutputStream stream = fs.create(schemaFile)) { - for (String line : schemaVals) { - stream.write((line + "\n").getBytes()); - } - } - fs.deleteOnExit(schemaFile); - return schemaFile; - } - - @AfterClass - public static void classLevelTearDown() throws IOException { - primary.close(); - replica.close(); - } - - @Before - public void setup() throws Throwable { - primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); - replicatedDbName = "replicated_" + primaryDbName; - primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + - SOURCE_OF_REPLICATION + "' = '1,2,3')"); - if (avroSchemaFile == null) { - Path testPath = new Path("/tmp/avro_schema/definition/" + System.nanoTime()); - DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); - fs.mkdirs(testPath, new FsPermission("777")); - avroSchemaFile = PathBuilder.fullyQualifiedHDFSUri(createAvroSchemaFile(fs, testPath), fs); - } - } 
- - @After - public void tearDown() throws Throwable { - primary.run("drop database if exists " + primaryDbName + " cascade"); - replica.run("drop database if exists " + replicatedDbName + " cascade"); - } - - private WarehouseInstance.Tuple prepareDataAndDump(String primaryDbName, String fromReplId) throws Throwable { - WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) - .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ") - .run("insert into tacid values(1)") - .run("insert into tacid values(2)") - .run("insert into tacid values(3)") - .run( - "create table tacidpart (place string) partitioned by (country string) clustered by(place) " - + - "into 3 buckets stored as orc ") - .run("alter table tacidpart add partition(country='france')") - .run("insert into tacidpart partition(country='india') values('mumbai')") - .run("insert into tacidpart partition(country='us') values('sf')") - .run("insert into tacidpart partition(country='france') values('paris')") - .run( - "create table tflat (rank int) stored as orc tblproperties(\"transactional\"=\"false\")") - .run("insert into tflat values(11)") - .run("insert into tflat values(22)") - .run("create table tflattext (id int) ") - .run("insert into tflattext values(111), (222)") - .run("create table tflattextpart (id int) partitioned by (country string) ") - .run("insert into tflattextpart partition(country='india') values(1111), (2222)") - .run("insert into tflattextpart partition(country='us') values(3333)") - .run( - "create table tacidloc (id int) clustered by(id) into 3 buckets stored as orc LOCATION '/tmp/fol' ") - .run("insert into tacidloc values(1)") - .run("insert into tacidloc values(2)") - .run("insert into tacidloc values(3)") - .run( - "create table tacidpartloc (place string) partitioned by (country string) clustered by(place) " - + - "into 3 buckets stored as orc ") - .run("alter table tacidpartloc add partition(country='france') LOCATION '/tmp/fol/part'") - .run("insert into tacidpartloc partition(country='india') values('mumbai')") - .run("insert into tacidpartloc partition(country='us') values('sf')") - .run("insert into tacidpartloc partition(country='france') values('paris')") - .run( - "create table avro_table ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' " - + "stored as avro tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri() - .toString() + "')") - .run("insert into avro_table values ('str1', 10)") - .run( - "create table avro_table_part partitioned by (country string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' " - + "stored as avro tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri() - .toString() + "')") - .run("insert into avro_table_part partition (country='india') values ('another', 13)") - .dump(primaryDbName); - assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacid"))); - assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpart"))); - assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflat"))); - assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflattext"))); - assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflattextpart"))); - assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidloc"))); - assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpartloc"))); - assertAvroTableState(primaryDbName, "avro_table", "avro_table_part"); - assertAvroTableState(primaryDbName, "avro_table_part"); - return 
tuple; - } - - private void assertAvroTableState(String primaryDbName, String... tableNames) throws Exception { - for (String tableName : tableNames) { - Table avroTable = primary.getTable(primaryDbName, tableName); - assertFalse(isTransactionalTable(avroTable)); - assertFalse(MetaStoreUtils.isExternalTable(avroTable)); - } - } - - private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable { - replica.run("use " + replicatedDbName) - .run("show tables") - .verifyResults(new String[] {"tacid", "tacidpart", "tflat", "tflattext", "tflattextpart", - "tacidloc", "tacidpartloc", "avro_table", "avro_table_part" }) - .run("repl status " + replicatedDbName) - .verifyResult(lastReplId) - .run("select id from tacid order by id") - .verifyResults(new String[]{"1", "2", "3"}) - .run("select country from tacidpart order by country") - .verifyResults(new String[] {"france", "india", "us"}) - .run("select rank from tflat order by rank") - .verifyResults(new String[] {"11", "22"}) - .run("select id from tflattext order by id") - .verifyResults(new String[] {"111", "222"}) - .run("select id from tflattextpart order by id") - .verifyResults(new String[] {"1111", "2222", "3333"}) - .run("select id from tacidloc order by id") - .verifyResults(new String[]{"1", "2", "3"}) - .run("select country from tacidpartloc order by country") - .verifyResults(new String[] {"france", "india", "us"}) - .run("select col1 from avro_table") - .verifyResults(new String[] { "str1" }) - .run("select col1 from avro_table_part") - .verifyResults(new String[] { "another" }); - - assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid"))); - assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpart"))); - assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tflat"))); - assertTrue(!isFullAcidTable(replica.getTable(replicatedDbName, "tflattext"))); - assertTrue(!isFullAcidTable(replica.getTable(replicatedDbName, "tflattextpart"))); - assertTrue(isTransactionalTable(replica.getTable(replicatedDbName, "tflattext"))); - assertTrue(isTransactionalTable(replica.getTable(replicatedDbName, "tflattextpart"))); - assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidloc"))); - assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpartloc"))); - assertTablePath(replicatedDbName, "avro_table"); - assertPartitionPath(replicatedDbName, "avro_table_part"); - } - - private void assertPartitionPath(String replicatedDbName, String tableName) throws Exception { - Path tablePath = assertTablePath(replicatedDbName, tableName); - List partitions = replica.getAllPartitions(replicatedDbName, tableName); - assertEquals(1, partitions.size()); - String actualPartitionPath = partitions.iterator().next().getSd().getLocation().toLowerCase(); - String expectedPartitionPath = new PathBuilder(tablePath.toString()) - .addDescendant("country=india").build().toUri().toString().toLowerCase(); - assertEquals(expectedPartitionPath, actualPartitionPath); - } - - private Path assertTablePath(String replicatedDbName, String tableName) throws Exception { - Table avroTable = replica.getTable(replicatedDbName, tableName); - assertTrue(MetaStoreUtils.isExternalTable(avroTable)); - Path tablePath = new PathBuilder(replica.externalTableWarehouseRoot.toString()) - .addDescendant(replicatedDbName + ".db").addDescendant(tableName).build(); - String expectedTablePath = tablePath.toUri().toString().toLowerCase(); - String actualTablePath = avroTable.getSd().getLocation().toLowerCase(); - 
assertEquals(expectedTablePath, actualTablePath); - return tablePath; - } - - private void loadWithFailureInAddNotification(String tbl) throws Throwable { - BehaviourInjection callerVerifier - = new BehaviourInjection() { - @Nullable - @Override - public Boolean apply(@Nullable InjectableBehaviourObjectStore.CallerArguments args) { - injectionPathCalled = true; - if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.constraintTblName != null)) { - LOG.warn("Verifier - DB: " + args.dbName - + " Constraint Table: " + args.constraintTblName); - return false; - } - if (args.tblName != null) { - LOG.warn("Verifier - Table: " + args.tblName); - return args.tblName.equalsIgnoreCase(tbl); - } - return true; - } - }; - InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier); - try { - replica.loadFailure(replicatedDbName, primaryDbName); - } finally { - InjectableBehaviourObjectStore.resetCallerVerifier(); - } - callerVerifier.assertInjectionsPerformed(true, false); - } - - @Test - public void testBootstrapLoadMigrationManagedToAcid() throws Throwable { - WarehouseInstance.Tuple tuple = prepareDataAndDump(primaryDbName, null); - replica.load(replicatedDbName, primaryDbName); - verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); - } - - @Test - public void testIncrementalLoadMigrationManagedToAcid() throws Throwable { - WarehouseInstance.Tuple tuple = primary.dump(primaryDbName); - replica.load(replicatedDbName, primaryDbName); - tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId); - replica.load(replicatedDbName, primaryDbName); - verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); - } - - @Test - public void testIncrementalLoadMigrationManagedToAcidFailure() throws Throwable { - WarehouseInstance.Tuple tuple = primary.dump(primaryDbName); - replica.load(replicatedDbName, primaryDbName); - tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId); - loadWithFailureInAddNotification("tacid"); - replica.run("use " + replicatedDbName) - .run("show tables like tacid") - .verifyResult(null); - replica.load(replicatedDbName, primaryDbName); - verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); - } - - @Test - public void testIncrementalLoadMigrationManagedToAcidFailurePart() throws Throwable { - WarehouseInstance.Tuple tuple = primary.dump(primaryDbName); - replica.load(replicatedDbName, primaryDbName); - tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId); - loadWithFailureInAddNotification("tacidpart"); - replica.run("use " + replicatedDbName) - .run("show tables like tacidpart") - .verifyResult(null); - replica.load(replicatedDbName, primaryDbName); - verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); - } - - @Test - public void testIncrementalLoadMigrationManagedToAcidAllOp() throws Throwable { - WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName); - replica.load(replicatedDbName, primaryDbName) - .run("REPL STATUS " + replicatedDbName) - .verifyResult(bootStrapDump.lastReplicationId); - List selectStmtList = new ArrayList<>(); - List expectedValues = new ArrayList<>(); - String tableName = testName.getMethodName() + "testInsert"; - String tableNameMM = tableName + "_MM"; - - ReplicationTestUtils.appendInsert(primary, primaryDbName, null, - tableName, tableNameMM, selectStmtList, expectedValues); - ReplicationTestUtils.appendTruncate(primary, primaryDbName, - null, selectStmtList, expectedValues); - ReplicationTestUtils.appendInsertIntoFromSelect(primary, primaryDbName, - null, 
tableName, tableNameMM, selectStmtList, expectedValues); - ReplicationTestUtils.appendCreateAsSelect(primary, primaryDbName, - null, tableName, tableNameMM, selectStmtList, expectedValues); - ReplicationTestUtils.appendImport(primary, primaryDbName, - null, tableName, tableNameMM, selectStmtList, expectedValues); - ReplicationTestUtils.appendInsertOverwrite(primary, primaryDbName, - null, tableName, tableNameMM, selectStmtList, expectedValues); - ReplicationTestUtils.appendLoadLocal(primary, primaryDbName, - null, tableName, tableNameMM, selectStmtList, expectedValues); - ReplicationTestUtils.appendInsertUnion(primary, primaryDbName, - null, tableName, tableNameMM, selectStmtList, expectedValues); - ReplicationTestUtils.appendAlterTable(primary, primaryDbName, - null, selectStmtList, expectedValues); - - ReplicationTestUtils.verifyIncrementalLoad(primary, replica, primaryDbName, - replicatedDbName, selectStmtList, expectedValues, bootStrapDump.lastReplicationId); - } - - @Test - public void testBootstrapConvertedExternalTableAutoPurgeDataOnDrop() throws Throwable { - WarehouseInstance.Tuple bootstrap = primary.run("use " + primaryDbName) - .run("create table avro_tbl partitioned by (country string) ROW FORMAT SERDE " - + "'org.apache.hadoop.hive.serde2.avro.AvroSerDe' stored as avro " - + "tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri().toString() + "')") - .run("insert into avro_tbl partition (country='india') values ('another', 13)") - .dump(primaryDbName); - - replica.load(replicatedDbName, primaryDbName); - Path dataLocation = assertTablePath(replicatedDbName, "avro_tbl"); - - WarehouseInstance.Tuple incremental = primary.run("use " + primaryDbName) - .run("drop table avro_tbl") - .dump(primaryDbName); - replica.load(replicatedDbName, primaryDbName); - - // After drop, the external table data location should be auto deleted as it is converted one. - assertFalse(replica.miniDFSCluster.getFileSystem().exists(dataLocation)); - } - - @Test - public void testIncConvertedExternalTableAutoDeleteDataDirOnDrop() throws Throwable { - WarehouseInstance.Tuple bootstrap = primary.dump(primaryDbName); - replica.load(replicatedDbName, primaryDbName); - - primary.run("use " + primaryDbName) - .run("create table avro_tbl ROW FORMAT SERDE " - + "'org.apache.hadoop.hive.serde2.avro.AvroSerDe' stored as avro " - + "tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri().toString() + "')") - .run("insert into avro_tbl values ('str', 13)") - .dump(primaryDbName); - replica.load(replicatedDbName, primaryDbName); - - // Data location is valid and is under default external warehouse directory. - Table avroTable = replica.getTable(replicatedDbName, "avro_tbl"); - assertTrue(MetaStoreUtils.isExternalTable(avroTable)); - Path dataLocation = new Path(avroTable.getSd().getLocation()); - assertTrue(replica.miniDFSCluster.getFileSystem().exists(dataLocation)); - - primary.run("use " + primaryDbName) - .run("drop table avro_tbl") - .dump(primaryDbName); - replica.load(replicatedDbName, primaryDbName); - - // After drop, the external table data location should be auto deleted as it is converted one. 
- assertFalse(replica.miniDFSCluster.getFileSystem().exists(dataLocation)); - } - - @Test - public void testBootstrapLoadMigrationToAcidWithMoveOptimization() throws Throwable { - List withConfigs = - Collections.singletonList("'hive.repl.enable.move.optimization'='true'"); - WarehouseInstance.Tuple tuple = prepareDataAndDump(primaryDbName, null); - replica.load(replicatedDbName, primaryDbName, withConfigs); - verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); - } - - @Test - public void testIncrementalLoadMigrationToAcidWithMoveOptimization() throws Throwable { - List withConfigs = - Collections.singletonList("'hive.repl.enable.move.optimization'='true'"); - WarehouseInstance.Tuple tuple = primary.dump(primaryDbName); - replica.load(replicatedDbName, primaryDbName); - tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId); - replica.load(replicatedDbName, primaryDbName, withConfigs); - verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); - } - - @Test - public void dynamicallyConvertManagedToExternalTable() throws Throwable { - // With Strict managed disabled but Db enabled for replication, it is not possible to convert - // external table to managed table. - primary.run("use " + primaryDbName) - .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc ") - .run("insert into t1 values(1)") - .run("create table t2 partitioned by (country string) ROW FORMAT SERDE " - + "'org.apache.hadoop.hive.serde2.avro.AvroSerDe' stored as avro " - + "tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri().toString() + "')") - .run("insert into t2 partition (country='india') values ('another', 13)") - .runFailure("alter table t1 set tblproperties('EXTERNAL'='true')") - .runFailure("alter table t2 set tblproperties('EXTERNAL'='true')"); - } - - @Test - public void dynamicallyConvertExternalToManagedTable() throws Throwable { - // With Strict managed disabled but Db enabled for replication, it is not possible to convert - // external table to managed table. - primary.run("use " + primaryDbName) - .run("create external table t1 (id int) stored as orc") - .run("insert into table t1 values (1)") - .run("create external table t2 (place string) partitioned by (country string)") - .run("insert into table t2 partition(country='india') values ('bangalore')") - .runFailure("alter table t1 set tblproperties('EXTERNAL'='false')") - .runFailure("alter table t2 set tblproperties('EXTERNAL'='false')"); - } - - @Test - public void testMigrationWithUpgrade() throws Throwable { - WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) - .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ") - .run("insert into tacid values (3)") - .run("create table texternal (id int) ") - .run("insert into texternal values (1)") - .dump(primaryDbName); - replica.load(replicatedDbName, primaryDbName) - .run("use " + replicatedDbName) - .run("repl status " + replicatedDbName) - .verifyResult(tuple.lastReplicationId) - .run("select id from tacid") - .verifyResult("3") - .run("select id from texternal") - .verifyResult("1"); - - assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid"))); - assertFalse(MetaStoreUtils.isExternalTable(replica.getTable(replicatedDbName, "texternal"))); - - // forcefully (setting db property) alter the table type. For acid table, set the bootstrap acid table to true. For - // external table, the alter event should alter the table type at target cluster and then distcp should copy the - // files. 
This is done to mock the upgrade done using HiveStrictManagedMigration. - HiveConf hiveConf = primary.getConf(); - - try { - //Set the txn config required for this test. This will not enable the full acid functionality in the warehouse. - hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true); - hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); - - primary.run("use " + primaryDbName) - .run("alter database " + primaryDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '')") - .run("insert into tacid values (1)") - .run("insert into texternal values (2)") - .run("alter table tacid set tblproperties ('transactional'='true')") - .run("alter table texternal SET TBLPROPERTIES('EXTERNAL'='TRUE')") - .run("insert into texternal values (3)") - .run("alter database " + primaryDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')"); - } finally { - hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); - hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); - } - - assertTrue(isFullAcidTable(primary.getTable(primaryDbName, "tacid"))); - assertTrue(MetaStoreUtils.isExternalTable(primary.getTable(primaryDbName, "texternal"))); - - List withConfigs = new ArrayList(); - withConfigs.add("'hive.repl.bootstrap.acid.tables'='true'"); - withConfigs.add("'hive.repl.dump.include.acid.tables'='true'"); - withConfigs.add("'hive.repl.include.external.tables'='true'"); - withConfigs.add("'hive.repl.replica.external.table.base.dir' = '" + fullyQualifiedReplicaExternalBase + "'"); - withConfigs.add("'hive.distcp.privileged.doAs' = '" + UserGroupInformation.getCurrentUser().getUserName() + "'"); - tuple = primary.dump(primaryDbName, withConfigs); - replica.load(replicatedDbName, primaryDbName, withConfigs); - replica.run("use " + replicatedDbName) - .run("repl status " + replicatedDbName) - .verifyResult(tuple.lastReplicationId) - .run("select id from tacid") - .verifyResults(new String[] { "1", "3" }) - .run("select id from texternal") - .verifyResults(new String[] { "1", "2", "3" }); - assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid"))); - assertTrue(MetaStoreUtils.isExternalTable(replica.getTable(replicatedDbName, "texternal"))); - } -} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java deleted file mode 100644 index f98067c4c7..0000000000 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java +++ /dev/null @@ -1,456 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.parse; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; -import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; -import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hive.hcatalog.listener.DbNotificationListener; -import org.junit.*; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.*; - -import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; -import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable; -import static org.apache.hadoop.hive.ql.io.AcidUtils.isTransactionalTable; -import static org.junit.Assert.*; - -/** - * TestReplicationWithTableMigrationEx - test replication for Hive2 to Hive3 (Strict managed tables) - */ -public class TestReplicationWithTableMigrationEx { - @Rule - public final TestName testName = new TestName(); - - protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationWithTableMigrationEx.class); - private static WarehouseInstance primary, replica; - private String primaryDbName, replicatedDbName; - - @BeforeClass - public static void classLevelSetup() throws Exception { - HashMap overrideProperties = new HashMap<>(); - internalBeforeClassSetup(overrideProperties); - } - - static void internalBeforeClassSetup(Map overrideConfigs) throws Exception { - HiveConf conf = new HiveConf(TestReplicationWithTableMigrationEx.class); - conf.set("dfs.client.use.datanode.hostname", "true"); - conf.set("hadoop.proxyuser." 
+ Utils.getUGI().getShortUserName() + ".hosts", "*"); - MiniDFSCluster miniDFSCluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); - final DistributedFileSystem fs = miniDFSCluster.getFileSystem(); - HashMap hiveConfigs = new HashMap() {{ - put("fs.defaultFS", fs.getUri().toString()); - put("hive.support.concurrency", "true"); - put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); - put("hive.metastore.client.capability.check", "false"); - put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); - put("hive.strict.checks.bucketing", "false"); - put("hive.mapred.mode", "nonstrict"); - put("mapred.input.dir.recursive", "true"); - put("hive.metastore.disallow.incompatible.col.type.changes", "false"); - put("hive.strict.managed.tables", "true"); - put("hive.metastore.transactional.event.listeners", ""); - }}; - - HashMap configsForPrimary = new HashMap() {{ - put("fs.defaultFS", fs.getUri().toString()); - put("hive.metastore.client.capability.check", "false"); - put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); - put("hive.strict.checks.bucketing", "false"); - put("hive.mapred.mode", "nonstrict"); - put("mapred.input.dir.recursive", "true"); - put("hive.metastore.disallow.incompatible.col.type.changes", "false"); - put("hive.support.concurrency", "false"); - put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); - put("hive.strict.managed.tables", "false"); - }}; - configsForPrimary.putAll(overrideConfigs); - primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary); - hiveConfigs.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir); - replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs); - } - - @AfterClass - public static void classLevelTearDown() throws IOException { - primary.close(); - replica.close(); - } - - @Before - public void setup() throws Throwable { - primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); - replicatedDbName = "replicated_" + primaryDbName; - primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + - SOURCE_OF_REPLICATION + "' = '1,2,3')"); - } - - @After - public void tearDown() throws Throwable { - primary.run("drop database if exists " + primaryDbName + " cascade"); - replica.run("drop database if exists " + replicatedDbName + " cascade"); - } - - private void prepareData(String primaryDbName) throws Throwable { - primary.run("use " + primaryDbName) - .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ") - .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) " - + "into 3 buckets stored as orc ") - .run("insert into tacid values(1)") - .run("insert into tacid values(2)") - .run("insert into tacid values(3)") - .run("alter table tacidpart add partition(country='france')") - .run("insert into tacidpart partition(country='india') values('mumbai')") - .run("insert into tacidpart partition(country='us') values('sf')") - .run("insert into tacidpart partition(country='france') values('paris')"); - assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacid"))); - assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpart"))); - } - - private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable { - replica.run("use " + replicatedDbName) - .run("show tables") - .verifyResults(new String[] {"tacid", "tacidpart"}) - .run("repl status " + replicatedDbName) - 
.verifyResult(lastReplId) - .run("select count(*) from tacid") - .verifyResult("3") - .run("select id from tacid order by id") - .verifyResults(new String[]{"1", "2", "3"}) - .run("select count(*) from tacidpart") - .verifyResult("3") - .run("select country from tacidpart order by country") - .verifyResults(new String[] {"france", "india", "us"}); - - assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid"))); - assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpart"))); - } - - private WarehouseInstance.Tuple dumpWithLastEventIdHacked(int eventId) throws Throwable { - BehaviourInjection callerVerifier - = new BehaviourInjection() { - @Override - public CurrentNotificationEventId apply(CurrentNotificationEventId id) { - try { - LOG.warn("GetCurrentNotificationEventIdBehaviour called"); - injectionPathCalled = true; - // keep events to reply during incremental - id.setEventId(eventId); - return id; - } catch (Throwable throwable) { - throwable.printStackTrace(); - return null; - } - } - }; - - InjectableBehaviourObjectStore.setGetCurrentNotificationEventIdBehaviour(callerVerifier); - try { - return primary.dump(primaryDbName); - } finally { - InjectableBehaviourObjectStore.resetGetCurrentNotificationEventIdBehaviour(); - callerVerifier.assertInjectionsPerformed(true, false); - } - } - - @Test - public void testConcurrentOpDuringBootStrapDumpCreateTableReplay() throws Throwable { - prepareData(primaryDbName); - - // dump with operation after last repl id is fetched. - WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(2); - replica.loadWithoutExplain(replicatedDbName, primaryDbName); - verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); - assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); - - // next incremental dump - tuple = primary.dump(primaryDbName); - replica.loadWithoutExplain(replicatedDbName, primaryDbName); - verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); - assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); - } - - @Test - public void testConcurrentOpDuringBootStrapDumpInsertReplay() throws Throwable { - prepareData(primaryDbName); - - // dump with operation after last repl id is fetched. 
- WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(4); - replica.loadWithoutExplain(replicatedDbName, primaryDbName); - verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); - assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); - - // next incremental dump - tuple = primary.dump(primaryDbName); - replica.loadWithoutExplain(replicatedDbName, primaryDbName); - verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); - assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); - } - - @Test - public void testTableLevelDumpMigration() throws Throwable { - WarehouseInstance.Tuple tuple = primary - .run("use " + primaryDbName) - .run("create table t1 (i int, j int)") - .dump(primaryDbName+".'t1'"); - replica.run("create database " + replicatedDbName); - replica.loadWithoutExplain(replicatedDbName, primaryDbName); - assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); - - tuple = primary.run("use " + primaryDbName) - .run("insert into t1 values (1, 2)") - .dump(primaryDbName+".'t1'"); - replica.loadWithoutExplain(replicatedDbName, primaryDbName); - assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); - } - - @Test - public void testConcurrentOpDuringBootStrapDumpInsertOverwrite() throws Throwable { - primary.run("use " + primaryDbName) - .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ") - .run("insert into tacid values(1)") - .run("insert into tacid values(2)") - .run("insert into tacid values(3)") - .run("insert overwrite table tacid values(4)") - .run("insert into tacid values(5)"); - - // dump with operation after last repl id is fetched. 
- WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(2); - replica.loadWithoutExplain(replicatedDbName, primaryDbName); - replica.run("use " + replicatedDbName) - .run("show tables") - .verifyResults(new String[] {"tacid"}) - .run("repl status " + replicatedDbName) - .verifyResult(tuple.lastReplicationId) - .run("select count(*) from tacid") - .verifyResult("2") - .run("select id from tacid order by id") - .verifyResults(new String[]{"4", "5"}); - assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); - - // next incremental dump - tuple = primary.dump(primaryDbName); - replica.loadWithoutExplain(replicatedDbName, primaryDbName); - replica.run("use " + replicatedDbName) - .run("show tables") - .verifyResults(new String[] {"tacid"}) - .run("repl status " + replicatedDbName) - .verifyResult(tuple.lastReplicationId) - .run("select count(*) from tacid") - .verifyResult("2") - .run("select id from tacid order by id") - .verifyResults(new String[]{"4", "5",}); - assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); - } - - private void loadWithFailureInAddNotification(String tbl) throws Throwable { - BehaviourInjection callerVerifier - = new BehaviourInjection() { - @Nullable - @Override - public Boolean apply(@Nullable InjectableBehaviourObjectStore.CallerArguments args) { - injectionPathCalled = true; - LOG.warn("InjectableBehaviourObjectStore called for Verifier - Table: " + args.tblName); - if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.constraintTblName != null)) { - LOG.warn("Verifier - DB: " + args.dbName - + " Constraint Table: " + args.constraintTblName); - return false; - } - if (args.tblName != null) { - LOG.warn("Verifier - Table: " + args.tblName); - return !args.tblName.equalsIgnoreCase(tbl); - } - return true; - } - }; - InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier); - try { - List withClause = Collections.singletonList("'hive.metastore.transactional.event.listeners'='" - + DbNotificationListener.class.getCanonicalName() + "'"); - replica.loadFailure(replicatedDbName, primaryDbName, withClause); - } finally { - InjectableBehaviourObjectStore.resetCallerVerifier(); - } - callerVerifier.assertInjectionsPerformed(true, false); - } - - @Test - public void testIncLoadPenFlagPropAlterDB() throws Throwable { - prepareData(primaryDbName); - - // dump with operation after last repl id is fetched. 
- WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(4); - replica.loadWithoutExplain(replicatedDbName, primaryDbName); - verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); - assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); - assertFalse(ReplUtils.isFirstIncPending(primary.getDatabase(primaryDbName).getParameters())); - - primary.run("use " + primaryDbName) - .run("alter database " + primaryDbName + " set dbproperties('dummy_key'='dummy_val')") - .run("create table tbl_temp (fld int)") - .dump(primaryDbName); - - loadWithFailureInAddNotification("tbl_temp"); - Database replDb = replica.getDatabase(replicatedDbName); - assertTrue(ReplUtils.isFirstIncPending(replDb.getParameters())); - assertFalse(ReplUtils.isFirstIncPending(primary.getDatabase(primaryDbName).getParameters())); - assertTrue(replDb.getParameters().get("dummy_key").equalsIgnoreCase("dummy_val")); - - replica.loadWithoutExplain(replicatedDbName, primaryDbName); - assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); - } - - @Test - public void testIncLoadPenFlagWithMoveOptimization() throws Throwable { - List withClause = Collections.singletonList("'hive.repl.enable.move.optimization'='true'"); - - prepareData(primaryDbName); - - // dump with operation after last repl id is fetched. - WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(4); - replica.load(replicatedDbName, primaryDbName, withClause); - verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); - assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); - - // next incremental dump - tuple = primary.dump(primaryDbName); - replica.load(replicatedDbName, primaryDbName, withClause); - assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); - } - - private void verifyUserName(String userName) throws Throwable { - assertTrue(userName.equalsIgnoreCase(primary.getTable(primaryDbName, "tbl_own").getOwner())); - assertTrue(userName.equalsIgnoreCase(replica.getTable(replicatedDbName, "tbl_own").getOwner())); - assertTrue(userName.equalsIgnoreCase(primary.getTable(primaryDbName, "tacid").getOwner())); - assertTrue(userName.equalsIgnoreCase(replica.getTable(replicatedDbName, "tacid").getOwner())); - assertTrue(userName.equalsIgnoreCase(primary.getTable(primaryDbName, "tacidpart").getOwner())); - assertTrue(userName.equalsIgnoreCase(replica.getTable(replicatedDbName, "tacidpart").getOwner())); - assertTrue(userName.equalsIgnoreCase(primary.getTable(primaryDbName, "tbl_part").getOwner())); - assertTrue(userName.equalsIgnoreCase(replica.getTable(replicatedDbName, "tbl_part").getOwner())); - assertTrue(userName.equalsIgnoreCase(primary.getTable(primaryDbName, "view_own").getOwner())); - assertTrue(userName.equalsIgnoreCase(replica.getTable(replicatedDbName, "view_own").getOwner())); - } - - private void alterUserName(String userName) throws Throwable { - primary.run("use " + primaryDbName) - .run("alter table tbl_own set owner USER " + userName) - .run("alter table tacid set owner USER " + userName) - .run("alter table tacidpart set owner USER " + userName) - .run("alter table tbl_part set owner USER " + userName) - .run("alter table view_own set owner USER " + userName); - } - - @Test - public void testOnwerPropagation() throws Throwable { - primary.run("use " + primaryDbName) - .run("create table tbl_own (fld int)") - .run("create table tacid (id int) clustered by(id) into 3 
buckets stored as orc ") - .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) " - + "into 3 buckets stored as orc ") - .run("create table tbl_part (fld int) partitioned by (country string)") - .run("insert into tbl_own values (1)") - .run("create view view_own as select * from tbl_own"); - - // test bootstrap - alterUserName("hive"); - primary.dump(primaryDbName); - replica.loadWithoutExplain(replicatedDbName, primaryDbName); - verifyUserName("hive"); - - // test incremental - alterUserName("hive1"); - primary.dump(primaryDbName); - replica.loadWithoutExplain(replicatedDbName, primaryDbName); - verifyUserName("hive1"); - } - - @Test - public void testOnwerPropagationInc() throws Throwable { - primary.dump(primaryDbName); - replica.loadWithoutExplain(replicatedDbName, primaryDbName); - - primary.run("use " + primaryDbName) - .run("create table tbl_own (fld int)") - .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ") - .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) " - + "into 3 buckets stored as orc ") - .run("create table tbl_part (fld int) partitioned by (country string)") - .run("insert into tbl_own values (1)") - .run("create view view_own as select * from tbl_own"); - - // test incremental when table is getting created in the same load - alterUserName("hive"); - primary.dump(primaryDbName); - replica.loadWithoutExplain(replicatedDbName, primaryDbName); - verifyUserName("hive"); - } - - @Test - public void dynamicallyConvertNonAcidToAcidTable() throws Throwable { - // Non-acid table converted to an ACID table should be prohibited on source cluster with - // strict managed false. - primary.run("use " + primaryDbName) - .run("create table t1 (id int) stored as orc") - .run("insert into table t1 values (1)") - .run("create table t2 (place string) partitioned by (country string) stored as orc") - .run("insert into table t2 partition(country='india') values ('bangalore')") - .runFailure("alter table t1 set tblproperties('transactional'='true')") - .runFailure("alter table t2 set tblproperties('transactional'='true')") - .runFailure("alter table t1 set tblproperties('transactional'='true', " + - "'transactional_properties'='insert_only')") - .runFailure("alter table t2 set tblproperties('transactional'='true', " + - "'transactional_properties'='insert_only')"); - - } - - @Test - public void prohibitManagedTableLocationChangeOnReplSource() throws Throwable { - String tmpLocation = "/tmp/" + System.nanoTime(); - primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation), new FsPermission("777")); - - // For managed tables at source, the table location shouldn't be changed for the given - // non-partitioned table and partition location shouldn't be changed for partitioned table as - // alter event doesn't capture the new files list. So, it may cause data inconsistsency. So, - // if database is enabled for replication at source, then alter location on managed tables - // should be blocked. 
- primary.run("use " + primaryDbName) - .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc ") - .run("insert into t1 values(1)") - .run("create table t2 (place string) partitioned by (country string) stored as orc") - .run("insert into table t2 partition(country='india') values ('bangalore')") - .runFailure("alter table t1 set location '" + tmpLocation + "'") - .runFailure("alter table t2 partition(country='india') set location '" + tmpLocation + "'") - .runFailure("alter table t2 set location '" + tmpLocation + "'"); - - primary.miniDFSCluster.getFileSystem().delete(new Path(tmpLocation), true); - } -} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java index db2d3a4296..44eead0b30 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java @@ -97,6 +97,7 @@ static void internalBeforeClassSetup(Map primaryOverrides, Map additionalOverrides = new HashMap() {{ put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true"); + put(HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname, "false"); }}; Map replicatedOverrides = new HashMap<>(); diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/AsyncResponseHandlerTest.java b/llap-common/src/test/org/apache/hadoop/hive/llap/AsyncResponseHandlerTest.java index 3d7bd9034c..5325e29a51 100644 --- a/llap-common/src/test/org/apache/hadoop/hive/llap/AsyncResponseHandlerTest.java +++ b/llap-common/src/test/org/apache/hadoop/hive/llap/AsyncResponseHandlerTest.java @@ -19,6 +19,7 @@ import org.apache.hadoop.util.concurrent.AsyncGet; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import java.util.Random; @@ -85,6 +86,7 @@ public void testRemoteFail() throws InterruptedException { } + @Ignore("HIVE-24019") @Test public void testStress() throws InterruptedException { final int numCommunicators = 10; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java index 2ee66e58a0..0d2cee5a3b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java @@ -139,13 +139,6 @@ private void finalizeAlterTableWithWriteIdOp(Table table, Table oldTable, List

{}", oneSrc.getPath(), toPath); srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf), oneSrc.getPath(), null)); } - if (work.isCopyToMigratedTxnTable()) { - if (isDuplicateCopy(dstFs, toPath, srcFiles)) { - return 0; - } - - Path modifiedToPath = getModifiedToPath(toPath); - if (modifiedToPath == null) { - console.printError("ReplCopyTask : Write id is not set in the config by open txn task for migration"); - return 6; - } - toPath = modifiedToPath; - } } LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size()); @@ -235,38 +210,6 @@ public int execute() { } } - private boolean isDuplicateCopy(FileSystem dstFs, Path toPath, List srcFiles) - throws IOException { - if (work.isNeedCheckDuplicateCopy()) { - updateSrcFileListForDupCopy(dstFs, toPath, srcFiles, - ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID); - if (srcFiles.isEmpty()) { - LOG.info("All files are already present in the base directory. Skipping copy task."); - return true; - } - } - return false; - } - - private Path getModifiedToPath(Path toPath) { - // If direct (move optimized) copy is triggered for data to a migrated transactional table, then it - // should have a write ID allocated by parent ReplTxnTask. Use it to create the base or delta directory. - // The toPath received in ReplCopyWork is pointing to table/partition base location. - // So, just need to append the base or delta directory. - // getDeleteDestIfExist returns true if it is repl load for replace/insert overwrite event and - // hence need to create base directory. If false, then it is repl load for regular insert into or - // load flow and hence just create delta directory. - Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf); - if (writeId == null) { - return null; - } - // Set stmt id 0 for bootstrap load as the directory needs to be searched during incremental load to avoid any - // duplicate copy from the source. Check HIVE-21197 for more detail. - int stmtId = (writeId.equals(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID)) ? 
- ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID : - context.getHiveTxnManager().getStmtIdAndIncrement(); - return new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId, stmtId)); - } private List filesInFileListing(FileSystem fs, Path dataPath) throws IOException { Path fileListing = new Path(dataPath, EximUtil.FILES_NAME); @@ -319,35 +262,28 @@ public String getName() { return "REPL_COPY"; } - public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, - HiveConf conf, boolean isAutoPurge, boolean needRecycle, - boolean copyToMigratedTxnTable) { - return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, isAutoPurge, needRecycle, - copyToMigratedTxnTable, true); - } public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf, boolean isAutoPurge, boolean needRecycle, - boolean copyToMigratedTxnTable, boolean readSourceAsFileList) { + boolean readSourceAsFileList) { return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, isAutoPurge, needRecycle, - copyToMigratedTxnTable, readSourceAsFileList, false); + readSourceAsFileList, false); } private static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf, boolean isAutoPurge, boolean needRecycle, - boolean copyToMigratedTxnTable, boolean readSourceAsFileList, + boolean readSourceAsFileList, boolean overWrite) { Task copyTask = null; LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath); if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false, overWrite); rcwork.setReadSrcAsFilesList(readSourceAsFileList); - if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION) || copyToMigratedTxnTable)) { + if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION))) { rcwork.setDeleteDestIfExist(true); rcwork.setAutoPurge(isAutoPurge); rcwork.setNeedRecycle(needRecycle); } - rcwork.setCopyToMigratedTxnTable(copyToMigratedTxnTable); // For replace case, duplicate check should not be done. The new base directory will automatically make the older // data invisible. Doing duplicate check and ignoring copy will cause consistency issue if there are multiple // replace events getting replayed in the first incremental load. 
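For readers following the flattened hunks above: once the copyToMigratedTxnTable flag is removed, the replication-scoped branch of ReplCopyTask.getLoadCopyTask reduces to roughly the sketch below. This is a condensed illustration, not the verbatim method; the wrapper class and method name are made up, and the trailing TaskFactory call plus the non-replication fallback are assumed from the surrounding class rather than shown in this hunk.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.plan.CopyWork;
import org.apache.hadoop.hive.ql.plan.ReplCopyWork;

public class LoadCopyTaskSketch {
  // Hypothetical condensed form of the post-patch decision flow in getLoadCopyTask.
  static Task<?> getLoadCopyTaskSketch(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
      HiveConf conf, boolean isAutoPurge, boolean needRecycle,
      boolean readSourceAsFileList, boolean overWrite) {
    if (replicationSpec == null || !replicationSpec.isInReplicationScope()) {
      // Outside replication scope this sketch falls back to a plain CopyWork,
      // mirroring the non-repl branches in the ImportSemanticAnalyzer hunks below.
      return TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf);
    }
    ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false, overWrite);
    rcwork.setReadSrcAsFilesList(readSourceAsFileList);
    if (replicationSpec.isReplace() && conf.getBoolVar(HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION)) {
      // Replace events clear the destination instead of running a duplicate-copy check;
      // the new base directory makes the older data invisible.
      rcwork.setDeleteDestIfExist(true);
      rcwork.setAutoPurge(isAutoPurge);
      rcwork.setNeedRecycle(needRecycle);
    }
    // Duplicate-copy bookkeeping that follows in the real method is omitted from this sketch.
    return TaskFactory.get(rcwork, conf);
  }
}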
@@ -365,7 +301,8 @@ public String getName() { public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) { - return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, false); + return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, + true, false); } /* @@ -375,6 +312,6 @@ public String getName() { public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf, boolean readSourceAsFileList, boolean overWrite) { return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, - false, readSourceAsFileList, overWrite); + readSourceAsFileList, overWrite); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java index 9131aeee81..48721d394f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java @@ -92,38 +92,12 @@ public int execute() { LOG.info("Replayed OpenTxn Event for policy " + replPolicy + " with srcTxn " + work.getTxnIds().toString() + " and target txn id " + txnIds.toString()); return 0; - case REPL_MIGRATION_OPEN_TXN: - // if transaction is already opened (mostly by repl load command), then close it. - if (txnManager.isTxnOpen()) { - long txnId = txnManager.getCurrentTxnId(); - txnManager.commitTxn(); - LOG.info("Committed txn from REPL_MIGRATION_OPEN_TXN : " + txnId); - } - Long txnIdMigration = txnManager.openTxn(context, user); - long writeId = txnManager.getTableWriteId(work.getDbName(), work.getTableName()); - String validTxnList = txnManager.getValidTxns().toString(); - conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList); - conf.set(ReplUtils.REPL_CURRENT_TBL_WRITE_ID, Long.toString(writeId)); - LOG.info("Started open txn for migration : " + txnIdMigration + " with valid txn list : " + - validTxnList + " and write id " + writeId); - return 0; case REPL_ABORT_TXN: for (long txnId : work.getTxnIds()) { txnManager.replRollbackTxn(replPolicy, txnId); LOG.info("Replayed AbortTxn Event for policy " + replPolicy + " with srcTxn " + txnId); } return 0; - case REPL_MIGRATION_COMMIT_TXN: - assert (work.getReplLastIdInfo() != null); - long txnIdMigrationCommit = txnManager.getCurrentTxnId(); - CommitTxnRequest commitTxnRequestMigr = new CommitTxnRequest(txnIdMigrationCommit); - commitTxnRequestMigr.setReplLastIdInfo(work.getReplLastIdInfo()); - txnManager.replCommitTxn(commitTxnRequestMigr); - conf.unset(ValidTxnList.VALID_TXNS_KEY); - conf.unset(ReplUtils.REPL_CURRENT_TBL_WRITE_ID); - LOG.info("Committed Migration Txn with replLastIdInfo: " + work.getReplLastIdInfo() + " for txnId: " + - txnIdMigrationCommit); - return 0; case REPL_COMMIT_TXN: // Currently only one commit txn per event is supported. 
assert (work.getTxnIds().size() == 1); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java index 03e8c4e732..4dba12ccd2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java @@ -24,6 +24,8 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; @@ -35,12 +37,14 @@ public class AckTask extends Task implements Serializable { private static final long serialVersionUID = 1L; + private Logger LOG = LoggerFactory.getLogger(AckTask.class); @Override public int execute() { try { Path ackPath = work.getAckFilePath(); Utils.create(ackPath, conf); + LOG.info("Created ack file : {} ", ackPath); } catch (SemanticException e) { setException(e); return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java index d8b10bd0e5..9cbb1ed856 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -106,38 +106,6 @@ public MetaData getMetaData() { public ImportTableDesc tableDesc(String dbName) throws SemanticException { try { Table table = new Table(metadata.getTable()); - boolean externalTableOnSource = TableType.EXTERNAL_TABLE.equals(table.getTableType()); - // The table can be non acid in case of replication from 2.6 cluster. - if (!AcidUtils.isTransactionalTable(table) - && hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES) - && (table.getTableType() == TableType.MANAGED_TABLE)) { - Hive hiveDb = Hive.get(hiveConf); - //TODO : dump metadata should be read to make sure that migration is required. - HiveStrictManagedMigration.TableMigrationOption migrationOption = - HiveStrictManagedMigration.determineMigrationTypeAutomatically(table.getTTable(), - table.getTableType(), null, hiveConf, - hiveDb.getMSC(), true); - HiveStrictManagedMigration.migrateTable(table.getTTable(), table.getTableType(), - migrationOption, false, - getHiveUpdater(hiveConf), hiveDb.getMSC(), hiveConf); - // If the conversion is from non transactional to transactional table - if (AcidUtils.isTransactionalTable(table)) { - replicationSpec().setMigratingToTxnTable(); - // For migrated tables associate bootstrap writeId when replicating stats. - if (table.getTTable().isSetColStats()) { - table.getTTable().setWriteId(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID); - } - } - if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) { - // since we have converted to an external table now after applying the migration rules the - // table location has to be set to null so that the location on the target is picked up - // based on default configuration - table.setDataLocation(null); - if(!externalTableOnSource) { - replicationSpec().setMigratingToExternalTable(); - } - } - } ImportTableDesc tableDesc = new ImportTableDesc(StringUtils.isBlank(dbName) ? 
table.getDbName() : dbName, table); if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) { @@ -190,7 +158,7 @@ private AlterTableAddPartitionDesc addPartitionDesc(Path fromPath, ImportTableDe StorageDescriptor sd = partition.getSd(); String location = sd.getLocation(); - if (!tblDesc.isExternal() || replicationSpec().isMigratingToExternalTable()) { + if (!tblDesc.isExternal()) { /** * this is required for file listing of all files in a partition for managed table as described in * {@link org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator} @@ -207,8 +175,7 @@ private AlterTableAddPartitionDesc addPartitionDesc(Path fromPath, ImportTableDe colStatsDesc.setDbName(tblDesc.getDatabaseName()); columnStatistics = new ColumnStatistics(colStatsDesc, colStats.getStatsObj()); columnStatistics.setEngine(colStats.getEngine()); - writeId = replicationSpec().isMigratingToTxnTable() ? - ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID : partition.getWriteId(); + writeId = partition.getWriteId(); } AlterTableAddPartitionDesc.PartitionDesc partitionDesc = new AlterTableAddPartitionDesc.PartitionDesc( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java index 1444e15041..41e09e1d0e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java @@ -20,6 +20,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.ddl.DDLWork; import org.apache.hadoop.hive.ql.ddl.database.alter.owner.AlterDatabaseSetOwnerDesc; import org.apache.hadoop.hive.ql.ddl.database.alter.poperties.AlterDatabaseSetPropertiesDesc; @@ -115,7 +116,7 @@ private boolean isDbEmpty(String dbName) throws HiveException { return allTables.isEmpty() && allFunctions.isEmpty(); } - private Task createDbTask(Database dbObj) { + private Task createDbTask(Database dbObj) throws MetaException { // note that we do not set location - for repl load, we want that auto-created. 
CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc(dbObj.getName(), dbObj.getDescription(), null, null, false, updateDbProps(dbObj, context.dumpDirectory)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index c4cfcf96f1..d751794181 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -251,7 +251,6 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc boolean isOnlyDDLOperation = event.replicationSpec().isMetadataOnly() || (TableType.EXTERNAL_TABLE.equals(table.getTableType()) - && !event.replicationSpec().isMigratingToExternalTable() ); if (isOnlyDDLOperation) { @@ -272,17 +271,8 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc if (event.replicationSpec().isInReplicationScope() && context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { loadFileType = LoadFileType.IGNORE; - if (event.replicationSpec().isMigratingToTxnTable()) { - // Migrating to transactional tables in bootstrap load phase. - // It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1. - // ReplTxnTask added earlier in the DAG ensure that the write-id=1 is made valid in HMS metadata. - stagingDir = new Path(stagingDir, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID)); - } } else { - loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : - (event.replicationSpec().isMigratingToTxnTable() - ? LoadFileType.KEEP_EXISTING - : LoadFileType.OVERWRITE_EXISTING); + loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); } boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); @@ -330,26 +320,11 @@ private String getPartitionName(Path partitionMetadataFullPath) { LoadFileType loadFileType) { MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false); if (AcidUtils.isTransactionalTable(table)) { - if (event.replicationSpec().isMigratingToTxnTable()) { - // Write-id is hardcoded to 1 so that for migration, we just move all original files under base_1 dir. - // ReplTxnTask added earlier in the DAG ensure that the write-id is made valid in HMS metadata. - LoadTableDesc loadTableWork = new LoadTableDesc( - tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), - loadFileType, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID - ); - loadTableWork.setInheritTableSpecs(false); - loadTableWork.setStmtId(0); - - // Need to set insertOverwrite so base_1 is created instead of delta_1_1_0. 
- loadTableWork.setInsertOverwrite(true); - moveWork.setLoadTableWork(loadTableWork); - } else { - LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( - Collections.singletonList(tmpPath), - Collections.singletonList(new Path(partSpec.getLocation())), - true, null, null); - moveWork.setMultiFilesDesc(loadFilesWork); - } + LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( + Collections.singletonList(tmpPath), + Collections.singletonList(new Path(partSpec.getLocation())), + true, null, null); + moveWork.setMultiFilesDesc(loadFilesWork); } else { LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), @@ -359,7 +334,6 @@ private String getPartitionName(Path partitionMetadataFullPath) { moveWork.setLoadTableWork(loadTableWork); } moveWork.setIsInReplicationScope(event.replicationSpec().isInReplicationScope()); - return TaskFactory.get(moveWork, context.hiveConf); } @@ -375,9 +349,6 @@ private Path locationOnReplicaWarehouse(Table table, AlterTableAddPartitionDesc. throws MetaException, HiveException { String child = Warehouse.makePartPath(partSpec.getPartSpec()); if (tableDesc.isExternal()) { - if (event.replicationSpec().isMigratingToExternalTable()) { - return new Path(tableDesc.getLocation(), child); - } String externalLocation = ReplExternalTables.externalTableLocation(context.hiveConf, partSpec.getLocation()); return new Path(externalLocation); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 35ea777ef5..8572f081c7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -186,6 +186,9 @@ private void newTableTasks(ImportTableDesc tblDesc, Task tblRootTask, TableLo throws Exception { Table table = tblDesc.toTable(context.hiveConf); ReplicationSpec replicationSpec = event.replicationSpec(); + if (!tblDesc.isExternal()) { + tblDesc.setLocation(null); + } Task createTableTask = tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf); if (tblRootTask == null) { @@ -206,17 +209,6 @@ private void newTableTasks(ImportTableDesc tblDesc, Task tblRootTask, TableLo Task replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf); parentTask.addDependentTask(replTxnTask); parentTask = replTxnTask; - } else if (replicationSpec.isMigratingToTxnTable()) { - // Non-transactional table is converted to transactional table. - // The write-id 1 is used to copy data for the given table and also no writes are aborted. 
- ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList( - AcidUtils.getFullTableName(tblDesc.getDatabaseName(), tblDesc.getTableName()), - new long[0], new BitSet(), ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID); - ReplTxnWork replTxnWork = new ReplTxnWork(tblDesc.getDatabaseName(), tblDesc.getTableName(), null, - validWriteIdList.writeToString(), ReplTxnWork.OperationType.REPL_WRITEID_STATE); - Task replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf); - parentTask.addDependentTask(replTxnTask); - parentTask = replTxnTask; } boolean shouldCreateLoadTableTask = ( !isPartitioned(tblDesc) @@ -224,7 +216,7 @@ private void newTableTasks(ImportTableDesc tblDesc, Task tblRootTask, TableLo ) || tuple.isConvertedFromManagedToExternal; if (shouldCreateLoadTableTask) { LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table"); - Task loadTableTask = loadTableTask(table, replicationSpec, new Path(tblDesc.getLocation()), + Task loadTableTask = loadTableTask(table, replicationSpec, table.getDataLocation(), event.dataPath()); parentTask.addDependentTask(loadTableTask); } @@ -282,14 +274,8 @@ static TableLocationTuple tableLocation(ImportTableDesc tblDesc, Database parent if (replicationSpec.isInReplicationScope() && context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { loadFileType = LoadFileType.IGNORE; - if (event.replicationSpec().isMigratingToTxnTable()) { - // Migrating to transactional tables in bootstrap load phase. - // It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1. - // ReplTxnTask added earlier in the DAG ensure that the write-id=1 is made valid in HMS metadata. - tmpPath = new Path(tmpPath, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID)); - } } else { - loadFileType = (replicationSpec.isReplace() || replicationSpec.isMigratingToTxnTable()) + loadFileType = (replicationSpec.isReplace()) ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; tmpPath = PathUtils.getExternalTmpPath(tgtPath, context.pathInfo); } @@ -304,25 +290,11 @@ static TableLocationTuple tableLocation(ImportTableDesc tblDesc, Database parent MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false); if (AcidUtils.isTransactionalTable(table)) { - if (replicationSpec.isMigratingToTxnTable()) { - // Write-id is hardcoded to 1 so that for migration, we just move all original files under base_1 dir. - // ReplTxnTask added earlier in the DAG ensure that the write-id is made valid in HMS metadata. - LoadTableDesc loadTableWork = new LoadTableDesc( - tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), - loadFileType, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID - ); - loadTableWork.setStmtId(0); - - // Need to set insertOverwrite so base_1 is created instead of delta_1_1_0. 
- loadTableWork.setInsertOverwrite(true); - moveWork.setLoadTableWork(loadTableWork); - } else { - LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( - Collections.singletonList(tmpPath), - Collections.singletonList(tgtPath), - true, null, null); - moveWork.setMultiFilesDesc(loadFilesWork); - } + LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( + Collections.singletonList(tmpPath), + Collections.singletonList(tgtPath), + true, null, null); + moveWork.setMultiFilesDesc(loadFilesWork); } else { LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java index 52b6547e95..b00341a4f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -22,10 +22,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.ReplLastIdInfo; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ddl.DDLWork; import org.apache.hadoop.hive.ql.ddl.database.alter.poperties.AlterDatabaseSetPropertiesDesc; @@ -52,7 +49,6 @@ import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; -import org.apache.hadoop.hive.ql.plan.ReplTxnWork; import org.slf4j.Logger; import java.util.ArrayList; @@ -224,34 +220,6 @@ private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbNa return addUpdateReplStateTasks(messageHandler.getUpdatedMetadata(), tasks); } - private Task getMigrationCommitTxnTask(String dbName, String tableName, - List> partSpec, String replState, - Task preCursor) throws SemanticException { - ReplLastIdInfo replLastIdInfo = new ReplLastIdInfo(dbName, Long.parseLong(replState)); - replLastIdInfo.setTable(tableName); - if (partSpec != null && !partSpec.isEmpty()) { - List partitionList = new ArrayList<>(); - for (Map part : partSpec) { - try { - partitionList.add(Warehouse.makePartName(part, false)); - } catch (MetaException e) { - throw new SemanticException(e.getMessage()); - } - } - replLastIdInfo.setPartitionList(partitionList); - } - - Task updateReplIdTxnTask = TaskFactory.get(new ReplTxnWork(replLastIdInfo, ReplTxnWork - .OperationType.REPL_MIGRATION_COMMIT_TXN), conf); - - if (preCursor != null) { - preCursor.addDependentTask(updateReplIdTxnTask); - log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), preCursor.getId(), - updateReplIdTxnTask.getClass(), updateReplIdTxnTask.getId()); - } - return updateReplIdTxnTask; - } - private Task tableUpdateReplStateTask(String dbName, String tableName, Map partSpec, String replState, Task preCursor) throws SemanticException { @@ -300,14 +268,6 @@ private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbNa return importTasks; } - boolean needCommitTx = updatedMetaDataTracker.isNeedCommitTxn(); - // In migration flow, we should have only one table 
update per event. - if (needCommitTx) { - // currently, only commit txn event can have updates in multiple table. Commit txn does not starts - // a txn and thus needCommitTx must have set to false. - assert updatedMetaDataTracker.getUpdateMetaDataList().size() <= 1; - } - // Create a barrier task for dependency collection of import tasks Task barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf); @@ -320,44 +280,24 @@ private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbNa String tableName = updateMetaData.getTableName(); // If any partition is updated, then update repl state in partition object - if (needCommitTx) { - if (updateMetaData.getPartitionsList().size() > 0) { - updateReplIdTask = getMigrationCommitTxnTask(dbName, tableName, - updateMetaData.getPartitionsList(), replState, barrierTask); - tasks.add(updateReplIdTask); - // commit txn task will update repl id for table and database also. - break; - } - } else { - for (final Map partSpec : updateMetaData.getPartitionsList()) { - updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask); - tasks.add(updateReplIdTask); - } + + for (final Map partSpec : updateMetaData.getPartitionsList()) { + updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask); + tasks.add(updateReplIdTask); } + // If any table/partition is updated, then update repl state in table object if (tableName != null) { - if (needCommitTx) { - updateReplIdTask = getMigrationCommitTxnTask(dbName, tableName, null, - replState, barrierTask); - tasks.add(updateReplIdTask); - // commit txn task will update repl id for database also. - break; - } updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask); tasks.add(updateReplIdTask); } - // If any table/partition is updated, then update repl state in db object - if (needCommitTx) { - updateReplIdTask = getMigrationCommitTxnTask(dbName, null, null, - replState, barrierTask); - tasks.add(updateReplIdTask); - } else { - // For table level load, need not update replication state for the database - updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask); - tasks.add(updateReplIdTask); - } + + // For table level load, need not update replication state for the database + updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask); + tasks.add(updateReplIdTask); + } if (tasks.isEmpty()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index d531f5bff2..bdedb41a42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -102,14 +102,6 @@ // One file per database, named after the db name. The directory is not created for db level replication. public static final String REPL_TABLE_LIST_DIR_NAME = "_tables"; - // Migrating to transactional tables in bootstrap load phase. - // It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1. - public static final Long REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID = 1L; - - // we keep the statement id as 0 so that the base directory is created with 0 and is easy to find out during - // duplicate check. Note : Stmt id is not used for base directory now, but to avoid misuse later, its maintained. 
- public static final int REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID = 0; - // Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't be // seen in production or in case of tests other than the ones where it's required. public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables"; @@ -229,32 +221,6 @@ public static String getNonEmpty(String configParam, HiveConf hiveConf, String e return val; } - public static boolean isTableMigratingToTransactional(HiveConf conf, - org.apache.hadoop.hive.metastore.api.Table tableObj) - throws TException, IOException { - if (conf.getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES) && - !AcidUtils.isTransactionalTable(tableObj) && - TableType.valueOf(tableObj.getTableType()) == TableType.MANAGED_TABLE) { - //TODO : isPathOwnByHive is hard coded to true, need to get it from repl dump metadata. - HiveStrictManagedMigration.TableMigrationOption migrationOption = - HiveStrictManagedMigration.determineMigrationTypeAutomatically(tableObj, TableType.MANAGED_TABLE, - null, conf, null, true); - return migrationOption == MANAGED; - } - return false; - } - - private static void addOpenTxnTaskForMigration(String actualDbName, String actualTblName, - HiveConf conf, - UpdatedMetaDataTracker updatedMetaDataTracker, - List> taskList, - Task childTask) { - Task replTxnTask = TaskFactory.get(new ReplTxnWork(actualDbName, actualTblName, - ReplTxnWork.OperationType.REPL_MIGRATION_OPEN_TXN), conf); - replTxnTask.addDependentTask(childTask); - updatedMetaDataTracker.setNeedCommitTxn(true); - taskList.add(replTxnTask); - } public static List> addOpenTxnTaskForMigration(String actualDbName, String actualTblName, HiveConf conf, @@ -264,10 +230,6 @@ private static void addOpenTxnTaskForMigration(String actualDbName, String actua throws IOException, TException { List> taskList = new ArrayList<>(); taskList.add(childTask); - if (isTableMigratingToTransactional(conf, tableObj) && updatedMetaDataTracker != null) { - addOpenTxnTaskForMigration(actualDbName, actualTblName, conf, updatedMetaDataTracker, - taskList, childTask); - } return taskList; } @@ -278,19 +240,10 @@ private static void addOpenTxnTaskForMigration(String actualDbName, String actua long writeId) throws IOException, TException { List> taskList = new ArrayList<>(); - boolean isMigratingToTxn = ReplUtils.isTableMigratingToTransactional(conf, tableObj); - ColumnStatsUpdateWork work = new ColumnStatsUpdateWork(colStats, isMigratingToTxn); + ColumnStatsUpdateWork work = new ColumnStatsUpdateWork(colStats); work.setWriteId(writeId); Task task = TaskFactory.get(work, conf); taskList.add(task); - // If the table is going to be migrated to a transactional table we will need to open - // and commit a transaction to associate a valid writeId with the statistics. 
- if (isMigratingToTxn) { - ReplUtils.addOpenTxnTaskForMigration(colStats.getStatsDesc().getDbName(), - colStats.getStatsDesc().getTableName(), conf, updatedMetadata, taskList, - task); - } - return taskList; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index dea2ca2914..4cc1485358 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -207,22 +207,6 @@ private void parsePartitionSpec(ASTNode tableNode, LinkedHashMap } } - private static void upgradeTableDesc(org.apache.hadoop.hive.metastore.api.Table tableObj, MetaData rv, - EximUtil.SemanticAnalyzerWrapperContext x) - throws IOException, TException, HiveException { - x.getLOG().debug("Converting table " + tableObj.getTableName() + " of type " + tableObj.getTableType() + - " with para " + tableObj.getParameters()); - //TODO : isPathOwnedByHive is hard coded to true, need to get it from repl dump metadata. - TableType tableType = TableType.valueOf(tableObj.getTableType()); - HiveStrictManagedMigration.TableMigrationOption migrationOption = - HiveStrictManagedMigration.determineMigrationTypeAutomatically(tableObj, tableType, - null, x.getConf(), x.getHive().getMSC(), true); - HiveStrictManagedMigration.migrateTable(tableObj, tableType, migrationOption, false, - getHiveUpdater(x.getConf()), x.getHive().getMSC(), x.getConf()); - x.getLOG().debug("Converted table " + tableObj.getTableName() + " of type " + tableObj.getTableType() + - " with para " + tableObj.getParameters()); - } - /** * The same code is used from both the "repl load" as well as "import". * Given that "repl load" now supports two modes "repl load dbName [location]" and @@ -278,26 +262,7 @@ public static boolean prepareImport(boolean isImportCmd, ImportTableDesc tblDesc; org.apache.hadoop.hive.metastore.api.Table tblObj = rv.getTable(); try { - // The table can be non acid in case of replication from a cluster with STRICT_MANAGED set to false. - if (!TxnUtils.isTransactionalTable(tblObj) && replicationSpec.isInReplicationScope() && - x.getConf().getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES) && - (TableType.valueOf(tblObj.getTableType()) == TableType.MANAGED_TABLE)) { - //TODO : dump metadata should be read to make sure that migration is required. 
- upgradeTableDesc(tblObj, rv, x); - //if the conversion is from non transactional to transactional table - if (TxnUtils.isTransactionalTable(tblObj)) { - replicationSpec.setMigratingToTxnTable(); - } - tblDesc = getBaseCreateTableDescFromTable(dbname, tblObj); - if (TableType.valueOf(tblObj.getTableType()) == TableType.EXTERNAL_TABLE) { - replicationSpec.setMigratingToExternalTable(); - tblDesc.setExternal(true); - // we should set this to null so default location for external tables is chosen on target - tblDesc.setLocation(null); - } - } else { - tblDesc = getBaseCreateTableDescFromTable(dbname, tblObj); - } + tblDesc = getBaseCreateTableDescFromTable(dbname, tblObj); } catch (Exception e) { throw new HiveException(e); } @@ -415,8 +380,7 @@ private static AlterTableAddPartitionDesc getBaseAddPartitionDescFromPartition(P StorageDescriptor sd = partition.getSd(); String location = null; - if (replicationSpec.isInReplicationScope() && tblDesc.isExternal() && - !replicationSpec.isMigratingToExternalTable()) { + if (replicationSpec.isInReplicationScope() && tblDesc.isExternal()) { location = ReplExternalTables.externalTableLocation(conf, partition.getSd().getLocation()); LOG.debug("partition {} has data location: {}", partition, location); } else { @@ -452,10 +416,9 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, LoadFileType lft; boolean isSkipTrash = false; boolean needRecycle = false; - boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable(); - if (replicationSpec.isInReplicationScope() && (copyToMigratedTxnTable || - x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) { + if (replicationSpec.isInReplicationScope() && (x.getCtx().getConf().getBoolean( + REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) { lft = LoadFileType.IGNORE; destPath = loadPath = tgtPath; isSkipTrash = MetaStoreUtils.isSkipTrash(table.getParameters()); @@ -484,7 +447,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, } else { destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath); lft = replace ? LoadFileType.REPLACE_ALL : - replicationSpec.isMigratingToTxnTable() ? 
LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING; + LoadFileType.OVERWRITE_EXISTING; } } @@ -503,7 +466,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, if (replicationSpec.isInReplicationScope()) { boolean copyAtLoad = x.getConf().getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(), - isSkipTrash, needRecycle, copyToMigratedTxnTable, copyAtLoad); + isSkipTrash, needRecycle, copyAtLoad); } else { copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false)); } @@ -521,9 +484,6 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, } else { LoadTableDesc loadTableWork = new LoadTableDesc( loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId); - if (replicationSpec.isMigratingToTxnTable()) { - loadTableWork.setInsertOverwrite(replace); - } loadTableWork.setStmtId(stmtId); moveWork.setLoadTableWork(loadTableWork); } @@ -580,7 +540,6 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0); boolean isSkipTrash = false; boolean needRecycle = false; - boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable(); if (shouldSkipDataCopyInReplScope(tblDesc, replicationSpec) || (tblDesc.isExternal() && tblDesc.getLocation() == null)) { @@ -613,8 +572,8 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, LoadFileType loadFileType; Path destPath; - if (replicationSpec.isInReplicationScope() && (copyToMigratedTxnTable || - x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) { + if (replicationSpec.isInReplicationScope() && (x.getCtx().getConf().getBoolean( + REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) { loadFileType = LoadFileType.IGNORE; destPath = tgtLocation; isSkipTrash = MetaStoreUtils.isSkipTrash(table.getParameters()); @@ -626,8 +585,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, } } else { loadFileType = replicationSpec.isReplace() ? - LoadFileType.REPLACE_ALL : - replicationSpec.isMigratingToTxnTable() ? 
LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING; + LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; //Replication scope the write id will be invalid boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) || replicationSpec.isInReplicationScope(); @@ -651,7 +609,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, if (replicationSpec.isInReplicationScope()) { boolean copyAtLoad = x.getConf().getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath, - x.getConf(), isSkipTrash, needRecycle, copyToMigratedTxnTable, copyAtLoad); + x.getConf(), isSkipTrash, needRecycle, copyAtLoad); } else { copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false)); } @@ -682,9 +640,6 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, partSpec.getPartSpec(), loadFileType, writeId); - if (replicationSpec.isMigratingToTxnTable()) { - loadTableWork.setInsertOverwrite(replicationSpec.isReplace()); - } loadTableWork.setStmtId(stmtId); loadTableWork.setInheritTableSpecs(false); moveWork.setLoadTableWork(loadTableWork); @@ -725,8 +680,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, private static boolean shouldSkipDataCopyInReplScope(ImportTableDesc tblDesc, ReplicationSpec replicationSpec) { return ((replicationSpec != null) && replicationSpec.isInReplicationScope() - && tblDesc.isExternal() - && !replicationSpec.isMigratingToExternalTable()); + && tblDesc.isExternal()); } /** @@ -741,17 +695,6 @@ we use isExternal and not tableType() method since that always gives type as man we don't do anything since for external table partitions the path is already set correctly in {@link org.apache.hadoop.hive.ql.parse.repl.load.message.TableHandler} */ - if (replicationSpec.isMigratingToExternalTable()) { - // at this point the table.getDataLocation() should be set already for external tables - // using the correct values of default warehouse external table location on target. 
- partSpec.setLocation(new Path(tblDesc.getLocation(), - Warehouse.makePartPath(partSpec.getPartSpec())).toString()); - LOG.debug("partition spec {} has location set to {} for a table migrating to external table" - + " from managed table", - StringUtils.join(partSpec.getPartSpec().entrySet(), ","), - partSpec.getLocation() - ); - } return; } Path tgtPath; @@ -1197,20 +1140,15 @@ private static void createReplImportTasks( tblDesc.getDatabaseName(), tblDesc.getTableName(), null); - if (replicationSpec.isMigratingToTxnTable()) { - x.setOpenTxnTask(TaskFactory.get(new ReplTxnWork(tblDesc.getDatabaseName(), - tblDesc.getTableName(), ReplTxnWork.OperationType.REPL_MIGRATION_OPEN_TXN), x.getConf())); - updatedMetadata.setNeedCommitTxn(true); - } } if (tblDesc.getLocation() == null) { - if (!waitOnPrecursor){ + if (!waitOnPrecursor) { tblDesc.setLocation(wh.getDefaultTablePath(parentDb, tblDesc.getTableName(), tblDesc.isExternal()).toString()); } else { tblDesc.setLocation( - wh.getDnsPath(wh.getDefaultTablePath(tblDesc.getDatabaseName(), tblDesc.getTableName(), tblDesc.isExternal()) - ).toString()); + wh.getDnsPath(wh.getDefaultTablePath(tblDesc.getDatabaseName(), tblDesc.getTableName(), tblDesc.isExternal()) + ).toString()); } } @@ -1290,7 +1228,6 @@ private static void createReplImportTasks( table = createNewTableMetadataObject(tblDesc, true); isOldTableValid = false; } - // Table existed, and is okay to replicate into, not dropping and re-creating. if (isPartitioned(tblDesc)) { x.getLOG().debug("table partitioned"); @@ -1352,6 +1289,9 @@ private static void createReplImportTasks( } } } else { + if (table != null && table.getSd().getLocation() != null) { + tblDesc.setLocation(table.getSd().getLocation()); + } x.getLOG().debug("table non-partitioned"); if (!replicationSpec.isMetadataOnly()) { // repl-imports are replace-into unless the event is insert-into diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 8675ace7c5..cb0f4b3fda 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -46,8 +46,6 @@ //TxnIds snapshot private String validTxnList = null; private Type specType = Type.DEFAULT; // DEFAULT means REPL_LOAD or BOOTSTRAP_DUMP or EXPORT - private boolean isMigratingToTxnTable = false; - private boolean isMigratingToExternalTable = false; private boolean needDupCopyCheck = false; //Determine if replication is done using repl or export-import private boolean isRepl = false; @@ -404,20 +402,6 @@ public SCOPE getScope(){ } } - public boolean isMigratingToTxnTable() { - return isMigratingToTxnTable; - } - public void setMigratingToTxnTable() { - isMigratingToTxnTable = true; - } - - public boolean isMigratingToExternalTable() { - return isMigratingToExternalTable; - } - - public void setMigratingToExternalTable() { - isMigratingToExternalTable = true; - } public static void copyLastReplId(Map srcParameter, Map destParameter) { String lastReplId = srcParameter.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java index 57f3043dac..9836c66e1b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java @@ -57,9 +57,6 @@ oldPartSpec.put(fs.getName(), beforeIterator.next()); newPartSpec.put(fs.getName(), afterIterator.next()); } - if (ReplUtils.isTableMigratingToTransactional(context.hiveConf, tableObj)) { - replicationSpec.setMigratingToTxnTable(); - } AlterTableRenamePartitionDesc renamePtnDesc = new AlterTableRenamePartitionDesc( tableName, oldPartSpec, newPartSpec, replicationSpec, null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java index 7a4cb93c12..07f2f365ec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java @@ -58,9 +58,7 @@ TableName oldName = TableName.fromString(tableObjBefore.getTableName(), null, oldDbName); TableName newName = TableName.fromString(tableObjAfter.getTableName(), null, newDbName); ReplicationSpec replicationSpec = context.eventOnlyReplicationSpec(); - if (ReplUtils.isTableMigratingToTransactional(context.hiveConf, tableObjAfter)) { - replicationSpec.setMigratingToTxnTable(); - } + AlterTableRenameDesc renameTableDesc = new AlterTableRenameDesc(oldName, replicationSpec, false, newName.getNotEmptyDbTable()); renameTableDesc.setWriteId(msg.getWriteId()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java index c90ea437f5..14900255fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java @@ -44,8 +44,6 @@ private final String colName; private final String colType; private final ColumnStatistics colStats; - private final boolean isMigratingToTxn; // Is the table for which we are updating stats going - // to be migrated during replication. 
private long writeId; public ColumnStatsUpdateWork(String partName, @@ -61,12 +59,10 @@ public ColumnStatsUpdateWork(String partName, this.colName = colName; this.colType = colType; this.colStats = null; - this.isMigratingToTxn = false; } - public ColumnStatsUpdateWork(ColumnStatistics colStats, boolean isMigratingToTxn) { + public ColumnStatsUpdateWork(ColumnStatistics colStats) { this.colStats = colStats; - this.isMigratingToTxn = isMigratingToTxn; this.partName = null; this.mapProp = null; this.dbName = null; @@ -106,8 +102,6 @@ public String getColType() { public ColumnStatistics getColStats() { return colStats; } - public boolean getIsMigratingToTxn() { return isMigratingToTxn; } - @Override public void setWriteId(long writeId) { this.writeId = writeId; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java index dd01b21034..21da20fc06 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java @@ -57,8 +57,6 @@ private String distCpDoAsUser = null; - private boolean copyToMigratedTxnTable; - private boolean checkDuplicateCopy = false; private boolean overWrite = false; @@ -112,14 +110,6 @@ public void setAutoPurge(boolean isAutoPurge) { this.isAutoPurge = isAutoPurge; } - public boolean isCopyToMigratedTxnTable() { - return copyToMigratedTxnTable; - } - - public void setCopyToMigratedTxnTable(boolean copyToMigratedTxnTable) { - this.copyToMigratedTxnTable = copyToMigratedTxnTable; - } - public boolean isNeedCheckDuplicateCopy() { return checkDuplicateCopy; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java index a9f98cc8a4..7e16a7c49e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java @@ -50,8 +50,7 @@ * Different kind of events supported for replaying. 
*/ public enum OperationType { - REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN, REPL_ALLOC_WRITE_ID, REPL_WRITEID_STATE, - REPL_MIGRATION_OPEN_TXN, REPL_MIGRATION_COMMIT_TXN + REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN, REPL_ALLOC_WRITE_ID, REPL_WRITEID_STATE } OperationType operation; @@ -93,17 +92,6 @@ public ReplTxnWork(String dbName, String tableName, List partNames, this.operation = type; } - public ReplTxnWork(String dbName, String tableName, OperationType type) { - this(null, dbName, tableName, null, type, null, null); - assert type == OperationType.REPL_MIGRATION_OPEN_TXN; - } - - public ReplTxnWork(ReplLastIdInfo replLastIdInfo, OperationType type) { - this(null, null, null, null, type, null, null); - assert type == OperationType.REPL_MIGRATION_COMMIT_TXN; - this.replLastIdInfo = replLastIdInfo; - } - public void addWriteEventInfo(WriteEventInfo writeEventInfo) { if (this.writeEventInfos == null) { this.writeEventInfos = new ArrayList<>(); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/util/TestRetryable.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/util/TestRetryable.java index 986e508535..9d0e57948b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/util/TestRetryable.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/util/TestRetryable.java @@ -84,7 +84,7 @@ public Void call() throws Exception { @Test public void testRetrySuccessValidExceptionList() throws Throwable { Retryable retryable = Retryable.builder() - .withTotalDuration(30) + .withTotalDuration(60) .withInitialDelay(1) .withBackoff(1.0) .withRetryOnExceptionList(Arrays.asList(NullPointerException.class, IOException.class)).build(); diff --git a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q index 4c18223c8f..3d5961b75c 100644 --- a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q +++ b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q @@ -4,6 +4,7 @@ set hive.test.mode.prefix=; set hive.test.mode.nosamplelist=managed_t,ext_t,managed_t_imported,managed_t_r_imported,ext_t_imported,ext_t_r_imported; set hive.repl.include.external.tables=true; set hive.repl.dump.metadata.only.for.external.table=false; +set hive.repl.data.copy.lazy=false; drop table if exists managed_t; drop table if exists ext_t; diff --git a/ql/src/test/queries/clientpositive/repl_3_exim_metadata.q b/ql/src/test/queries/clientpositive/repl_3_exim_metadata.q index 08130b652b..46973615bd 100644 --- a/ql/src/test/queries/clientpositive/repl_3_exim_metadata.q +++ b/ql/src/test/queries/clientpositive/repl_3_exim_metadata.q @@ -2,6 +2,7 @@ set hive.mapred.mode=nonstrict; set hive.test.mode=true; set hive.test.mode.prefix=; set hive.test.mode.nosamplelist=replsrc,repldst,repldst_md; +set hive.repl.data.copy.lazy=false; drop table if exists replsrc; drop table if exists repldst;
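To close the loop between the q-test overrides just above and the Java changes earlier in the patch: on the load path the value of hive.repl.data.copy.lazy is read once (the copyAtLoad flag in the ImportSemanticAnalyzer and LoadPartitions hunks) and handed to ReplCopyTask.getLoadCopyTask as its readSourceAsFileList argument. A minimal sketch follows; the wrapper class and the wireCopyTask helper are hypothetical, while the config lookup and the seven-argument getLoadCopyTask call mirror the hunks above.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;

public class LazyCopyWiringSketch {
  // copyAtLoad comes straight from hive.repl.data.copy.lazy and is passed through as the
  // readSourceAsFileList argument, matching the call sites in the hunks above. The exim
  // q-tests above pin the property to false, presumably to keep the non-lazy copy
  // behaviour those fixtures were written against.
  static Task<?> wireCopyTask(ReplicationSpec spec, Path dataPath, Path destPath,
      HiveConf conf, boolean isSkipTrash, boolean needRecycle) {
    boolean copyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
    return ReplCopyTask.getLoadCopyTask(spec, dataPath, destPath, conf,
        isSkipTrash, needRecycle, copyAtLoad);
  }
}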