diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8cd401b364..f4712d342d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -637,6 +637,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         new TimeValidator(TimeUnit.HOURS),
         "Total allowed retry duration in hours inclusive of all retries. Once this is exhausted, " +
             "the policy instance will be marked as failed and will need manual intervention to restart."),
+    REPL_LOAD_PARTITIONS_BATCH_SIZE("hive.repl.load.partitions.batch.size", 10000,
+        "Maximum number of partitions of a table that are batched together during \n"
+            + "repl load. All partitions in a batch are added to the metastore in a single call to update \n"
+            + "the metadata. The data for these partitions is copied before the metadata batch is added."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index f86f2ac2c0..182c436b07 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -524,6 +524,40 @@ public void externalTableWithPartitions() throws Throwable {
     assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
   }
 
+  @Test
+  public void externalTableWithPartitionsInBatch() throws Throwable {
+    Path externalTableLocation =
+        new Path("/" + testName.getMethodName() + "/t2/");
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPL_LOAD_PARTITIONS_BATCH_SIZE.varname + "'='" + 1 + "'");
+
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t2 (place string) partitioned by (country string) row format "
+            + "delimited fields terminated by ',' location '" + externalTableLocation.toString()
+            + "'")
+        .run("insert into t2 partition(country='india') values ('bangalore')")
+        .run("insert into t2 partition(country='france') values ('paris')")
+        .run("insert into t2 partition(country='australia') values ('sydney')")
+        .dump(primaryDbName, withClause);
+
+    assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, primaryDbName, false);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't2'")
+        .verifyResults(new String[] { "t2" })
+        .run("select distinct(country) from t2")
+        .verifyResults(new String[] { "india", "france", "australia" })
+        .run("select place from t2")
+        .verifyResults(new String[] { "bangalore", "paris", "sydney" })
+        .verifyReplTargetProperty(replicatedDbName);
+
+    assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
+  }
+
   @Test
   public void externalTableIncrementalCheckpointing() throws Throwable {
     List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 635fd6dac2..48c5e737ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -70,6 +70,7 @@
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
 import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_CHECKPOINT_KEY;
 import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned;
 import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.partSpecToString;
 
@@ -169,34 +170,51 @@ private boolean isMetaDataOp() {
   }
 
   /**
-   * Get all partitions and consolidate them into single partition request.
+   * Get all partitions in batches and consolidate each batch into a single partition request.
    * Also, copy relevant stats and other information from original request.
    *
    * @throws SemanticException
    */
   private void addConsolidatedPartitionDesc() throws Exception {
-    List<AlterTableAddPartitionDesc.PartitionDesc> partitions = new LinkedList<>();
-    for (AlterTableAddPartitionDesc alterTableAddPartitionDesc : event.partitionDescriptions(tableDesc)) {
-
-      AlterTableAddPartitionDesc.PartitionDesc src = alterTableAddPartitionDesc.getPartitions().get(0);
-
-      partitions.add(new AlterTableAddPartitionDesc.PartitionDesc(
-          src.getPartSpec(), src.getLocation(), src.getPartParams(), src.getInputFormat(),
+    // Load partitions in batches of at most the configured size; used for metadata-only operations and external tables.
+    int maxTasks = context.hiveConf.getIntVar(HiveConf.ConfVars.REPL_LOAD_PARTITIONS_BATCH_SIZE);
+    int currentPartitionCount = 0;
+    List<AlterTableAddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc);
+    int totalPartitionCount = partitionDescs.size();
+    while (currentPartitionCount < totalPartitionCount) {
+      List<AlterTableAddPartitionDesc.PartitionDesc> partitions = new LinkedList<>();
+      int pendingPartitionCount = totalPartitionCount - currentPartitionCount;
+      int toPartitionCount = currentPartitionCount + Math.min(pendingPartitionCount, maxTasks);
+      List<AlterTableAddPartitionDesc> partitionBatch = partitionDescs.subList(currentPartitionCount,
+          toPartitionCount);
+      for (AlterTableAddPartitionDesc addPartitionDesc : partitionBatch) {
+        AlterTableAddPartitionDesc.PartitionDesc src = addPartitionDesc.getPartitions().get(0);
+        Map<String, String> partParams = src.getPartParams();
+        if (partParams == null) {
+          partParams = new HashMap<>();
+        }
+        partParams.put(REPL_CHECKPOINT_KEY, context.dumpDirectory);
+        Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, src);
+        partitions.add(new AlterTableAddPartitionDesc.PartitionDesc(
+            src.getPartSpec(), replicaWarehousePartitionLocation.toString(), partParams, src.getInputFormat(),
           src.getOutputFormat(), src.getNumBuckets(), src.getCols(), src.getSerializationLib(),
           src.getSerdeParams(), src.getBucketCols(), src.getSortCols(), src.getColStats(),
           src.getWriteId()));
-    }
-    AlterTableAddPartitionDesc consolidatedPartitionDesc = new AlterTableAddPartitionDesc(tableDesc.getDatabaseName(),
+      }
+      AlterTableAddPartitionDesc consolidatedPartitionDesc = new AlterTableAddPartitionDesc(tableDesc.getDatabaseName(),
         tableDesc.getTableName(), true, partitions);
-    addPartition(false, consolidatedPartitionDesc, null);
-    if (partitions.size() > 0) {
-      LOG.info("Added {} partitions", partitions.size());
+      // No need to add the checkpoint task separately; it is added as part of the add-partition task.
+      addPartition((toPartitionCount < totalPartitionCount), consolidatedPartitionDesc, null);
+      if (partitions.size() > 0) {
+        LOG.info("Added {} partitions", partitions.size());
+      }
+      currentPartitionCount = toPartitionCount;
     }
   }
 
   private TaskTracker forNewTable() throws Exception {
-    if (isMetaDataOp()) {
+    if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
       // Place all partitions in single task to reduce load on HMS.
       addConsolidatedPartitionDesc();
       return tracker;
@@ -229,34 +247,12 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc
    */
   private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
       throws MetaException, HiveException {
-    AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0);
-    Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
-    Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
-    partSpec.setLocation(replicaWarehousePartitionLocation.toString());
-    LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
-        + partSpecToString(partSpec.getPartSpec()) + " with source location: "
-        + partSpec.getLocation());
-
     Task<?> addPartTask = TaskFactory.get(
-            new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc),
-            context.hiveConf
+        new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc),
+        context.hiveConf
     );
-
-    Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
-        tableDesc,
-        (HashMap<String, String>)partSpec.getPartSpec(),
-        context.dumpDirectory,
-        context.hiveConf
-    );
-
-    boolean isOnlyDDLOperation = event.replicationSpec().isMetadataOnly()
-        || (TableType.EXTERNAL_TABLE.equals(table.getTableType())
-    );
-
-    if (isOnlyDDLOperation) {
-      // Set Checkpoint task as dependant to add partition tasks. So, if same dump is retried for
-      // bootstrap, we skip current partition update.
-      addPartTask.addDependentTask(ckptTask);
+    // The checkpoint task has already been added as part of the batched add-partition work for metadata-only and external tables.
+    if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
       if (ptnRootTask == null) {
         ptnRootTask = addPartTask;
       } else {
@@ -265,6 +261,20 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc
       return ptnRootTask;
     }
 
+    AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0);
+    Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
+    Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
+    partSpec.setLocation(replicaWarehousePartitionLocation.toString());
+    LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
+        + partSpecToString(partSpec.getPartSpec()) + " with source location: "
+        + partSpec.getLocation());
+    Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
+        tableDesc,
+        (HashMap<String, String>)partSpec.getPartSpec(),
+        context.dumpDirectory,
+        context.hiveConf
+    );
+
     Path stagingDir = replicaWarehousePartitionLocation;
     // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
     LoadFileType loadFileType;
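
Illustration (not part of the patch): the core of the change above is slicing the table's partition list into sublists of at most hive.repl.load.partitions.batch.size entries and issuing one consolidated add-partition request per slice. The standalone Java sketch below models only that slicing step; the class and method names (PartitionBatchingSketch, toBatches) are hypothetical, and the assumption that each batch maps to a single metastore call mirrors the patch but is simplified here.

import java.util.ArrayList;
import java.util.List;

public class PartitionBatchingSketch {

  // Splits 'items' into consecutive sublists of at most 'batchSize' elements each.
  static <T> List<List<T>> toBatches(List<T> items, int batchSize) {
    List<List<T>> batches = new ArrayList<>();
    int current = 0;
    int total = items.size();
    while (current < total) {
      int to = current + Math.min(total - current, batchSize);
      batches.add(items.subList(current, to)); // subList returns a view, no copying
      current = to;
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> partitions = new ArrayList<>();
    for (int i = 0; i < 25; i++) {
      partitions.add("country=c" + i);
    }
    // With a batch size of 10, 25 partitions become batches of 10, 10 and 5;
    // in the patch, each such batch becomes one consolidated add-partition request.
    for (List<String> batch : toBatches(partitions, 10)) {
      System.out.println("one add-partition call covering " + batch.size() + " partitions");
    }
  }
}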