diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a120b4573d..41bb1f743c 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -478,6 +478,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal REPL_DUMP_METADATA_ONLY("hive.repl.dump.metadata.only", false, "Indicates whether replication dump only metadata information or data + metadata. \n" + "This config makes hive.repl.include.external.tables config ineffective."), + REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE("hive.repl.dump.metadata.only.for.external.table", + false, + "Indicates whether external table replication dumps only metadata information or data + metadata."), REPL_BOOTSTRAP_ACID_TABLES("hive.repl.bootstrap.acid.tables", false, "Indicates if repl dump should bootstrap the information about ACID tables along with \n" + "incremental dump for replication. It is recommended to keep this config parameter \n" diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetadataReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetadataReplicationScenariosExternalTables.java new file mode 100644 index 0000000000..f2d61ccf44 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetadataReplicationScenariosExternalTables.java @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; +import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; +import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * TestMetadataReplicationScenariosExternalTables - Test metadata only replication + * for external tables + */ +public class TestMetadataReplicationScenariosExternalTables extends BaseReplicationAcrossInstances { + + private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; + String extraPrimaryDb; + + @BeforeClass + public static void classLevelSetup() throws Exception { + Map overrides = new HashMap<>(); + overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), + GzipJSONMessageEncoder.class.getCanonicalName()); + overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false"); + overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true"); + overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname, "true"); + overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, + UserGroupInformation.getCurrentUser().getUserName()); + + internalBeforeClassSetup(overrides, TestReplicationScenarios.class); + } + + @Before + public void setup() throws Throwable { + super.setup(); + extraPrimaryDb = "extra_" + primaryDbName; + } + + @After + public void tearDown() throws Throwable { + primary.run("drop database if exists " + extraPrimaryDb + " cascade"); + super.tearDown(); + } + + @Test + public void replicationWithoutExternalTables() throws Throwable { + List loadWithClause = externalTableBasePathWithClause(); + List dumpWithClause + = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"); + + + + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .run("create external table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='us') values ('austin')") + .run("insert 
into table t2 partition(country='france') values ('paris')") + .dump(primaryDbName, null, dumpWithClause); + + // the _external_tables_file info only should be created if external tables are to be replicated not otherwise + assertFalse(primary.miniDFSCluster.getFileSystem() + .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyFailure(new String[] {"t1"}) + .run("show tables like 't2'") + .verifyFailure(new String[] {"t2"}) + .verifyReplTargetProperty(replicatedDbName); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t3 (id int)") + .run("insert into table t3 values (10)") + .run("insert into table t3 values (20)") + .dump(primaryDbName, tuple.lastReplicationId, dumpWithClause); + + // the _external_tables_file info only should be created if external tables are to be replicated not otherwise + assertFalse(primary.miniDFSCluster.getFileSystem() + .exists(new Path(tuple.dumpLocation, FILE_NAME))); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't3'") + .verifyFailure(new String[] {"t3"}) + .verifyReplTargetProperty(replicatedDbName); + } + + @Test + public void externalTableReplicationWithDefaultPaths() throws Throwable { + //creates external tables with partitions + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .run("create external table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='us') values ('austin')") + .run("insert into table t2 partition(country='france') values ('paris')") + .dump("repl dump " + primaryDbName); + + // verify that the external table info is not written as metadata only replication + assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + + List withClauseOptions = externalTableBasePathWithClause(); + + replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("select country from t2 where country = 'us'") + .verifyResult(null) + .run("select country from t2 where country = 'france'") + .verifyResult(null); + + // Ckpt should be set on bootstrapped db. 
+ replica.verifyIfCkptSet(replicatedDbName, tuple.dumpLocation); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t3 (id int)") + .run("insert into table t3 values (10)") + .run("create external table t4 as select id from t3") + .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId); + + // verify that the external table info is not written for the incremental dump, since this is metadata-only replication + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't3'") + .verifyResult("t3") + .run("select id from t3") + .verifyResult(null) + .run("select id from t4") + .verifyResult(null); + + tuple = primary.run("use " + primaryDbName) + .run("drop table t1") + .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId); + + // verify that the external table info is not written for the incremental dump, since this is metadata-only replication + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + } + + @Test + public void externalTableReplicationWithCustomPaths() throws Throwable { + Path externalTableLocation = + new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/"); + DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + + // Create base directory but use HDFS path without schema or authority details. + // Hive should pick up the local cluster's HDFS schema/authority. + externalTableBasePathWithClause(); + List loadWithClause = Arrays.asList( + "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + + REPLICA_EXTERNAL_BASE + "'", + "'distcp.options.update'=''" + ); + + WarehouseInstance.Tuple bootstrapTuple = primary.run("use " + primaryDbName) + .run("create external table a (i int, j int) " + + "row format delimited fields terminated by ',' " + + "location '" + externalTableLocation.toUri() + "'") + .dump(primaryDbName, null); + + replica.load(replicatedDbName, bootstrapTuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 'a'") + .verifyResults(Collections.singletonList("a")) + .run("select * From a").verifyResults(Collections.emptyList()); + + //externally add data to location + try (FSDataOutputStream outputStream = + fs.create(new Path(externalTableLocation, "file1.txt"))) { + outputStream.write("1,2\n".getBytes()); + outputStream.write("13,21\n".getBytes()); + } + + WarehouseInstance.Tuple incrementalTuple = primary.run("create table b (i int)") + .dump(primaryDbName, bootstrapTuple.lastReplicationId); + + replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause) + .run("select i From a") + .verifyResults(new String[] {}) + .run("select j from a") + .verifyResults(new String[] {}); + + // alter table location to something new.
+ externalTableLocation = + new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/new_location/a/"); + incrementalTuple = primary.run("use " + primaryDbName) + .run("alter table a set location '" + externalTableLocation + "'") + .dump(primaryDbName, incrementalTuple.lastReplicationId); + + replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select i From a") + .verifyResults(Collections.emptyList()); + } + + @Test + public void externalTableWithPartitions() throws Throwable { + Path externalTableLocation = + new Path("/" + testName.getMethodName() + "/t2/"); + DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + + List loadWithClause = externalTableBasePathWithClause(); + + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create external table t2 (place string) partitioned by (country string) row format " + + "delimited fields terminated by ',' location '" + externalTableLocation.toString() + + "'") + .run("insert into t2 partition(country='india') values ('bangalore')") + .dump("repl dump " + primaryDbName); + + assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't2'") + .verifyResults(new String[] {"t2"}) + .run("select place from t2") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + // add new data externally, to a partition, but under the table level top directory + Path partitionDir = new Path(externalTableLocation, "country=india"); + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) { + outputStream.write("pune\n".getBytes()); + outputStream.write("mumbai\n".getBytes()); + } + + tuple = primary.run("use " + primaryDbName) + .run("insert into t2 partition(country='australia') values ('sydney')") + .dump(primaryDbName, tuple.lastReplicationId); + + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select distinct(country) from t2") + .verifyResults(new String[] {}) + .run("select place from t2 where country='india'") + .verifyResults(new String[] {}) + .run("select place from t2 where country='australia'") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + Path customPartitionLocation = + new Path("/" + testName.getMethodName() + "/partition_data/t2/country=france"); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + + // add new partitions to the table, at an external location than the table level directory + try (FSDataOutputStream outputStream = fs + .create(new Path(customPartitionLocation, "file.txt"))) { + outputStream.write("paris".getBytes()); + } + + tuple = primary.run("use " + primaryDbName) + .run("ALTER TABLE t2 ADD PARTITION (country='france') LOCATION '" + customPartitionLocation + .toString() + "'") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select place from t2 where country='france'") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + // change the location of the partition via alter command + String 
tmpLocation = "/tmp/" + System.nanoTime(); + primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation), new FsPermission("777")); + + tuple = primary.run("use " + primaryDbName) + .run("alter table t2 partition (country='france') set location '" + tmpLocation + "'") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select place from t2 where country='france'") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + // Changing location of the external table, should result in changes to the location of + // partition residing within the table location and not the partitions located outside. + String tmpLocation2 = "/tmp/" + System.nanoTime() + "_2"; + primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation2), new FsPermission("777")); + + tuple = primary.run("use " + primaryDbName) + .run("insert into table t2 partition(country='france') values ('lyon')") + .run("alter table t2 set location '" + tmpLocation2 + "'") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause); + } + + @Test + public void externalTableIncrementalReplication() throws Throwable { + WarehouseInstance.Tuple tuple = primary.dump("repl dump " + primaryDbName); + replica.load(replicatedDbName, tuple.dumpLocation); + + Path externalTableLocation = + new Path("/" + testName.getMethodName() + "/t1/"); + DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t1 (place string) partitioned by (country string) row format " + + "delimited fields terminated by ',' location '" + externalTableLocation.toString() + + "'") + .run("alter table t1 add partition(country='india')") + .run("alter table t1 add partition(country='us')") + .dump(primaryDbName, tuple.lastReplicationId); + + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + + // Add new data externally, to a partition, but under the partition level top directory + // Also, it is added after dumping the events but data should be seen at target after REPL LOAD. + Path partitionDir = new Path(externalTableLocation, "country=india"); + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) { + outputStream.write("pune\n".getBytes()); + outputStream.write("mumbai\n".getBytes()); + } + + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) { + outputStream.write("bangalore\n".getBytes()); + } + + List loadWithClause = externalTableBasePathWithClause(); + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show partitions t1") + .verifyResults(new String[] {"country=india", "country=us"}) + .run("select place from t1 order by place") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + // Delete one of the file and update another one. + fs.delete(new Path(partitionDir, "file.txt"), true); + fs.delete(new Path(partitionDir, "file1.txt"), true); + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) { + outputStream.write("chennai\n".getBytes()); + } + + // Repl load with zero events but external tables location info should present. 
+ tuple = primary.dump(primaryDbName, tuple.lastReplicationId); + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show partitions t1") + .verifyResults(new String[] {"country=india", "country=us"}) + .run("select place from t1 order by place") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + Hive hive = Hive.get(replica.getConf()); + Set partitions = + hive.getAllPartitionsOf(hive.getTable(replicatedDbName + ".t1")); + List paths = partitions.stream().map(p -> p.getDataLocation().toUri().getPath()) + .collect(Collectors.toList()); + + tuple = primary + .run("alter table t1 drop partition (country='india')") + .run("alter table t1 drop partition (country='us')") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation) + .run("select * From t1") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + for (String path : paths) { + assertTrue(replica.miniDFSCluster.getFileSystem().exists(new Path(path))); + } + } + + @Test + public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { + List loadWithClause = externalTableBasePathWithClause(); + List dumpWithClause + = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"); + + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .run("create external table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='us') values ('austin')") + .run("insert into table t2 partition(country='france') values ('paris')") + .dump(primaryDbName, null, dumpWithClause); + + // the _external_tables_file info only should be created if external tables are to be replicated not otherwise + assertFalse(primary.miniDFSCluster.getFileSystem() + .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyFailure(new String[] {"t1" }) + .run("show tables like 't2'") + .verifyFailure(new String[] {"t2" }) + .verifyReplTargetProperty(replicatedDbName); + + dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'"); + tuple = primary.run("use " + primaryDbName) + .run("drop table t1") + .run("create external table t3 (id int)") + .run("insert into table t3 values (10)") + .run("insert into table t3 values (20)") + .run("create table t4 as select * from t3") + .dump(primaryDbName, tuple.lastReplicationId, dumpWithClause); + + // the _external_tables_file info should be created as external tables are to be replicated. 
+ assertTrue(primary.miniDFSCluster.getFileSystem() + .exists(new Path(tuple.dumpLocation, FILE_NAME))); + + // verify that the external table info is written correctly for incremental + assertExternalFileInfo(Arrays.asList("t2", "t3"), + new Path(tuple.dumpLocation, FILE_NAME)); + + // _bootstrap directory should be created as bootstrap enabled on external tables. + Path dumpPath = new Path(tuple.dumpLocation, INC_BOOTSTRAP_ROOT_DIR_NAME); + assertTrue(primary.miniDFSCluster.getFileSystem().exists(dumpPath)); + + // _bootstrap//t2 + // _bootstrap//t3 + Path dbPath = new Path(dumpPath, primaryDbName); + Path tblPath = new Path(dbPath, "t2"); + assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath)); + tblPath = new Path(dbPath, "t3"); + assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyFailure(new String[] {"t1" }) + .run("show tables like 't2'") + .verifyResult("t2") + .run("show tables like 't3'") + .verifyResult("t3") + .run("show tables like 't4'") + .verifyResult("t4") + .verifyReplTargetProperty(replicatedDbName); + + // Ckpt should be set on bootstrapped tables. + replica.verifyIfCkptSetForTables(replicatedDbName, Arrays.asList("t2", "t3"), tuple.dumpLocation); + + // Drop source tables to see if target points to correct data or not after bootstrap load. + primary.run("use " + primaryDbName) + .run("drop table t2") + .run("drop table t3"); + + // Create table event for t4 should be applied along with bootstrapping of t2 and t3 + replica.run("use " + replicatedDbName) + .run("select place from t2 where country = 'us'") + .verifyResult("austin") + .run("select place from t2 where country = 'france'") + .verifyResult("paris") + .run("select id from t3 order by id") + .verifyResults(Arrays.asList("10", "20")) + .run("select id from t4 order by id") + .verifyResults(Arrays.asList("10", "20")) + .verifyReplTargetProperty(replicatedDbName); + } + + @Test + public void testExternalTablesIncReplicationWithConcurrentDropTable() throws Throwable { + List dumpWithClause = Collections.singletonList( + "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" + ); + List loadWithClause = externalTableBasePathWithClause(); + WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .dump(primaryDbName, null, dumpWithClause); + + replica.load(replicatedDbName, tupleBootstrap.dumpLocation, loadWithClause); + + // Insert a row into "t1" and create another external table using data from "t1". + primary.run("use " + primaryDbName) + .run("insert into table t1 values (2)") + .run("create external table t2 as select * from t1"); + + // Inject a behavior so that getTable returns null for table "t1". This ensures the table is + // skipped for data files listing. 
+ BehaviourInjection tableNuller = new BehaviourInjection() { + @Nullable + @Override + public Table apply(@Nullable Table table) { + LOG.info("Performing injection on table " + table.getTableName()); + if (table.getTableName().equalsIgnoreCase("t1")){ + injectionPathCalled = true; + return null; + } else { + nonInjectedPathCalled = true; + return table; + } + } + }; + InjectableBehaviourObjectStore.setGetTableBehaviour(tableNuller); + WarehouseInstance.Tuple tupleInc; + try { + // The t1 table will be skipped from data location listing. + tupleInc = primary.dump(primaryDbName, tupleBootstrap.lastReplicationId, dumpWithClause); + tableNuller.assertInjectionsPerformed(true, true); + } finally { + InjectableBehaviourObjectStore.resetGetTableBehaviour(); // reset the behaviour + } + + // The data location list file should not be written at all, since this is a metadata-only dump. + assertFalseExternalFileInfo(new Path(tupleInc.dumpLocation, FILE_NAME)); + + // The newly inserted data "2" should be missing in table "t1". Table t2 should exist, but + // without data, since this is metadata-only replication. + replica.load(replicatedDbName, tupleInc.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select id from t1 order by id") + .verifyResult(null) + .run("select id from t2 order by id") + .verifyResults(new String[]{}); + } + + @Test + public void testIncrementalDumpEmptyDumpDirectory() throws Throwable { + List loadWithClause = externalTableBasePathWithClause(); + List dumpWithClause = Collections.singletonList( + "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" + ); + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .dump(primaryDbName, null, dumpWithClause); + + replica.load(replicatedDbName, tuple.dumpLocation) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId); + + // This looks like an empty dump but it has the ALTER TABLE event created by the previous + // dump. We need it here so that the next dump won't have any events. + WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, tuple.lastReplicationId, dumpWithClause); + replica.load(replicatedDbName, incTuple.dumpLocation, loadWithClause) + .status(replicatedDbName) + .verifyResult(incTuple.lastReplicationId); + + // create events for some other database and then dump the primaryDbName to dump an empty directory. + primary.run("create database " + extraPrimaryDb + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); + WarehouseInstance.Tuple inc2Tuple = primary.run("use " + extraPrimaryDb) + .run("create table tbl (fld int)") + .run("use " + primaryDbName) + .dump(primaryDbName, incTuple.lastReplicationId, dumpWithClause); + Assert.assertEquals(primary.getCurrentNotificationEventId().getEventId(), + Long.valueOf(inc2Tuple.lastReplicationId).longValue()); + + // Incremental load to existing database with empty dump directory should set the repl id to the last event at src.
+ replica.load(replicatedDbName, inc2Tuple.dumpLocation, loadWithClause) + .status(replicatedDbName) + .verifyResult(inc2Tuple.lastReplicationId); + } + + private List externalTableBasePathWithClause() throws IOException, SemanticException { + return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + } + + private void assertFalseExternalFileInfo(Path externalTableInfoFile) + throws IOException { + DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem(); + Assert.assertFalse(fileSystem.exists(externalTableInfoFile)); + } + + private void assertExternalFileInfo(List expected, Path externalTableInfoFile) + throws IOException { + ReplicationTestUtils.assertExternalFileInfo(primary, expected, externalTableInfoFile); + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index 1815824b5f..0215a39c79 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; -import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 622433bb10..5b21b918b8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -172,7 +172,8 @@ private boolean shouldExamineTablesToDump() { */ private boolean shouldDumpExternalTableLocation() { return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) - && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); + && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) && + !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE)); } /** @@ -493,7 +494,8 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) Exception caught = null; boolean shouldWriteExternalTableLocationInfo = conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) - && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); + && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) && + !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE)); try (Writer writer = new Writer(dbRoot, conf)) { for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java index 4c504be894..c7aa0077a6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java @@ -106,7 +106,8 @@ public static Path externalTableDataPath(HiveConf hiveConf, Path basePath, Path 
this.hiveConf = hiveConf; writePath = new Path(dbRoot, FILE_NAME); includeExternalTables = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES); - dumpMetadataOnly = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); + dumpMetadataOnly = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || + hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE); if (shouldWrite()) { this.writer = FileSystem.get(hiveConf).create(writePath); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 276f759a7e..810a4c5284 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -49,12 +49,12 @@ import java.util.List; import java.util.Map; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Reader; -import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES; +import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Reader; +import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java index d01e24c385..f9648c8961 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java @@ -57,7 +57,7 @@ public HiveWrapper(Hive db, String dbName, long lastReplId) { public Tuple table(final String tableName, HiveConf conf) throws HiveException { // Column statistics won't be accurate if we are dumping only metadata - boolean getColStats = !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); + boolean getColStats = !Utils.shouldDumpMetaDataOnly(db.getTable(dbName, tableName), conf); return new Tuple<>(functionForSpec, () -> db.getTable(dbName, tableName, true, false, getColStats)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 01b7fdc4b6..3d64c7b385 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -72,8 +73,8 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication ? 
null : tableSpec; this.replicationSpec = replicationSpec; - if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || - (this.tableSpec != null && this.tableSpec.tableHandle.isView())) { + if (this.tableSpec != null && this.tableSpec.tableHandle.isView() || + Utils.shouldDumpMetaDataOnly(tableSpec.tableHandle, conf)) { this.replicationSpec.setIsMetadataOnly(true); this.tableSpec.tableHandle.setStatsStateLikeNewTable(); @@ -92,7 +93,8 @@ public boolean write() throws SemanticException { } else if (shouldExport()) { PartitionIterable withPartitions = getPartitions(); writeMetaData(withPartitions); - if (!replicationSpec.isMetadataOnly()) { + if (!replicationSpec.isMetadataOnly() + && !tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE)) { writeData(withPartitions); } return true; @@ -158,10 +160,8 @@ private void writeData(PartitionIterable partitions) throws SemanticException { } else { List dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(), replicationSpec, conf); - - // this is the data copy new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx) - .export(replicationSpec); + .export(replicationSpec); } } catch (Exception e) { throw new SemanticException(e.getMessage(), e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index bc9f06dfa9..6f8912b5f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -270,4 +271,11 @@ public static boolean shouldReplicate(NotificationEvent tableForEvent, return Collections.singletonList(fromPath); } } + + public static boolean shouldDumpMetaDataOnly(Table table, HiveConf conf) { + return conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || + (conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) && + table.getTableType().equals(TableType.EXTERNAL_TABLE) && + conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE)); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java index 0168240829..aedf69870a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java @@ -229,7 +229,7 @@ public void handle(Context withinContext) throws Exception { // If we are not dumping metadata about a table, we shouldn't be dumping basic statistics // as well, since that won't be accurate. So reset them to what they would look like for an // empty table. 
- if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) { + if (Utils.shouldDumpMetaDataOnly(qlMdTableAfter, withinContext.hiveConf)) { qlMdTableAfter.setStatsStateLikeNewTable(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index 837d51c8c8..355374aeb8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -19,7 +19,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; import org.apache.hadoop.hive.ql.metadata.Table; @@ -66,7 +65,7 @@ public void handle(Context withinContext) throws Exception { // If we are not dumping data about a table, we shouldn't be dumping basic statistics // as well, since that won't be accurate. So reset them to what they would look like for an // empty table. - if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) { + if (Utils.shouldDumpMetaDataOnly(qlMdTable, withinContext.hiveConf)) { qlMdTable.setStatsStateLikeNewTable(); }
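For readers following the patch, the dump-side behaviour added here reduces to two predicates: a table is dumped metadata-only when either the global hive.repl.dump.metadata.only flag is set, or the table is external, external tables are included, and the new hive.repl.dump.metadata.only.for.external.table flag is set; and the external-table data-location file is written only when external tables are included and neither metadata-only flag is set. Below is a minimal, self-contained sketch of those predicates using plain booleans instead of the real HiveConf/Table APIs; the class and method names are illustrative only and not part of the patch.

// Simplified model of the decision logic introduced by this patch (illustrative sketch).
public final class MetadataOnlyDecisionSketch {

  // Mirrors Utils.shouldDumpMetaDataOnly(...): global metadata-only flag, or the
  // external-table-specific flag when the table is external and external tables are included.
  static boolean shouldDumpMetadataOnly(boolean globalMetadataOnly,
                                        boolean includeExternalTables,
                                        boolean externalTableMetadataOnly,
                                        boolean isExternalTable) {
    return globalMetadataOnly
        || (includeExternalTables && isExternalTable && externalTableMetadataOnly);
  }

  // Mirrors ReplDumpTask.shouldDumpExternalTableLocation(): the data-location file (FILE_NAME)
  // is written only when external tables are included and neither metadata-only flag is set.
  static boolean shouldWriteExternalTableLocations(boolean globalMetadataOnly,
                                                   boolean includeExternalTables,
                                                   boolean externalTableMetadataOnly) {
    return includeExternalTables && !globalMetadataOnly && !externalTableMetadataOnly;
  }

  public static void main(String[] args) {
    // hive.repl.include.external.tables=true and
    // hive.repl.dump.metadata.only.for.external.table=true (as in the new test class):
    // external tables are dumped metadata-only and no data-location file is written.
    System.out.println(shouldDumpMetadataOnly(false, true, true, true));       // true
    System.out.println(shouldWriteExternalTableLocations(false, true, true));  // false

    // With the new flag off, external table data locations are dumped as before.
    System.out.println(shouldDumpMetadataOnly(false, true, false, true));      // false
    System.out.println(shouldWriteExternalTableLocations(false, true, false)); // true
  }
}

This matches how the tests above toggle the flag: the class-level setup keeps hive.repl.dump.metadata.only.for.external.table=true and asserts that the data-location file is absent, while bootstrapExternalTablesDuringIncrementalPhase sets it to 'false' in the dump WITH clause and asserts that the file exists.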
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; +import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; +import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * TestMetadataReplicationScenariosExternalTables - Test metadata only replication + * for external tables + */ +public class TestMetadataReplicationScenariosExternalTables extends BaseReplicationAcrossInstances { + + private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; + String extraPrimaryDb; + + @BeforeClass + public static void classLevelSetup() throws Exception { + Map overrides = new HashMap<>(); + overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), + GzipJSONMessageEncoder.class.getCanonicalName()); + overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false"); + overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true"); + overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname, "true"); + overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, + UserGroupInformation.getCurrentUser().getUserName()); + + internalBeforeClassSetup(overrides, TestReplicationScenarios.class); + } + + @Before + public void setup() throws Throwable { + super.setup(); + extraPrimaryDb = "extra_" + primaryDbName; + } + + @After + public void tearDown() throws Throwable { + primary.run("drop database if exists " + extraPrimaryDb + " cascade"); + super.tearDown(); + } + + @Test + public void replicationWithoutExternalTables() throws Throwable { + List loadWithClause = externalTableBasePathWithClause(); + List dumpWithClause + = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"); + + + + WarehouseInstance.Tuple 
tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .run("create external table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='us') values ('austin')") + .run("insert into table t2 partition(country='france') values ('paris')") + .dump(primaryDbName, null, dumpWithClause); + + // the _external_tables_file info only should be created if external tables are to be replicated not otherwise + assertFalse(primary.miniDFSCluster.getFileSystem() + .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyFailure(new String[] {"t1"}) + .run("show tables like 't2'") + .verifyFailure(new String[] {"t2"}) + .verifyReplTargetProperty(replicatedDbName); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t3 (id int)") + .run("insert into table t3 values (10)") + .run("insert into table t3 values (20)") + .dump(primaryDbName, tuple.lastReplicationId, dumpWithClause); + + // the _external_tables_file info only should be created if external tables are to be replicated not otherwise + assertFalse(primary.miniDFSCluster.getFileSystem() + .exists(new Path(tuple.dumpLocation, FILE_NAME))); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't3'") + .verifyFailure(new String[] {"t3"}) + .verifyReplTargetProperty(replicatedDbName); + } + + @Test + public void externalTableReplicationWithDefaultPaths() throws Throwable { + //creates external tables with partitions + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .run("create external table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='us') values ('austin')") + .run("insert into table t2 partition(country='france') values ('paris')") + .dump("repl dump " + primaryDbName); + + // verify that the external table info is not written as metadata only replication + assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + + List withClauseOptions = externalTableBasePathWithClause(); + + replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("select country from t2 where country = 'us'") + .verifyResult(null) + .run("select country from t2 where country = 'france'") + .verifyResult(null); + + // Ckpt should be set on bootstrapped db. 
+ replica.verifyIfCkptSet(replicatedDbName, tuple.dumpLocation); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t3 (id int)") + .run("insert into table t3 values (10)") + .run("create external table t4 as select id from t3") + .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId); + + // verify that the external table info is written correctly for incremental + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't3'") + .verifyResult("t3") + .run("select id from t3") + .verifyResult(null) + .run("select id from t4") + .verifyResult(null); + + tuple = primary.run("use " + primaryDbName) + .run("drop table t1") + .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId); + + // verify that the external table info is written correctly for incremental + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + } + + @Test + public void externalTableReplicationWithCustomPaths() throws Throwable { + Path externalTableLocation = + new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/"); + DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + + // Create base directory but use HDFS path without schema or authority details. + // Hive should pick up the local cluster's HDFS schema/authority. + externalTableBasePathWithClause(); + List loadWithClause = Arrays.asList( + "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + + REPLICA_EXTERNAL_BASE + "'", + "'distcp.options.update'=''" + ); + + WarehouseInstance.Tuple bootstrapTuple = primary.run("use " + primaryDbName) + .run("create external table a (i int, j int) " + + "row format delimited fields terminated by ',' " + + "location '" + externalTableLocation.toUri() + "'") + .dump(primaryDbName, null); + + replica.load(replicatedDbName, bootstrapTuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 'a'") + .verifyResults(Collections.singletonList("a")) + .run("select * From a").verifyResults(Collections.emptyList()); + + //externally add data to location + try (FSDataOutputStream outputStream = + fs.create(new Path(externalTableLocation, "file1.txt"))) { + outputStream.write("1,2\n".getBytes()); + outputStream.write("13,21\n".getBytes()); + } + + WarehouseInstance.Tuple incrementalTuple = primary.run("create table b (i int)") + .dump(primaryDbName, bootstrapTuple.lastReplicationId); + + replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause) + .run("select i From a") + .verifyResults(new String[] {}) + .run("select j from a") + .verifyResults(new String[] {}); + + // alter table location to something new. 
+ externalTableLocation = + new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/new_location/a/"); + incrementalTuple = primary.run("use " + primaryDbName) + .run("alter table a set location '" + externalTableLocation + "'") + .dump(primaryDbName, incrementalTuple.lastReplicationId); + + replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select i From a") + .verifyResults(Collections.emptyList()); + } + + @Test + public void externalTableWithPartitions() throws Throwable { + Path externalTableLocation = + new Path("/" + testName.getMethodName() + "/t2/"); + DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + + List loadWithClause = externalTableBasePathWithClause(); + + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create external table t2 (place string) partitioned by (country string) row format " + + "delimited fields terminated by ',' location '" + externalTableLocation.toString() + + "'") + .run("insert into t2 partition(country='india') values ('bangalore')") + .dump("repl dump " + primaryDbName); + + assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't2'") + .verifyResults(new String[] {"t2"}) + .run("select place from t2") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + // add new data externally, to a partition, but under the table level top directory + Path partitionDir = new Path(externalTableLocation, "country=india"); + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) { + outputStream.write("pune\n".getBytes()); + outputStream.write("mumbai\n".getBytes()); + } + + tuple = primary.run("use " + primaryDbName) + .run("insert into t2 partition(country='australia') values ('sydney')") + .dump(primaryDbName, tuple.lastReplicationId); + + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select distinct(country) from t2") + .verifyResults(new String[] {}) + .run("select place from t2 where country='india'") + .verifyResults(new String[] {}) + .run("select place from t2 where country='australia'") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + Path customPartitionLocation = + new Path("/" + testName.getMethodName() + "/partition_data/t2/country=france"); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + + // add new partitions to the table, at an external location than the table level directory + try (FSDataOutputStream outputStream = fs + .create(new Path(customPartitionLocation, "file.txt"))) { + outputStream.write("paris".getBytes()); + } + + tuple = primary.run("use " + primaryDbName) + .run("ALTER TABLE t2 ADD PARTITION (country='france') LOCATION '" + customPartitionLocation + .toString() + "'") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select place from t2 where country='france'") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + // change the location of the partition via alter command + String 
tmpLocation = "/tmp/" + System.nanoTime(); + primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation), new FsPermission("777")); + + tuple = primary.run("use " + primaryDbName) + .run("alter table t2 partition (country='france') set location '" + tmpLocation + "'") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select place from t2 where country='france'") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + // Changing location of the external table, should result in changes to the location of + // partition residing within the table location and not the partitions located outside. + String tmpLocation2 = "/tmp/" + System.nanoTime() + "_2"; + primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation2), new FsPermission("777")); + + tuple = primary.run("use " + primaryDbName) + .run("insert into table t2 partition(country='france') values ('lyon')") + .run("alter table t2 set location '" + tmpLocation2 + "'") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause); + } + + @Test + public void externalTableIncrementalReplication() throws Throwable { + WarehouseInstance.Tuple tuple = primary.dump("repl dump " + primaryDbName); + replica.load(replicatedDbName, tuple.dumpLocation); + + Path externalTableLocation = + new Path("/" + testName.getMethodName() + "/t1/"); + DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t1 (place string) partitioned by (country string) row format " + + "delimited fields terminated by ',' location '" + externalTableLocation.toString() + + "'") + .run("alter table t1 add partition(country='india')") + .run("alter table t1 add partition(country='us')") + .dump(primaryDbName, tuple.lastReplicationId); + + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + + // Add new data externally, to a partition, but under the partition level top directory + // Also, it is added after dumping the events but data should be seen at target after REPL LOAD. + Path partitionDir = new Path(externalTableLocation, "country=india"); + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) { + outputStream.write("pune\n".getBytes()); + outputStream.write("mumbai\n".getBytes()); + } + + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) { + outputStream.write("bangalore\n".getBytes()); + } + + List loadWithClause = externalTableBasePathWithClause(); + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show partitions t1") + .verifyResults(new String[] {"country=india", "country=us"}) + .run("select place from t1 order by place") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + // Delete one of the file and update another one. + fs.delete(new Path(partitionDir, "file.txt"), true); + fs.delete(new Path(partitionDir, "file1.txt"), true); + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) { + outputStream.write("chennai\n".getBytes()); + } + + // Repl load with zero events but external tables location info should present. 
+ tuple = primary.dump(primaryDbName, tuple.lastReplicationId); + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show partitions t1") + .verifyResults(new String[] {"country=india", "country=us"}) + .run("select place from t1 order by place") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + Hive hive = Hive.get(replica.getConf()); + Set partitions = + hive.getAllPartitionsOf(hive.getTable(replicatedDbName + ".t1")); + List paths = partitions.stream().map(p -> p.getDataLocation().toUri().getPath()) + .collect(Collectors.toList()); + + tuple = primary + .run("alter table t1 drop partition (country='india')") + .run("alter table t1 drop partition (country='us')") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation) + .run("select * From t1") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + for (String path : paths) { + assertTrue(replica.miniDFSCluster.getFileSystem().exists(new Path(path))); + } + } + + @Test + public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { + List loadWithClause = externalTableBasePathWithClause(); + List dumpWithClause + = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"); + + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .run("create external table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='us') values ('austin')") + .run("insert into table t2 partition(country='france') values ('paris')") + .dump(primaryDbName, null, dumpWithClause); + + // the _external_tables_file info should be created only if external tables are to be replicated, not otherwise + assertFalse(primary.miniDFSCluster.getFileSystem() + .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyFailure(new String[] {"t1" }) + .run("show tables like 't2'") + .verifyFailure(new String[] {"t2" }) + .verifyReplTargetProperty(replicatedDbName); + + dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'"); + tuple = primary.run("use " + primaryDbName) + .run("drop table t1") + .run("create external table t3 (id int)") + .run("insert into table t3 values (10)") + .run("insert into table t3 values (20)") + .run("create table t4 as select * from t3") + .dump(primaryDbName, tuple.lastReplicationId, dumpWithClause); + + // the _external_tables_file info should be created as external tables are to be replicated. 
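+ // This dump cycle overrides hive.repl.dump.metadata.only.for.external.table to 'false' via the WITH clause above, so data file locations are expected to be written for external tables.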
+ assertTrue(primary.miniDFSCluster.getFileSystem() + .exists(new Path(tuple.dumpLocation, FILE_NAME))); + + // verify that the external table info is written correctly for incremental + assertExternalFileInfo(Arrays.asList("t2", "t3"), + new Path(tuple.dumpLocation, FILE_NAME)); + + // _bootstrap directory should be created as bootstrap is enabled on external tables. + Path dumpPath = new Path(tuple.dumpLocation, INC_BOOTSTRAP_ROOT_DIR_NAME); + assertTrue(primary.miniDFSCluster.getFileSystem().exists(dumpPath)); + + // _bootstrap/<db_name>/t2 + // _bootstrap/<db_name>/t3 + Path dbPath = new Path(dumpPath, primaryDbName); + Path tblPath = new Path(dbPath, "t2"); + assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath)); + tblPath = new Path(dbPath, "t3"); + assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyFailure(new String[] {"t1" }) + .run("show tables like 't2'") + .verifyResult("t2") + .run("show tables like 't3'") + .verifyResult("t3") + .run("show tables like 't4'") + .verifyResult("t4") + .verifyReplTargetProperty(replicatedDbName); + + // Ckpt should be set on bootstrapped tables. + replica.verifyIfCkptSetForTables(replicatedDbName, Arrays.asList("t2", "t3"), tuple.dumpLocation); + + // Drop source tables to see if target points to correct data or not after bootstrap load. + primary.run("use " + primaryDbName) + .run("drop table t2") + .run("drop table t3"); + + // Create table event for t4 should be applied along with bootstrapping of t2 and t3 + replica.run("use " + replicatedDbName) + .run("select place from t2 where country = 'us'") + .verifyResult("austin") + .run("select place from t2 where country = 'france'") + .verifyResult("paris") + .run("select id from t3 order by id") + .verifyResults(Arrays.asList("10", "20")) + .run("select id from t4 order by id") + .verifyResults(Arrays.asList("10", "20")) + .verifyReplTargetProperty(replicatedDbName); + } + + @Test + public void testExternalTablesIncReplicationWithConcurrentDropTable() throws Throwable { + List dumpWithClause = Collections.singletonList( + "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" + ); + List loadWithClause = externalTableBasePathWithClause(); + WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .dump(primaryDbName, null, dumpWithClause); + + replica.load(replicatedDbName, tupleBootstrap.dumpLocation, loadWithClause); + + // Insert a row into "t1" and create another external table using data from "t1". + primary.run("use " + primaryDbName) + .run("insert into table t1 values (2)") + .run("create external table t2 as select * from t1"); + + // Inject a behavior so that getTable returns null for table "t1". This ensures the table is + // skipped for data files listing. 
+ BehaviourInjection<Table, Table> tableNuller = new BehaviourInjection<Table, Table>() { + @Nullable + @Override + public Table apply(@Nullable Table table) { + LOG.info("Performing injection on table " + table.getTableName()); + if (table.getTableName().equalsIgnoreCase("t1")){ + injectionPathCalled = true; + return null; + } else { + nonInjectedPathCalled = true; + return table; + } + } + }; + InjectableBehaviourObjectStore.setGetTableBehaviour(tableNuller); + WarehouseInstance.Tuple tupleInc; + try { + // The t1 table will be skipped from data location listing. + tupleInc = primary.dump(primaryDbName, tupleBootstrap.lastReplicationId, dumpWithClause); + tableNuller.assertInjectionsPerformed(true, true); + } finally { + InjectableBehaviourObjectStore.resetGetTableBehaviour(); // reset the behaviour + } + + // The data location list file should not be present as external tables are dumped metadata only. + assertFalseExternalFileInfo(new Path(tupleInc.dumpLocation, FILE_NAME)); + + // The newly inserted data "2" should be missing in table "t1". Table t2 should exist, but have + // no data as only metadata is replicated for external tables. + replica.load(replicatedDbName, tupleInc.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select id from t1 order by id") + .verifyResult(null) + .run("select id from t2 order by id") + .verifyResults(new String[]{}); + } + + @Test + public void testIncrementalDumpEmptyDumpDirectory() throws Throwable { + List loadWithClause = externalTableBasePathWithClause(); + List dumpWithClause = Collections.singletonList( + "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" + ); + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .dump(primaryDbName, null, dumpWithClause); + + replica.load(replicatedDbName, tuple.dumpLocation) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId); + + // This looks like an empty dump but it has the ALTER TABLE event created by the previous + // dump. We need it here so that the next dump won't have any events. + WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, tuple.lastReplicationId, dumpWithClause); + replica.load(replicatedDbName, incTuple.dumpLocation, loadWithClause) + .status(replicatedDbName) + .verifyResult(incTuple.lastReplicationId); + + // create events for some other database and then dump the primaryDbName to dump an empty directory. + primary.run("create database " + extraPrimaryDb + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); + WarehouseInstance.Tuple inc2Tuple = primary.run("use " + extraPrimaryDb) + .run("create table tbl (fld int)") + .run("use " + primaryDbName) + .dump(primaryDbName, incTuple.lastReplicationId, dumpWithClause); + Assert.assertEquals(primary.getCurrentNotificationEventId().getEventId(), + Long.valueOf(inc2Tuple.lastReplicationId).longValue()); + + // Incremental load to existing database with empty dump directory should set the repl id to the last event at src. 
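+ // inc2Tuple.lastReplicationId equals the current notification event id at the source, which includes the events generated in the extra database that are not part of this dump.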
+ replica.load(replicatedDbName, inc2Tuple.dumpLocation, loadWithClause) + .status(replicatedDbName) + .verifyResult(inc2Tuple.lastReplicationId); + } + + private List externalTableBasePathWithClause() throws IOException, SemanticException { + return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + } + + private void assertFalseExternalFileInfo(Path externalTableInfoFile) + throws IOException { + DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem(); + Assert.assertFalse(fileSystem.exists(externalTableInfoFile)); + } + + private void assertExternalFileInfo(List expected, Path externalTableInfoFile) + throws IOException { + ReplicationTestUtils.assertExternalFileInfo(primary, expected, externalTableInfoFile); + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index 1815824b5f..0215a39c79 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; -import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 622433bb10..e92cd38007 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -172,7 +172,8 @@ private boolean shouldExamineTablesToDump() { */ private boolean shouldDumpExternalTableLocation() { return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) - && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); + && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) && + !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE)); } /** @@ -493,7 +494,8 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) Exception caught = null; boolean shouldWriteExternalTableLocationInfo = conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) - && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); + && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) && + !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE)); try (Writer writer = new Writer(dbRoot, conf)) { for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri()); @@ -583,6 +585,7 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId)); } MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle); + tuple.replicationSpec.setRepl(true); new TableExport( exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write(); diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java index 4c504be894..c7aa0077a6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java @@ -106,7 +106,8 @@ public static Path externalTableDataPath(HiveConf hiveConf, Path basePath, Path this.hiveConf = hiveConf; writePath = new Path(dbRoot, FILE_NAME); includeExternalTables = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES); - dumpMetadataOnly = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); + dumpMetadataOnly = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || + hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE); if (shouldWrite()) { this.writer = FileSystem.get(hiveConf).create(writePath); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 276f759a7e..810a4c5284 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -49,12 +49,12 @@ import java.util.List; import java.util.Map; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Reader; -import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES; +import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Reader; +import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index ad3e55a169..8313d99c58 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -51,6 +51,8 @@ private boolean isMigratingToTxnTable = false; private boolean isMigratingToExternalTable = false; private boolean needDupCopyCheck = false; + //Determine if replication is done using repl or export-import + private boolean isRepl = false; // Key definitions related to replication. public enum KEY { @@ -438,4 +440,12 @@ public void setNeedDupCopyCheck(boolean isFirstIncPending) { // Check HIVE-21197 for more detail. 
this.needDupCopyCheck = isFirstIncPending; } + + public boolean isRepl() { + return isRepl; + } + + public void setRepl(boolean repl) { + isRepl = repl; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java index d01e24c385..f9648c8961 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java @@ -57,7 +57,7 @@ public HiveWrapper(Hive db, String dbName, long lastReplId) { public Tuple
table(final String tableName, HiveConf conf) throws HiveException { // Column statistics won't be accurate if we are dumping only metadata - boolean getColStats = !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); + boolean getColStats = !Utils.shouldDumpMetaDataOnly(db.getTable(dbName, tableName), conf); return new Tuple<>(functionForSpec, () -> db.getTable(dbName, tableName, true, false, getColStats)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 01b7fdc4b6..338a2f39f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -72,8 +73,8 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication ? null : tableSpec; this.replicationSpec = replicationSpec; - if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || - (this.tableSpec != null && this.tableSpec.tableHandle.isView())) { + if (this.tableSpec != null && this.tableSpec.tableHandle.isView() || + Utils.shouldDumpMetaDataOnly(tableSpec.tableHandle, conf)) { this.replicationSpec.setIsMetadataOnly(true); this.tableSpec.tableHandle.setStatsStateLikeNewTable(); @@ -92,7 +93,8 @@ public boolean write() throws SemanticException { } else if (shouldExport()) { PartitionIterable withPartitions = getPartitions(); writeMetaData(withPartitions); - if (!replicationSpec.isMetadataOnly()) { + if (!replicationSpec.isMetadataOnly() + && !(replicationSpec.isRepl() && tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE))) { writeData(withPartitions); } return true; @@ -158,10 +160,8 @@ private void writeData(PartitionIterable partitions) throws SemanticException { } else { List dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(), replicationSpec, conf); - - // this is the data copy new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx) - .export(replicationSpec); + .export(replicationSpec); } } catch (Exception e) { throw new SemanticException(e.getMessage(), e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index bc9f06dfa9..6f8912b5f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -270,4 +271,11 @@ public static boolean shouldReplicate(NotificationEvent tableForEvent, return Collections.singletonList(fromPath); } } + + public static boolean shouldDumpMetaDataOnly(Table table, HiveConf conf) { + return conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || + 
(conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) && + table.getTableType().equals(TableType.EXTERNAL_TABLE) && + conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE)); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java index 0168240829..aedf69870a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java @@ -229,7 +229,7 @@ public void handle(Context withinContext) throws Exception { // If we are not dumping metadata about a table, we shouldn't be dumping basic statistics // as well, since that won't be accurate. So reset them to what they would look like for an // empty table. - if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) { + if (Utils.shouldDumpMetaDataOnly(qlMdTableAfter, withinContext.hiveConf)) { qlMdTableAfter.setStatsStateLikeNewTable(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index 837d51c8c8..355374aeb8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -19,7 +19,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; import org.apache.hadoop.hive.ql.metadata.Table; @@ -66,7 +65,7 @@ public void handle(Context withinContext) throws Exception { // If we are not dumping data about a table, we shouldn't be dumping basic statistics // as well, since that won't be accurate. So reset them to what they would look like for an // empty table. - if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) { + if (Utils.shouldDumpMetaDataOnly(qlMdTable, withinContext.hiveConf)) { qlMdTable.setStatsStateLikeNewTable(); }