diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
new file mode 100644
index 0000000..bb45f30
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
@@ -0,0 +1,69 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
+import org.apache.hadoop.hive.ql.plan.ExportWork;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class ExportTask extends Task<ExportWork> implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private Logger LOG = LoggerFactory.getLogger(ExportTask.class);
+
+  public ExportTask() {
+    super();
+  }
+
+  @Override
+  public String getName() {
+    return "EXPORT";
+  }
+
+  @Override
+  protected int execute(DriverContext driverContext) {
+    try {
+      // Also creates the root directory
+      TableExport.Paths exportPaths =
+          new TableExport.Paths(work.getAstRepresentationForErrorMsg(), work.getExportRootDir(),
+              conf, false);
+      Hive db = getHive();
+      LOG.debug("Exporting data to: {}", exportPaths.getExportRootDir());
+      new TableExport(exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf)
+          .write();
+    } catch (Exception e) {
+      LOG.error("failed", e);
+      setException(e);
+      return 1;
+    }
+    return 0;
+  }
+
+  @Override
+  public StageType getType() {
+    // TODO: Modify Thrift IDL to generate export stage if needed
+    return StageType.REPL_DUMP;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index fe9b624..e9c69d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.ExplainSQRewriteWork;
 import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.apache.hadoop.hive.ql.plan.ExportWork;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FunctionWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
@@ -117,6 +118,7 @@ public TaskTuple(Class<T> workClass, Class<? extends Task<T>> taskClass) {
     taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class));
     taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class));
     taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class));
+    taskvec.add(new TaskTuple<ExportWork>(ExportWork.class, ExportTask.class));
   }
 
   private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index f9991d9..eade36f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -256,7 +256,7 @@ private void dumpTable(String dbName, String tblName, Path dbRoot) throws Except
     HiveWrapper.Tuple<Table> tuple = new HiveWrapper(db, dbName).table(tblName);
     TableSpec tableSpec = new TableSpec(tuple.object);
     TableExport.Paths exportPaths =
-        new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf);
+        new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true);
     String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
     tuple.replicationSpec.setIsReplace(true); // by default for all other objects this is false
     new TableExport(exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf).write();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index b8c6ea9..f094805 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -21,9 +21,12 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
+import org.apache.hadoop.hive.ql.plan.ExportWork;
 
 /**
  * ExportSemanticAnalyzer.
@@ -42,8 +45,10 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
 
     ReplicationSpec replicationSpec;
     if (ast.getChildCount() > 2) {
+      // Replication case: export table <tableName> to <location> for replication
       replicationSpec = new ReplicationSpec((ASTNode) ast.getChild(2));
     } else {
+      // Export case
       replicationSpec = new ReplicationSpec();
     }
     if (replicationSpec.getCurrentReplicationState() == null) {
@@ -78,12 +83,19 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
 
     // initialize export path
     String tmpPath = stripQuotes(toTree.getText());
-    // All parsing is done, we're now good to start the export process.
+    // All parsing is done, we're now good to start the export process
     TableExport.Paths exportPaths =
-        new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf);
-    TableExport.AuthEntities authEntities =
-        new TableExport(exportPaths, ts, replicationSpec, db, null, conf).write();
+        new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf, false);
+    TableExport tableExport = new TableExport(exportPaths, ts, replicationSpec, db, null, conf);
+    TableExport.AuthEntities authEntities = tableExport.getAuthEntities();
     inputs.addAll(authEntities.inputs);
     outputs.addAll(authEntities.outputs);
+    String exportRootDirName = tmpPath;
+    // Configure export work
+    ExportWork exportWork =
+        new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast));
+    // Create an export task and add it as a root task
+    Task<ExportWork> exportTask = TaskFactory.get(exportWork, conf);
+    rootTasks.add(exportTask);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 7e72f23..3f3c4c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -49,18 +49,16 @@ Licensed to the Apache Software Foundation (ASF) under one
   private final String distCpDoAsUser;
   private final HiveConf hiveConf;
   private final int nThreads;
-  private final AuthEntities authEntities;
 
   private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class);
   private BlockingQueue<Partition> queue;
 
   PartitionExport(Paths paths, PartitionIterable partitionIterable, String distCpDoAsUser,
-      HiveConf hiveConf, AuthEntities authEntities) {
+      HiveConf hiveConf) {
     this.paths = paths;
     this.partitionIterable = partitionIterable;
     this.distCpDoAsUser = distCpDoAsUser;
     this.hiveConf = hiveConf;
-    this.authEntities = authEntities;
     this.nThreads = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM);
     this.queue = new ArrayBlockingQueue<>(2 * nThreads);
   }
@@ -105,7 +103,6 @@ void write(final ReplicationSpec forReplicationSpec) throws InterruptedException
           Path rootDataDumpDir = paths.partitionExportDir(partitionName);
           new FileOperations(fromPath, rootDataDumpDir, distCpDoAsUser, hiveConf)
               .export(forReplicationSpec);
-          authEntities.inputs.add(new ReadEntity(partition));
           LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
         } catch (Exception e) {
           throw new RuntimeException("Error while export of data files", e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index ed43272..ab94ec5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -28,11 +28,13 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.AuthEntities;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,7 +59,6 @@ Licensed to the Apache Software Foundation (ASF) under one
   private final String distCpDoAsUser;
   private final HiveConf conf;
   private final Paths paths;
-  private final AuthEntities authEntities = new AuthEntities();
 
   public TableExport(Paths paths, TableSpec tableSpec,
       ReplicationSpec replicationSpec, Hive db, String distCpDoAsUser, HiveConf conf)
@@ -77,7 +78,7 @@ public TableExport(Paths paths, TableSpec tableSpec,
     this.paths = paths;
   }
 
-  public AuthEntities write() throws SemanticException {
+  public void write() throws SemanticException {
     if (tableSpec == null) {
       writeMetaData(null);
     } else if (shouldExport()) {
@@ -85,18 +86,17 @@ public AuthEntities write() throws SemanticException {
       if (tableSpec.tableHandle.isView()) {
         replicationSpec.setIsMetadataOnly(true);
       }
-      PartitionIterable withPartitions = partitions();
+      PartitionIterable withPartitions = getPartitions();
       writeMetaData(withPartitions);
       if (!replicationSpec.isMetadataOnly()) {
         writeData(withPartitions);
       }
     }
-    return authEntities;
   }
 
-  private PartitionIterable partitions() throws SemanticException {
+  private PartitionIterable getPartitions() throws SemanticException {
     try {
-      if (tableSpec.tableHandle.isPartitioned()) {
+      if (tableSpec != null && tableSpec.tableHandle != null && tableSpec.tableHandle.isPartitioned()) {
         if (tableSpec.specType == TableSpec.SpecType.TABLE_ONLY) {
           // TABLE-ONLY, fetch partitions if regular export, don't if metadata-only
           if (replicationSpec.isMetadataOnly()) {
@@ -144,19 +144,16 @@ private void writeData(PartitionIterable partitions) throws SemanticException {
     try {
       if (tableSpec.tableHandle.isPartitioned()) {
         if (partitions == null) {
-          throw new IllegalStateException(
-              "partitions cannot be null for partitionTable :" + tableSpec.tableName);
+          throw new IllegalStateException("partitions cannot be null for partitionTable :"
+              + tableSpec.tableName);
         }
-        new PartitionExport(paths, partitions, distCpDoAsUser, conf, authEntities)
-            .write(replicationSpec);
+        new PartitionExport(paths, partitions, distCpDoAsUser, conf).write(replicationSpec);
       } else {
         Path fromPath = tableSpec.tableHandle.getDataLocation();
-        //this is the data copy
+        // this is the data copy
         new FileOperations(fromPath, paths.dataExportDir(), distCpDoAsUser, conf)
-            .export(replicationSpec);
-        authEntities.inputs.add(new ReadEntity(tableSpec.tableHandle));
+            .export(replicationSpec);
       }
-      authEntities.outputs.add(toWriteEntity(paths.exportRootDir, conf));
     } catch (Exception e) {
       throw new SemanticException(e);
     }
@@ -173,20 +170,22 @@ private boolean shouldExport() throws SemanticException {
   public static class Paths {
     private final String astRepresentationForErrorMsg;
     private final HiveConf conf;
-    public final Path exportRootDir;
+    private final Path exportRootDir;
    private final FileSystem exportFileSystem;
+    private boolean writeData = true;
 
-    public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName,
-        HiveConf conf) throws SemanticException {
+    public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName, HiveConf conf,
+        boolean shouldWriteData) throws SemanticException {
      this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
      this.conf = conf;
+      this.writeData = shouldWriteData;
      Path tableRoot = new Path(dbRoot, tblName);
      URI exportRootDir = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString());
      validateTargetDir(exportRootDir);
      this.exportRootDir = new Path(exportRootDir);
      try {
        this.exportFileSystem = this.exportRootDir.getFileSystem(conf);
-        if (!exportFileSystem.exists(this.exportRootDir)) {
+        if (!exportFileSystem.exists(this.exportRootDir) && writeData) {
          exportFileSystem.mkdirs(this.exportRootDir);
        }
      } catch (IOException e) {
@@ -194,14 +193,15 @@ public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName,
      }
    }
 
-    public Paths(String astRepresentationForErrorMsg, String path, HiveConf conf)
-        throws SemanticException {
+    public Paths(String astRepresentationForErrorMsg, String path, HiveConf conf,
+        boolean shouldWriteData) throws SemanticException {
      this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
      this.conf = conf;
      this.exportRootDir = new Path(EximUtil.getValidatedURI(conf, path));
+      this.writeData = shouldWriteData;
      try {
        this.exportFileSystem = exportRootDir.getFileSystem(conf);
-        if (!exportFileSystem.exists(this.exportRootDir)) {
+        if (!exportFileSystem.exists(this.exportRootDir) && writeData) {
          exportFileSystem.mkdirs(this.exportRootDir);
        }
      } catch (IOException e) {
@@ -234,7 +234,7 @@ private Path metaDataExportFile() {
      * Partition's data export directory is created within the export semantics of partition.
      */
     private Path dataExportDir() throws SemanticException {
-      return exportDir(new Path(exportRootDir, EximUtil.DATA_PATH_NAME));
+      return exportDir(new Path(getExportRootDir(), EximUtil.DATA_PATH_NAME));
     }
 
     /**
@@ -267,6 +267,10 @@ private void validateTargetDir(URI rootDirExportFile) throws SemanticException {
        throw new SemanticException(astRepresentationForErrorMsg, e);
      }
    }
+
+    public Path getExportRootDir() {
+      return exportRootDir;
+    }
   }
 
   public static class AuthEntities {
@@ -278,4 +282,33 @@ private void validateTargetDir(URI rootDirExportFile) throws SemanticException {
     public final Set<ReadEntity> inputs = Collections.newSetFromMap(new ConcurrentHashMap<>());
     public final Set<WriteEntity> outputs = new HashSet<>();
   }
+
+  public AuthEntities getAuthEntities() throws SemanticException {
+    AuthEntities authEntities = new AuthEntities();
+    try {
+      // Return if metadata-only
+      if (replicationSpec.isMetadataOnly()) {
+        return authEntities;
+      }
+      PartitionIterable partitions = getPartitions();
+      if (tableSpec != null) {
+        if (tableSpec.tableHandle.isPartitioned()) {
+          if (partitions == null) {
+            throw new IllegalStateException("partitions cannot be null for partitionTable :"
+                + tableSpec.tableName);
+          }
+          new PartitionExport(paths, partitions, distCpDoAsUser, conf).write(replicationSpec);
+          for (Partition partition : partitions) {
+            authEntities.inputs.add(new ReadEntity(partition));
+          }
+        } else {
+          authEntities.inputs.add(new ReadEntity(tableSpec.tableHandle));
+        }
+      }
+      authEntities.outputs.add(toWriteEntity(paths.getExportRootDir(), conf));
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+    return authEntities;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
new file mode 100644
index 0000000..ac723b6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
@@ -0,0 +1,73 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.plan.Explain;
+
+import java.io.Serializable;
+
+@Explain(displayName = "Export Work", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT,
+    Explain.Level.EXTENDED })
+public class ExportWork implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private final String exportRootDirName;
+  private TableSpec tableSpec;
+  private ReplicationSpec replicationSpec;
+  private String astRepresentationForErrorMsg;
+
+  public ExportWork(String exportRootDirName, TableSpec tableSpec, ReplicationSpec replicationSpec,
+      String astRepresentationForErrorMsg) {
+    this.exportRootDirName = exportRootDirName;
+    this.tableSpec = tableSpec;
+    this.replicationSpec = replicationSpec;
+    this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
+  }
+
+  public String getExportRootDir() {
+    return exportRootDirName;
+  }
+
+  public TableSpec getTableSpec() {
+    return tableSpec;
+  }
+
+  public void setTableSpec(TableSpec tableSpec) {
+    this.tableSpec = tableSpec;
+  }
+
+  public ReplicationSpec getReplicationSpec() {
+    return replicationSpec;
+  }
+
+  public void setReplicationSpec(ReplicationSpec replicationSpec) {
+    this.replicationSpec = replicationSpec;
+  }
+
+  public String getAstRepresentationForErrorMsg() {
+    return astRepresentationForErrorMsg;
+  }
+
+  public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) {
+    this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
+  }
+
+}
diff --git a/ql/src/test/results/clientnegative/authorization_uri_export.q.out b/ql/src/test/results/clientnegative/authorization_uri_export.q.out
index 19c8115..f6ed948 100644
--- a/ql/src/test/results/clientnegative/authorization_uri_export.q.out
+++ b/ql/src/test/results/clientnegative/authorization_uri_export.q.out
@@ -9,4 +9,3 @@ POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@export_auth_uri
 #### A masked pattern was here ####
-FAILED: SemanticException [Error 10320]: Error while performing IO operation Exception while writing out the local file
diff --git a/ql/src/test/results/clientnegative/exim_12_nonnative_export.q.out b/ql/src/test/results/clientnegative/exim_12_nonnative_export.q.out
index 5da4daa..bd73536 100644
--- a/ql/src/test/results/clientnegative/exim_12_nonnative_export.q.out
+++ b/ql/src/test/results/clientnegative/exim_12_nonnative_export.q.out
@@ -12,4 +12,8 @@ POSTHOOK: query: create table exim_department ( dep_id int comment "department i
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@exim_department
-FAILED: SemanticException [Error 10121]: Export/Import cannot be done for a non-native table.
+PREHOOK: query: export table exim_department to 'ql/test/data/exports/exim_department'
+PREHOOK: type: EXPORT
+PREHOOK: Input: default@exim_department
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.ExportTask. Export/Import cannot be done for a non-native table.