diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
index efecdb826c..d990625df4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -26,6 +28,7 @@
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -34,8 +37,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -54,14 +59,53 @@
   List<Task<? extends Serializable>> tasks(TaskTracker tracker) {
     List<Task<? extends Serializable>> tasks = new ArrayList<>();
     Iterator<DirCopyWork> itr = work.getPathsToCopyIterator();
-    while (tracker.canAddMoreTasks() && itr.hasNext()) {
+    int numTaskCanBeAdded = tracker.numTaskCanBeAdded();
+    Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
+    while (numTaskCanBeAdded-- > 0 && itr.hasNext()) {
       DirCopyWork dirCopyWork = itr.next();
       Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
       tasks.add(task);
-      tracker.addTask(task);
+      barrierTask.addDependentTask(task);
       LOG.debug("added task for {}", dirCopyWork);
     }
-    return tasks;
+
+    if (!tasks.isEmpty()) {
+      tracker.addDependentTask(barrierTask);
+      tracker.addTaskList(tasks);
+      return Collections.singletonList(barrierTask);
+    } else {
+      return tasks;
+    }
+  }
+
+  private static Integer setTargetPathOwnerInt(Path targetPath, Path sourcePath, HiveConf conf) throws IOException {
+    FileSystem targetFs = targetPath.getFileSystem(conf);
+    if (!targetFs.exists(targetPath)) {
+      targetFs.create(targetPath);
+    }
+    FileStatus status = sourcePath.getFileSystem(conf).getFileStatus(sourcePath);
+    if (status == null) {
+      throw new IOException("source path missing " + sourcePath);
+    }
+    targetPath.getFileSystem(conf).setOwner(targetPath, status.getOwner(), status.getGroup());
+    return null;
+  }
+
+  private static Integer setTargetPathOwner(Path targetPath, Path sourcePath, HiveConf conf, String distCpDoAsUser)
+      throws IOException {
+    if (distCpDoAsUser == null) {
+      return setTargetPathOwnerInt(targetPath, sourcePath, conf);
+    }
+    UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+        distCpDoAsUser, UserGroupInformation.getLoginUser());
+    try {
+      Path finalTargetPath = targetPath;
+      Path finalSourcePath = sourcePath;
+      return proxyUser.doAs((PrivilegedExceptionAction<Integer>) () ->
+          setTargetPathOwnerInt(finalTargetPath, finalSourcePath, conf));
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
   }
 
   public static class DirCopyTask extends Task<DirCopyWork> implements Serializable {
@@ -86,6 +130,8 @@ protected int execute(DriverContext driverContext) {
 
       boolean usePrivilegedUser = distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser);
 
+      setTargetPathOwner(targetPath, sourcePath, conf, usePrivilegedUser ? distCpDoAsUser : null);
+
       // do we create a new conf and only here provide this additional option so that we get away from
       // differences of data in two location for the same directories ?
       // basically add distcp.options.delete to hiveconf new object ?
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java
index 1d01bc9cd2..37d92111d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java
@@ -142,4 +142,9 @@ public void debugLog(String forEventType) {
   public int numberOfTasks() {
     return numberOfTasks;
   }
-}
\ No newline at end of file
+
+  // number of tasks that can still be added to the tracker.
+  public int numTaskCanBeAdded() {
+    return canAddMoreTasks() ? maxTasksAllowed - numberOfTasks : 0;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index d4fb1917b0..1f0b0797e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -275,8 +275,8 @@ public static boolean prepareImport(boolean isImportCmd,
     // Create table associated with the import
     // Executed if relevant, and used to contain all the other details about the table if not.
     ImportTableDesc tblDesc;
+    org.apache.hadoop.hive.metastore.api.Table tblObj = rv.getTable();
     try {
-      org.apache.hadoop.hive.metastore.api.Table tblObj = rv.getTable();
       // The table can be non acid in case of replication from a cluster with STRICT_MANAGED set to false.
       if (!TxnUtils.isTransactionalTable(tblObj) && replicationSpec.isInReplicationScope() &&
           x.getConf().getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES) &&
@@ -316,6 +316,7 @@ public static boolean prepareImport(boolean isImportCmd,
       }
       inReplicationScope = true;
       tblDesc.setReplWriteId(writeId);
+      tblDesc.setOwnerName(tblObj.getOwner());
     }
 
     if (isExternalSet) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
index c71ff6d713..4514af1f08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
@@ -115,6 +115,7 @@
   // This is not needed beyond compilation, so it is transient.
   private transient FileSinkDesc writer;
   private Long replWriteId; // to be used by repl task to get the txn and valid write id list
+  private String ownerName = null;
 
   public CreateTableDesc() {
   }
@@ -909,6 +910,10 @@ public Table toTable(HiveConf conf) throws HiveException {
             StatsSetupConst.FALSE);
       }
     }
+
+    if (ownerName != null) {
+      tbl.setOwner(ownerName);
+    }
 
     return tbl;
   }
@@ -939,4 +944,12 @@ public Long getReplWriteId() {
   public void setReplWriteId(Long replWriteId) {
     this.replWriteId = replWriteId;
   }
+
+  public String getOwnerName() {
+    return ownerName;
+  }
+
+  public void setOwnerName(String ownerName) {
+    this.ownerName = ownerName;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateViewDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateViewDesc.java
index 7130aba597..721a23419a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateViewDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateViewDesc.java
@@ -70,6 +70,7 @@
   private Map<String, String> serdeProps; // only used for materialized views
   private Set<String> tablesUsed; // only used for materialized views
   private ReplicationSpec replicationSpec = null;
+  private String ownerName = null;
 
   /**
    * For serialization only.
@@ -419,4 +420,8 @@ public Table toTable(HiveConf conf) throws HiveException {
     return tbl;
   }
+
+  public void setOwnerName(String ownerName) {
+    this.ownerName = ownerName;
+  }
 
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
index 5c30fca2d3..017e1c7f9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
@@ -366,4 +366,17 @@ public void setReplWriteId(Long replWriteId) {
       this.createTblDesc.setReplWriteId(replWriteId);
     }
   }
+
+  public void setOwnerName(String ownerName) {
+    switch (getDescType()) {
+      case TABLE:
+        createTblDesc.setOwnerName(ownerName);
+        break;
+      case VIEW:
+        createViewDesc.setOwnerName(ownerName);
+        break;
+      default:
+        throw new RuntimeException("Invalid table type : " + getDescType());
+    }
+  }
 }
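
The tasks() change above turns a flat batch of DirCopyWork tasks into a single-rooted one: each copy task is chained off a no-op DependencyCollectionWork barrier, the tracker accounts for the whole batch via addTaskList(), and only the barrier is returned. TaskTracker.numTaskCanBeAdded() reserves the remaining budget up front: with maxTasksAllowed = 100 and numberOfTasks = 98 it returns 2, so at most two copy tasks join this batch and the rest wait for a later round. Below is a minimal sketch of that barrier shape; Node, BarrierSketch, and the sample paths are stand-ins invented for illustration, not Hive types.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    class Node {
      final String name;
      final List<Node> dependents = new ArrayList<>();
      Node(String name) { this.name = name; }
      void addDependentTask(Node t) { dependents.add(t); }
    }

    public class BarrierSketch {
      // Cap the batch at the tracker budget; hang each copy task off one
      // no-op barrier and hand back only the barrier as the root.
      static List<Node> tasks(List<String> dirsToCopy, int numTaskCanBeAdded) {
        List<Node> tasks = new ArrayList<>();
        Node barrier = new Node("dependency-collection-barrier");
        for (String dir : dirsToCopy) {
          if (numTaskCanBeAdded-- <= 0) {
            break; // the remaining dirs are picked up in a later round
          }
          Node copy = new Node("dir-copy:" + dir);
          tasks.add(copy);
          barrier.addDependentTask(copy);
        }
        return tasks.isEmpty() ? tasks : Collections.singletonList(barrier);
      }

      public static void main(String[] args) {
        List<Node> roots = tasks(Arrays.asList("/ext/t1", "/ext/t2", "/ext/t3"), 2);
        System.out.println(roots.size());                   // 1 (the barrier)
        System.out.println(roots.get(0).dependents.size()); // 2 (budget-capped)
      }
    }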
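
setTargetPathOwnerInt() mirrors the source directory's owner and group onto the target, and setTargetPathOwner() wraps it in UserGroupInformation.createProxyUser(...).doAs(...) so the filesystem calls run as the distcp doAs user when one is configured. The sketch below shows the same proxy-user pattern in isolation; mirrorOwner is a hypothetical helper, it uses mkdirs() where the patch calls create() on a missing target (an assumption that the target is a directory), and it leans on FileSystem.getFileStatus() throwing FileNotFoundException for a missing path rather than returning null, which is also why the patch's status == null check should never fire.

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ProxyChownSketch {
      // Mirror the source path's owner/group onto the target, running the
      // filesystem calls as doAsUser when one is supplied.
      static void mirrorOwner(Path source, Path target, Configuration conf,
          String doAsUser) throws IOException, InterruptedException {
        PrivilegedExceptionAction<Void> action = () -> {
          // throws FileNotFoundException if the source is missing
          FileStatus srcStatus = source.getFileSystem(conf).getFileStatus(source);
          FileSystem targetFs = target.getFileSystem(conf);
          if (!targetFs.exists(target)) {
            targetFs.mkdirs(target); // assumption: target is a directory
          }
          targetFs.setOwner(target, srcStatus.getOwner(), srcStatus.getGroup());
          return null;
        };
        if (doAsUser == null) {
          try {
            action.run();
          } catch (IOException e) {
            throw e;
          } catch (Exception e) {
            throw new IOException(e);
          }
        } else {
          // proxy user: login user's credentials, doAsUser's identity
          UserGroupInformation proxy = UserGroupInformation.createProxyUser(
              doAsUser, UserGroupInformation.getLoginUser());
          proxy.doAs(action);
        }
      }
    }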
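
The remaining hunks thread the table owner through the import path: prepareImport() hoists tblObj out of the try block so it is still in scope when the replication branch calls tblDesc.setOwnerName(tblObj.getOwner()); ImportTableDesc dispatches that to CreateTableDesc or CreateViewDesc by desc type; and CreateTableDesc.toTable() applies it with tbl.setOwner(ownerName), so the replica keeps the source table's owner rather than the replication user. (CreateViewDesc only stores the owner in these hunks; the site that consumes it is not shown.) A toy end-to-end sketch with stand-in classes:

    // Stand-in classes; the real flow spans ImportSemanticAnalyzer,
    // ImportTableDesc, and CreateTableDesc/CreateViewDesc.
    class TableStub {
      String owner;
      void setOwner(String owner) { this.owner = owner; }
    }

    class CreateTableDescStub {
      private String ownerName = null;
      void setOwnerName(String ownerName) { this.ownerName = ownerName; }
      TableStub toTable() {
        TableStub tbl = new TableStub();
        if (ownerName != null) {
          tbl.setOwner(ownerName); // replica keeps the source owner
        }
        return tbl;
      }
    }

    public class OwnerFlowSketch {
      public static void main(String[] args) {
        String dumpedOwner = "etl_user"; // stands in for tblObj.getOwner()
        CreateTableDescStub desc = new CreateTableDescStub();
        desc.setOwnerName(dumpedOwner);  // via ImportTableDesc.setOwnerName()
        System.out.println(desc.toTable().owner); // prints etl_user
      }
    }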