diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
new file mode 100644
index 0000000..bb45f30
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
@@ -0,0 +1,69 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
+import org.apache.hadoop.hive.ql.plan.ExportWork;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class ExportTask extends Task<ExportWork> implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private Logger LOG = LoggerFactory.getLogger(ExportTask.class);
+
+  public ExportTask() {
+    super();
+  }
+
+  @Override
+  public String getName() {
+    return "EXPORT";
+  }
+
+  @Override
+  protected int execute(DriverContext driverContext) {
+    try {
+      // Also creates the root directory
+      TableExport.Paths exportPaths =
+          new TableExport.Paths(work.getAstRepresentationForErrorMsg(), work.getExportRootDir(),
+              conf, false);
+      Hive db = getHive();
+      LOG.debug("Exporting data to: {}", exportPaths.getExportRootDir());
+      new TableExport(exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf)
+          .write();
+    } catch (Exception e) {
+      LOG.error("failed", e);
+      setException(e);
+      return 1;
+    }
+    return 0;
+  }
+
+  @Override
+  public StageType getType() {
+    // TODO: Modify Thrift IDL to generate export stage if needed
+    return StageType.REPL_DUMP;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index fe9b624..e9c69d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.ExplainSQRewriteWork;
 import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.apache.hadoop.hive.ql.plan.ExportWork;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FunctionWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
@@ -117,6 +118,7 @@ public TaskTuple(Class<T> workClass, Class<? extends Task<T>> taskClass) {
     taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class));
     taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class));
     taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class));
+    taskvec.add(new TaskTuple<>(ExportWork.class, ExportTask.class));
   }
 
   private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index f9991d9..eade36f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -256,7 +256,7 @@ private void dumpTable(String dbName, String tblName, Path dbRoot) throws Except
     HiveWrapper.Tuple<Table> tuple = new HiveWrapper(db, dbName).table(tblName);
     TableSpec tableSpec = new TableSpec(tuple.object);
     TableExport.Paths exportPaths =
-        new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf);
+        new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true);
     String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
     tuple.replicationSpec.setIsReplace(true);  // by default for all other objects this is false
     new TableExport(exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf).write();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index b8c6ea9..f094805 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -21,9 +21,12 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
+import org.apache.hadoop.hive.ql.plan.ExportWork;
 
 /**
  * ExportSemanticAnalyzer.
@@ -42,8 +45,10 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
 
     ReplicationSpec replicationSpec;
     if (ast.getChildCount() > 2) {
+      // Replication case: export table <tableName> to <location> for replication
       replicationSpec = new ReplicationSpec((ASTNode) ast.getChild(2));
     } else {
+      // Export case
       replicationSpec = new ReplicationSpec();
     }
     if (replicationSpec.getCurrentReplicationState() == null) {
@@ -78,12 +83,19 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
 
     // initialize export path
     String tmpPath = stripQuotes(toTree.getText());
-    // All parsing is done, we're now good to start the export process.
+    // All parsing is done, we're now good to start the export process
     TableExport.Paths exportPaths =
-        new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf);
-    TableExport.AuthEntities authEntities =
-        new TableExport(exportPaths, ts, replicationSpec, db, null, conf).write();
+        new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf, false);
+    TableExport tableExport = new TableExport(exportPaths, ts, replicationSpec, db, null, conf);
+    TableExport.AuthEntities authEntities = tableExport.getAuthEntities();
     inputs.addAll(authEntities.inputs);
     outputs.addAll(authEntities.outputs);
+    String exportRootDirName = tmpPath;
+    // Configure export work
+    ExportWork exportWork =
+        new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast));
+    // Create an export task and add it as a root task
+    Task<ExportWork> exportTask = TaskFactory.get(exportWork, conf);
+    rootTasks.add(exportTask);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index ed43272..fec783f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -28,11 +28,13 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.AuthEntities;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,7 +87,7 @@ public AuthEntities write() throws SemanticException {
       if (tableSpec.tableHandle.isView()) {
         replicationSpec.setIsMetadataOnly(true);
       }
-      PartitionIterable withPartitions = partitions();
+      PartitionIterable withPartitions = getPartitions();
       writeMetaData(withPartitions);
       if (!replicationSpec.isMetadataOnly()) {
         writeData(withPartitions);
@@ -94,7 +96,7 @@ public AuthEntities write() throws SemanticException {
     return authEntities;
   }
 
-  private PartitionIterable partitions() throws SemanticException {
+  private PartitionIterable getPartitions() throws SemanticException {
     try {
       if (tableSpec.tableHandle.isPartitioned()) {
         if (tableSpec.specType == TableSpec.SpecType.TABLE_ONLY) {
@@ -156,7 +158,7 @@ private void writeData(PartitionIterable partitions) throws SemanticException {
             .export(replicationSpec);
         authEntities.inputs.add(new ReadEntity(tableSpec.tableHandle));
       }
-      authEntities.outputs.add(toWriteEntity(paths.exportRootDir, conf));
+      authEntities.outputs.add(toWriteEntity(paths.getExportRootDir(), conf));
     } catch (Exception e) {
       throw new SemanticException(e);
     }
@@ -173,20 +175,22 @@ private boolean shouldExport() throws SemanticException {
   public static class Paths {
     private final String astRepresentationForErrorMsg;
     private final HiveConf conf;
-    public final Path exportRootDir;
+    private final Path exportRootDir;
     private final FileSystem exportFileSystem;
+    private boolean writeData = true;
 
-    public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName,
-        HiveConf conf) throws SemanticException {
+    public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName, HiveConf conf,
+        boolean shouldWriteData) throws SemanticException {
       this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
       this.conf = conf;
+      this.writeData = shouldWriteData;
       Path tableRoot = new Path(dbRoot, tblName);
       URI exportRootDir = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString());
       validateTargetDir(exportRootDir);
       this.exportRootDir = new Path(exportRootDir);
       try {
         this.exportFileSystem = this.exportRootDir.getFileSystem(conf);
-        if (!exportFileSystem.exists(this.exportRootDir)) {
+        if (!exportFileSystem.exists(this.exportRootDir) && writeData) {
           exportFileSystem.mkdirs(this.exportRootDir);
         }
       } catch (IOException e) {
@@ -194,14 +198,15 @@ public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName,
       }
     }
 
-    public Paths(String astRepresentationForErrorMsg, String path, HiveConf conf)
-        throws SemanticException {
+    public Paths(String astRepresentationForErrorMsg, String path, HiveConf conf,
+        boolean shouldWriteData) throws SemanticException {
       this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
       this.conf = conf;
       this.exportRootDir = new Path(EximUtil.getValidatedURI(conf, path));
+      this.writeData = shouldWriteData;
       try {
         this.exportFileSystem = exportRootDir.getFileSystem(conf);
-        if (!exportFileSystem.exists(this.exportRootDir)) {
+        if (!exportFileSystem.exists(this.exportRootDir) && writeData) {
           exportFileSystem.mkdirs(this.exportRootDir);
         }
       } catch (IOException e) {
@@ -234,7 +239,7 @@ private Path metaDataExportFile() {
      * Partition's data export directory is created within the export semantics of partition.
      */
     private Path dataExportDir() throws SemanticException {
-      return exportDir(new Path(exportRootDir, EximUtil.DATA_PATH_NAME));
+      return exportDir(new Path(getExportRootDir(), EximUtil.DATA_PATH_NAME));
     }
 
     /**
@@ -267,6 +272,10 @@ private void validateTargetDir(URI rootDirExportFile) throws SemanticException {
         throw new SemanticException(astRepresentationForErrorMsg, e);
       }
     }
+
+    public Path getExportRootDir() {
+      return exportRootDir;
+    }
   }
 
   public static class AuthEntities {
@@ -278,4 +287,27 @@ private void validateTargetDir(URI rootDirExportFile) throws SemanticException {
     public final Set<ReadEntity> inputs = Collections.newSetFromMap(new ConcurrentHashMap<>());
     public final Set<WriteEntity> outputs = new HashSet<>();
   }
+
+  public AuthEntities getAuthEntities() throws SemanticException {
+    try {
+      PartitionIterable partitions = getPartitions();
+      if (tableSpec.tableHandle.isPartitioned()) {
+        if (partitions == null) {
+          throw new IllegalStateException("partitions cannot be null for partitionTable :"
+              + tableSpec.tableName);
+        }
+        new PartitionExport(paths, partitions, distCpDoAsUser, conf, authEntities)
+            .write(replicationSpec);
+        for (Partition partition : partitions) {
+          authEntities.inputs.add(new ReadEntity(partition));
+        }
+      } else {
+        authEntities.inputs.add(new ReadEntity(tableSpec.tableHandle));
+      }
+      authEntities.outputs.add(toWriteEntity(paths.getExportRootDir(), conf));
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+    return authEntities;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
new file mode 100644
index 0000000..ac723b6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
@@ -0,0 +1,73 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.plan.Explain;
+
+import java.io.Serializable;
+
+@Explain(displayName = "Export Work", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT,
+    Explain.Level.EXTENDED })
+public class ExportWork implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private final String exportRootDirName;
+  private TableSpec tableSpec;
+  private ReplicationSpec replicationSpec;
+  private String astRepresentationForErrorMsg;
+
+  public ExportWork(String exportRootDirName, TableSpec tableSpec, ReplicationSpec replicationSpec,
+      String astRepresentationForErrorMsg) {
+    this.exportRootDirName = exportRootDirName;
+    this.tableSpec = tableSpec;
+    this.replicationSpec = replicationSpec;
+    this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
+  }
+
+  public String getExportRootDir() {
+    return exportRootDirName;
+  }
+
+  public TableSpec getTableSpec() {
+    return tableSpec;
+  }
+
+  public void setTableSpec(TableSpec tableSpec) {
+    this.tableSpec = tableSpec;
+  }
+
+  public ReplicationSpec getReplicationSpec() {
+    return replicationSpec;
+  }
+
+  public void setReplicationSpec(ReplicationSpec replicationSpec) {
+    this.replicationSpec = replicationSpec;
+  }
+
+  public String getAstRepresentationForErrorMsg() {
+    return astRepresentationForErrorMsg;
+  }
+
+  public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) {
+    this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
+  }
+
+}
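Usage sketch (not part of the patch): with this change, ExportSemanticAnalyzer no longer performs the export during analysis; it only collects read/write entities via TableExport.getAuthEntities() and queues an ExportTask, which runs TableExport.write() at execution time. A minimal sketch of the new flow, assuming a resolved TableSpec (ts), a ReplicationSpec, the analyzer's HiveConf (conf) and AST node (ast) are in scope; the export path is illustrative only:

    // Describe the export declaratively; no filesystem work happens at analysis time.
    ExportWork exportWork =
        new ExportWork("/tmp/export_dir" /* illustrative path */, ts, replicationSpec,
            ErrorMsg.INVALID_PATH.getMsg(ast));
    // TaskFactory resolves ExportWork to ExportTask through the new TaskTuple registration above.
    Task<ExportWork> exportTask = TaskFactory.get(exportWork, conf);
    rootTasks.add(exportTask);
    // At execution time, ExportTask.execute() builds TableExport.Paths (creating the export
    // root directory) and calls TableExport.write() to dump table metadata and data.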