diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 57ce1aae85..1e6ff11bc8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
@@ -46,6 +47,8 @@
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.util.StringUtils;
 
+import javax.security.auth.login.LoginException;
+
 import static org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER;
 
 public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
@@ -98,6 +101,18 @@ public int execute() {
           return 1;
         }
       }
+      // in case of acid tables, file is directly copied to destination. So we need to clear the old content, if
+      // its a replace (insert overwrite ) operation.
+      if (work.getDeleteDestIfExist() && dstFs.exists(toPath)) {
+        LOG.debug(" path " + toPath + " is cleaned before renaming");
+        getHive().cleanUpOneDirectoryForReplace(toPath, dstFs, HIDDEN_FILES_PATH_FILTER, conf, work.getNeedRecycle(),
+          work.getIsAutoPurge());
+      }
+
+      if (!FileUtils.mkdir(dstFs, toPath, conf)) {
+        console.printError("Cannot make target directory: " + toPath.toString());
+        return 2;
+      }
 
       List<ReplChangeManager.FileInfo> srcFiles = new ArrayList<>();
       if (rwork.readSrcAsFilesList()) {
@@ -108,11 +123,30 @@ public int execute() {
           LOG.debug("ReplCopyTask _files contains: {}", (srcFiles == null ? "null" : srcFiles.size()));
         }
         if ((srcFiles == null) || (srcFiles.isEmpty())) {
-          if (work.isErrorOnSrcEmpty()) {
-            console.printError("No _files entry found on source: " + fromPath.toString());
-            return 5;
-          } else {
+          //For partitioned tables, search inside the top level path and launch distcp for each path
+          if (srcFs.exists(fromPath)) {
+            for (FileStatus fileStatus : srcFs.listStatus(fromPath)) {
+              List<ReplChangeManager.FileInfo> internalSrcFiles = filesInFileListing(srcFs, fileStatus.getPath());
+              //If still src files is null or empty return error
+              if ((internalSrcFiles == null) || (internalSrcFiles.isEmpty())) {
+                if (work.isErrorOnSrcEmpty()) {
+                  console.printError("No _files entry found on source: " + fromPath.toString());
+                  return 5;
+                } else {
+                  return 0;
+                }
+              }
+              //Launch copy task to the partition path
+              launchCopy(internalSrcFiles, dstFs, rwork, fromPath, new Path(toPath, fileStatus.getPath().getName()));
+            }
             return 0;
+          } else {
+            if (work.isErrorOnSrcEmpty()) {
+              console.printError("No _files entry found on source: " + fromPath.toString());
+              return 5;
+            } else {
+              return 0;
+            }
           }
         }
       } else {
@@ -135,29 +169,7 @@ public int execute() {
           srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf), oneSrc.getPath(), null));
         }
       }
-
-      LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size());
-
-      // in case of acid tables, file is directly copied to destination. So we need to clear the old content, if
-      // its a replace (insert overwrite ) operation.
-      if (work.getDeleteDestIfExist() && dstFs.exists(toPath)) {
-        LOG.debug(" path " + toPath + " is cleaned before renaming");
-        getHive().cleanUpOneDirectoryForReplace(toPath, dstFs, HIDDEN_FILES_PATH_FILTER, conf, work.getNeedRecycle(),
-          work.getIsAutoPurge());
-      }
-
-      if (!FileUtils.mkdir(dstFs, toPath, conf)) {
-        console.printError("Cannot make target directory: " + toPath.toString());
-        return 2;
-      }
-      // Copy the files from different source file systems to one destination directory
-      CopyUtils copyUtils = new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs);
-      copyUtils.copyAndVerify(toPath, srcFiles, fromPath, work.readSrcAsFilesList(), work.isOverWrite());
-
-      // If a file is copied from CM path, then need to rename them using original source file name
-      // This is needed to avoid having duplicate files in target if same event is applied twice
-      // where the first event refers to source path and second event refers to CM path
-      copyUtils.renameFileCopiedFromCmPath(toPath, dstFs, srcFiles);
+      launchCopy(srcFiles, dstFs, rwork, fromPath, toPath);
       return 0;
     } catch (Exception e) {
       LOG.error(StringUtils.stringifyException(e));
@@ -167,6 +179,20 @@
     }
   }
 
+  private void launchCopy(List<ReplChangeManager.FileInfo> srcFiles, FileSystem dstFs, ReplCopyWork rwork,
+                          Path fromPath, Path toPath) throws LoginException, HiveFatalException, IOException {
+    LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size());
+
+    // Copy the files from different source file systems to one destination directory
+    CopyUtils copyUtils = new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs);
+    copyUtils.copyAndVerify(toPath, srcFiles, fromPath, work.readSrcAsFilesList(), work.isOverWrite());
+
+    // If a file is copied from CM path, then need to rename them using original source file name
+    // This is needed to avoid having duplicate files in target if same event is applied twice
+    // where the first event refers to source path and second event refers to CM path
+    copyUtils.renameFileCopiedFromCmPath(toPath, dstFs, srcFiles);
+  }
+
   private List<ReplChangeManager.FileInfo> filesInFileListing(FileSystem fs, Path dataPath)
       throws IOException {
     Path fileListing = new Path(dataPath, EximUtil.FILES_NAME);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index bcbf20c53e..df653805d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -233,17 +233,32 @@ private void addConsolidatedPartitionDesc(AlterTableAddPartitionDesc lastAlterTa
   }
 
   private TaskTracker forNewTable() throws Exception {
-    // Place all partitions in single task to reduce load on HMS.
+    // Create one copy task for all partitions in the table if not metadata only or external table
+    addSingleCopyTaskForAllPartitions();
+    // Place all partitions in a batch to reduce load on HMS. Add DDL task to add partition in a batch
     addConsolidatedPartitionDesc(null);
     return tracker;
   }
 
-  private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc addPartitionDesc)
-      throws Exception {
-    boolean processingComplete = addTasksForPartition(table, addPartitionDesc, null);
-    //If processing is not complete, means replication state is already updated with copy tasks which need
-    //to be processed
-    if (processingComplete && hasMorePartitions && !tracker.canAddMoreTasks()) {
+  private void addSingleCopyTaskForAllPartitions() throws MetaException, HiveException {
+    if (!isMetaDataOp() && !TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+      Path replicaWarehousePartitionLocation = managedLocationOnReplicaWarehouse(table);
+      boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
+      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+        event.replicationSpec(),
+        event.dataPath(),
+        replicaWarehousePartitionLocation,
+        context.hiveConf, copyAtLoad, false, (new Path(context.dumpDirectory)).getParent().toString(),
+        this.metricCollector
+      );
+      tracker.addTask(copyTask);
+    }
+  }
+
+  private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc addPartitionDesc) {
+    addTasksForPartition(table, addPartitionDesc);
+    //If processing is not complete, update replication state
+    if (hasMorePartitions && !tracker.canAddMoreTasks()) {
       ReplicationState currentReplicationState =
           new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc));
       updateReplicationState(currentReplicationState);
@@ -251,11 +266,9 @@
   }
 
   /**
-   * returns the root task for adding all partitions in a batch
+   * returns the task for adding all partitions in a batch
    */
-  private boolean addTasksForPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc,
-                                       AlterTableAddPartitionDesc.PartitionDesc lastPartSpec)
-      throws MetaException, HiveException {
+  private void addTasksForPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc) {
     Task<?> addPartTask = TaskFactory.get(
         new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc, true,
             (new Path(context.dumpDirectory)).getParent().toString(), this.metricCollector),
@@ -264,43 +277,10 @@ private boolean addTasksForPartition(Table table, AlterTableAddPartitionDesc add
     //checkpointing task already added as part of add batch of partition
     if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
       tracker.addTask(addPartTask);
-      return true;
-    }
-    //Add Copy task for all partitions
-    boolean lastProcessedStageFound = false;
-    for (AlterTableAddPartitionDesc.PartitionDesc partSpec : addPartitionDesc.getPartitions()) {
-      if (!tracker.canAddMoreTasks()) {
-        //update replication state with the copy task added with which it needs to proceed next
-        ReplicationState currentReplicationState =
-            new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc,
-                partSpec, PartitionState.Stage.COPY));
-        updateReplicationState(currentReplicationState);
-        return false;
-      }
-      Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
-      partSpec.setLocation(replicaWarehousePartitionLocation.toString());
-      LOG.debug("adding dependent CopyWork for partition "
-          + partSpecToString(partSpec.getPartSpec()) + " with source location: "
-          + partSpec.getLocation());
-      if (!lastProcessedStageFound && lastPartSpec != null &&
-          lastPartSpec.getLocation() != partSpec.getLocation()) {
-        //Don't process copy task if already processed as part of previous run
-        continue;
-      }
-      lastProcessedStageFound = true;
-      boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
-      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
-          event.replicationSpec(),
-          new Path(event.dataPath() + Path.SEPARATOR + Warehouse.makePartPath(partSpec.getPartSpec())),
-          replicaWarehousePartitionLocation,
-          context.hiveConf, copyAtLoad, false, (new Path(context.dumpDirectory)).getParent().toString(),
-          this.metricCollector
-      );
-      tracker.addTask(copyTask);
+    } else {
+      //add partition metadata task once the copy task is done
+      tracker.addDependentTask(addPartTask);
     }
-    //add partition metadata task once all the copy tasks are added
-    tracker.addDependentTask(addPartTask);
-    return true;
   }
 
   /**
@@ -334,6 +314,20 @@ private Path locationOnReplicaWarehouse(Table table, AlterTableAddPartitionDesc.
     }
   }
 
+  private Path managedLocationOnReplicaWarehouse(Table table)
+      throws MetaException, HiveException {
+    if (tableDesc.getLocation() == null) {
+      if (table.getDataLocation() == null) {
+        Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+        return context.warehouse.getDefaultTablePath(parentDb, tableDesc.getTableName(), tableDesc.isExternal());
+      } else {
+        return table.getDataLocation();
+      }
+    } else {
+      return new Path(tableDesc.getLocation());
+    }
+  }
+
   private Task<?> dropPartitionTask(Table table, Map<String, String> partSpec) throws SemanticException {
     Task<DDLWork> dropPtnTask = null;
     Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecsExpr =
@@ -364,11 +358,6 @@ private TaskTracker forExistingTable(AlterTableAddPartitionDesc lastPartitionRep
       Map<String, String> currentSpec = addPartitionDesc.getPartitions().get(0).getPartSpec();
       encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec);
     }
-    //Add Copy task pending for previous partition
-    if (PartitionState.Stage.COPY.equals(lastReplicatedStage)) {
-      addTasksForPartition(table, lastPartitionReplicated,
-          lastReplicatedPartitionDesc);
-    }
     boolean pendingPartitions = false;
     while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) {
       pendingPartitions = true;
@@ -396,6 +385,11 @@
       }
     }
     if (pendingPartitions) {
+      if (lastPartitionReplicated == null) {
+        //Copy task not added yet
+        addSingleCopyTaskForAllPartitions();
+      }
+      //Add metadata tasks
      addConsolidatedPartitionDesc(lastPartitionReplicated);
     }
     return tracker;
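
Reviewer note, not part of the patch: the partition handling added to ReplCopyTask#execute above can be pictured with the standalone sketch below. It assumes the dump layout implied by the patch, i.e. a table-level dump directory whose immediate children are partition directories, each carrying its own _files listing, and it copies every listed file into a target child directory of the same name. The class and helper names (PartitionCopySketch, copyPartitionListings, readFileList) are illustrative only and do not exist in Hive; the real patch routes the copy through launchCopy/CopyUtils (possibly distcp) and also handles ChangeManager paths and checksums, which the sketch ignores.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class PartitionCopySketch {

  // Mirrors the idea of the patched execute(): when the table-level path has no
  // top-level "_files" listing, treat each child directory as a partition, read its
  // own "_files" listing and copy into a matching child directory under the target.
  static void copyPartitionListings(FileSystem srcFs, Path tableDumpDir,
                                    FileSystem dstFs, Path tableTargetDir,
                                    Configuration conf) throws IOException {
    for (FileStatus partDir : srcFs.listStatus(tableDumpDir)) {
      List<Path> partFiles = readFileList(srcFs, new Path(partDir.getPath(), "_files"));
      // Destination keeps the partition directory name, as the patch does with
      // new Path(toPath, fileStatus.getPath().getName()).
      Path partTarget = new Path(tableTargetDir, partDir.getPath().getName());
      dstFs.mkdirs(partTarget);
      for (Path src : partFiles) {
        // Copy each listed file into the partition's target directory.
        FileUtil.copy(src.getFileSystem(conf), src, dstFs, partTarget, false, conf);
      }
    }
  }

  // Each line of a "_files" listing names one source file to copy (the real listing
  // also carries checksum and CM information, which this sketch ignores).
  static List<Path> readFileList(FileSystem fs, Path fileListing) throws IOException {
    List<Path> files = new ArrayList<>();
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(fs.open(fileListing), StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        files.add(new Path(line));
      }
    }
    return files;
  }
}

The same table-level layout is what lets LoadPartitions register a single ReplCopyTask rooted at the table dump directory (addSingleCopyTaskForAllPartitions) instead of one copy task per partition, leaving only the batched ADD PARTITION DDL work per partition.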