diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 34b6737752..367501551a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -236,7 +236,8 @@ private void dumpTable(String dbName, String tblName, Path dbRoot) throws Except
     TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
     TableExport.Paths exportPaths =
         new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf);
-    new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).write();
+    String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+    new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, distCpDoAsUser).write();
     REPL_STATE_LOG.info(
         "Repl Dump: Analyzed dump for table/view: {}.{} and dumping metadata and data to path {}",
         dbName, tblName, exportPaths.exportRootDir.toString());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 86575e076d..67c028ef47 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -74,7 +74,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     TableExport.Paths exportPaths =
         new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf);
     TableExport.AuthEntities authEntities =
-        new TableExport(exportPaths, ts, replicationSpec, db, conf, LOG).write();
+        new TableExport(exportPaths, ts, replicationSpec, db, conf, null).write();
     inputs.addAll(authEntities.inputs);
     outputs.addAll(authEntities.outputs);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 87beffab63..d26d318db4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -52,14 +52,16 @@ Licensed to the Apache Software Foundation (ASF) under one
   private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class);
 
   private BlockingQueue<Partition> queue;
+  private final String distCpDoAsUser;
 
   PartitionExport(Paths paths, PartitionIterable partitionIterable, HiveConf hiveConf,
-      AuthEntities authEntities) {
+      AuthEntities authEntities, String distCpDoAsUser) {
     this.paths = paths;
     this.partitionIterable = partitionIterable;
     this.hiveConf = hiveConf;
     this.authEntities = authEntities;
     this.nThreads = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM);
+    this.distCpDoAsUser = distCpDoAsUser;
     this.queue = new ArrayBlockingQueue<>(2 * nThreads);
   }
 
@@ -101,7 +103,7 @@ void write(final ReplicationSpec forReplicationSpec) throws InterruptedException
         try {
           // this the data copy
           Path rootDataDumpDir = paths.partitionExportDir(partitionName);
-          new FileOperations(fromPath, rootDataDumpDir, hiveConf).export(forReplicationSpec);
+          new FileOperations(fromPath, rootDataDumpDir, hiveConf, distCpDoAsUser).export(forReplicationSpec);
           authEntities.inputs.add(new ReadEntity(partition));
           LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
         } catch (Exception e) {
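Taken together, these three files change how the copy user is chosen. Previously every TableExport caller had to supply a Logger, while CopyUtils read the doAs setting from the conf on its own. Now ReplDumpTask resolves HIVE_DISTCP_DOAS_USER once and threads it down as a plain String, while ExportSemanticAnalyzer passes null, so a plain EXPORT explicitly opts out of the distcp doAs behavior it previously inherited. A short trace of the new flow, not part of the patch, with placeholder arguments:

    // Illustration only; exportPaths, ts, db, conf stand in for real objects.
    String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);

    // Replication dump (ReplDumpTask): copies may later run as distCpDoAsUser.
    new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, distCpDoAsUser).write();

    // Plain EXPORT (ExportSemanticAnalyzer): null disables the doAs behavior.
    new TableExport(exportPaths, ts, replicationSpec, db, conf, null).write();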
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 5d7fd25f78..7d969304d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -33,6 +33,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -45,17 +46,20 @@ Licensed to the Apache Software Foundation (ASF) under one
 import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toWriteEntity;
 
 public class TableExport {
-  private TableSpec tableSpec;
+  private final static Logger logger = LoggerFactory.getLogger(TableExport.class);
+
   private final ReplicationSpec replicationSpec;
   private final Hive db;
   private final HiveConf conf;
-  private final Logger logger;
   private final Paths paths;
   private final AuthEntities authEntities = new AuthEntities();
+  private final String distCpDoAsUser;
+
+  private TableSpec tableSpec;
 
-  public TableExport(Paths paths, TableSpec tableSpec,
-      ReplicationSpec replicationSpec, Hive db, HiveConf conf, Logger logger)
-      throws SemanticException {
+  public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replicationSpec, Hive db,
+      HiveConf conf, String distCpDoAsUser) throws SemanticException {
+    this.distCpDoAsUser = distCpDoAsUser;
     this.tableSpec = (tableSpec != null
         && tableSpec.tableHandle.isTemporary()
         && replicationSpec.isInReplicationScope())
@@ -67,7 +71,6 @@ public TableExport(Paths paths, TableSpec tableSpec,
     }
     this.db = db;
     this.conf = conf;
-    this.logger = logger;
     this.paths = paths;
   }
 
@@ -115,8 +118,7 @@ private PartitionIterable partitions() throws SemanticException {
     }
   }
 
-  private void writeMetaData(PartitionIterable partitions)
-      throws SemanticException {
+  private void writeMetaData(PartitionIterable partitions) throws SemanticException {
     try {
       EximUtil.createExportDump(
           paths.exportFileSystem,
@@ -140,11 +142,13 @@ private void writeData(PartitionIterable partitions) throws SemanticException {
         throw new IllegalStateException(
             "partitions cannot be null for partitionTable :" + tableSpec.tableName);
       }
-      new PartitionExport(paths, partitions, conf, authEntities).write(replicationSpec);
+      new PartitionExport(paths, partitions, conf, authEntities, distCpDoAsUser)
+          .write(replicationSpec);
     } else {
       Path fromPath = tableSpec.tableHandle.getDataLocation();
       //this is the data copy
-      new FileOperations(fromPath, paths.dataExportDir(), conf).export(replicationSpec);
+      new FileOperations(fromPath, paths.dataExportDir(), conf, distCpDoAsUser)
+          .export(replicationSpec);
       authEntities.inputs.add(new ReadEntity(tableSpec.tableHandle));
     }
     authEntities.outputs.add(toWriteEntity(paths.exportRootDir, conf));
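TableExport now owns its logger instead of borrowing the caller's, which is what makes dropping the Logger constructor parameter possible at every call site above. For reference, the conventional slf4j idiom (the patch writes the modifiers as final static; static final is the more common order, with identical semantics):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class TableExport {
      // One logger per class; nothing needs to be injected through the constructor.
      private static final Logger logger = LoggerFactory.getLogger(TableExport.class);
    }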
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java
index 0cd3f17a4e..f977695499 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java
@@ -1,19 +1,19 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
  */
 
 package org.apache.hadoop.hive.ql.parse.repl.dump.io;
@@ -37,19 +37,18 @@
 class CopyUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(CopyUtils.class);
 
-  private final HiveConf hiveConf;
   private final long maxCopyFileSize;
   private final long maxNumberOfFiles;
   private final boolean hiveInTest;
   private final String copyAsUser;
 
-  CopyUtils(HiveConf hiveConf) {
+  CopyUtils(HiveConf hiveConf, String distCpDoAsUser) {
     this.hiveConf = hiveConf;
     maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES);
     maxCopyFileSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE);
     hiveInTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST);
-    this.copyAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+    this.copyAsUser = distCpDoAsUser;
   }
 
   void doCopy(Path destination, List<Path> srcPaths) throws IOException {
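The constructor change here is the point of the patch: CopyUtils no longer consults the conf for the doAs user; it is told which user to copy as (or given null). The hunk does not show how copyAsUser is consumed, but a field like this typically feeds Hadoop's proxy-user mechanism. A minimal sketch of that pattern, as it might sit inside CopyUtils, with runDistCp standing in for whatever copy helper the class actually calls (hypothetical, not from this patch):

    import java.security.PrivilegedExceptionAction;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    // Sketch only: run the copy as copyAsUser when set, otherwise inline.
    private boolean copyWithDoAs(Path destination, List<Path> srcPaths) throws Exception {
      if (copyAsUser == null) {
        return runDistCp(destination, srcPaths); // hypothetical helper; copies as the current user
      }
      UserGroupInformation proxyUser =
          UserGroupInformation.createProxyUser(copyAsUser, UserGroupInformation.getLoginUser());
      // Execute the copy inside the proxy user's security context.
      return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () ->
          runDistCp(destination, srcPaths));
    }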
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index e1e3ae1412..2f4dc8cefb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -41,12 +41,15 @@ Licensed to the Apache Software Foundation (ASF) under one
   private final Path exportRootDataDir;
   private HiveConf hiveConf;
   private final FileSystem dataFileSystem, exportFileSystem;
+  private final String distCpDoAsUser;
 
-  public FileOperations(Path dataFileListPath, Path exportRootDataDir, HiveConf hiveConf)
+  public FileOperations(Path dataFileListPath, Path exportRootDataDir, HiveConf hiveConf,
+      String distCpDoAsUser)
       throws IOException {
     this.dataFileListPath = dataFileListPath;
     this.exportRootDataDir = exportRootDataDir;
     this.hiveConf = hiveConf;
+    this.distCpDoAsUser = distCpDoAsUser;
     dataFileSystem = dataFileListPath.getFileSystem(hiveConf);
     exportFileSystem = exportRootDataDir.getFileSystem(hiveConf);
   }
@@ -69,7 +72,7 @@ private void copyFiles() throws IOException {
     for (FileStatus fileStatus : fileStatuses) {
       srcPaths.add(fileStatus.getPath());
     }
-    new CopyUtils(hiveConf).doCopy(exportRootDataDir, srcPaths);
+    new CopyUtils(hiveConf, distCpDoAsUser).doCopy(exportRootDataDir, srcPaths);
   }
 
   /**
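FileOperations completes the chain: it stores the user at construction and hands it to CopyUtils at copy time. Usage from the two entry points then differs only in the last constructor argument; a hedged sketch with placeholder paths:

    // Replication dump path: the data copy may be executed as the configured doAs user.
    new FileOperations(fromPath, exportDataDir, hiveConf, distCpDoAsUser).export(replicationSpec);

    // EXPORT TABLE path: null keeps a plain copy performed as the session user.
    new FileOperations(fromPath, exportDataDir, hiveConf, null).export(replicationSpec);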