diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index abd12c9a82..f291acd3d7 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -478,6 +478,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     REPL_DUMP_METADATA_ONLY("hive.repl.dump.metadata.only", false,
         "Indicates whether replication dump only metadata information or data + metadata. \n"
           + "This config makes hive.repl.include.external.tables config ineffective."),
+    REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY("hive.repl.dump.skip.immutable.data.copy", false,
+        "Indicates whether replication dump can skip copyTask and refer to the \n"
+            + " original path instead. This would retain all table and partition metadata."),
     REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE("hive.repl.dump.metadata.only.for.external.table", false,
         "Indicates whether external table replication dump only metadata information or data + metadata"),
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 2eab45def8..1cdaa8fc38 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -54,6 +55,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Collections;
 import java.util.Map;
@@ -175,6 +177,32 @@ public void testAcidTablesMoveOptimizationIncremental() throws Throwable {
     verifyLoadExecution(replicatedDbName, incrDump.lastReplicationId, true);
   }
 
+  /**
+   * Test case for taking an immutable dataset dump, and for its corresponding repl load.
+   */
+  @Test
+  public void testAcidTablesBootstrapWithMetadataAlone() throws Throwable {
+    List<String> withClauseOptions = new LinkedList<>();
+    withClauseOptions.add("'hive.repl.dump.skip.immutable.data.copy'='true'");
+
+    prepareDataAndDump(primaryDbName, withClauseOptions);
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions);
+    verifyAcidTableLoadWithoutData(replicatedDbName);
+  }
+
+  private void verifyAcidTableLoadWithoutData(String replicatedDbName) throws Throwable {
+    replica.run("use " + replicatedDbName)
+        // no data should be there.
+        .run("select id from t1 order by id")
+        .verifyResults(new String[] {})
+        // all 4 partitions should be there.
+        .run("show partitions t2")
+        .verifyResults(new String[] {"country=france", "country=india", "country=italy", "country=us"})
+        // no data should be there.
+ .run("select place from t2") + .verifyResults(new String[] {}); + } + @Test public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable { int numTxns = 5; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java index 86f92338a6..b9cb45d2a4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -33,6 +33,7 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -162,6 +163,9 @@ public void setResultValues(List resultValues) { } public List> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) { + if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) { + return Collections.emptyList(); + } List> tasks = new ArrayList<>(); while (dirCopyIterator.hasNext() && tracker.canAddMoreTasks()) { DirCopyWork dirCopyWork = dirCopyIterator.next(); @@ -174,6 +178,9 @@ public void setResultValues(List resultValues) { } public List> managedTableCopyTasks(TaskTracker tracker, HiveConf conf) { + if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) { + return Collections.emptyList(); + } List> tasks = new ArrayList<>(); while (managedTableCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) { EximUtil.ManagedTableCopyPath managedTableCopyPath = managedTableCopyPathIterator.next(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index b36c4a531f..9e398f2a77 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; @@ -60,9 +61,12 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState; import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned; @@ -154,7 +158,45 @@ private ReplicationState initialReplicationState() throws SemanticException { ); } + private boolean isMetaDataOp() { + return HiveConf.getBoolVar(context.hiveConf, REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY) || + HiveConf.getBoolVar(context.hiveConf, REPL_DUMP_METADATA_ONLY); + } + + /** + * Get all partitions and consolidate them into single partition request. + * Also, copy relevant stats and other information from original request. 
+   *
+   * @throws Exception
+   */
+  private void addConsolidatedPartitionDesc() throws Exception {
+    List<AlterTableAddPartitionDesc.PartitionDesc> partitions = new LinkedList<>();
+    for (AlterTableAddPartitionDesc alterTableAddPartitionDesc : event.partitionDescriptions(tableDesc)) {
+      AlterTableAddPartitionDesc.PartitionDesc src = alterTableAddPartitionDesc.getPartitions().get(0);
+      partitions.add(new AlterTableAddPartitionDesc.PartitionDesc(
+          src.getPartSpec(), src.getLocation(), src.getPartParams(), src.getInputFormat(),
+          src.getOutputFormat(), src.getNumBuckets(), src.getCols(), src.getSerializationLib(),
+          src.getSerdeParams(), src.getBucketCols(), src.getSortCols(), src.getColStats(),
+          src.getWriteId()));
+    }
+    AlterTableAddPartitionDesc consolidatedPartitionDesc = new AlterTableAddPartitionDesc(tableDesc.getDatabaseName(),
+        tableDesc.getTableName(), true, partitions);
+
+    addPartition(false, consolidatedPartitionDesc, null);
+    if (!partitions.isEmpty()) {
+      LOG.info("Added {} partitions", partitions.size());
+    }
+  }
+
   private TaskTracker forNewTable() throws Exception {
+    if (isMetaDataOp()) {
+      // Place all partitions in a single task to reduce the load on HMS.
+      addConsolidatedPartitionDesc();
+      return tracker;
+    }
+
     Iterator<AlterTableAddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
     while (iterator.hasNext() && tracker.canAddMoreTasks()) {
       AlterTableAddPartitionDesc currentPartitionDesc = iterator.next();
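
The new flag is an ordinary boolean HiveConf variable, so besides the WITH clause used in the
new test it can also be toggled programmatically. Below is a minimal sketch, not part of the
patch: the class name is illustrative, and Hive libraries are assumed to be on the classpath.

import org.apache.hadoop.hive.conf.HiveConf;

// Illustrative sketch only; mirrors how the patched code reads the flag.
public class SkipImmutableDataCopyExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // Defaults to false: copy tasks for external and managed tables are
    // created as before.
    System.out.println(conf.getBoolVar(
        HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)); // false

    // With the flag on, ReplDumpWork#externalTableCopyTasks and
    // ReplDumpWork#managedTableCopyTasks short-circuit to
    // Collections.emptyList(), so the dump writes out metadata but
    // schedules no data copy and refers to the original paths instead.
    conf.setBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY, true);
    System.out.println(conf.getBoolVar(
        HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)); // true
  }
}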