diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 4fc0a93b61..2bfbebed60 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -3857,7 +3857,7 @@ private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException { * how this is desirable. * * As of HIVE-14993, WriteEntity with different WriteType must be considered different. - * So WriteEntity create in DDLTask cause extra output in golden files, but only because + * So WriteEntity created in DDLTask cause extra output in golden files, but only because * DDLTask sets a different WriteType for the same Entity. * * In the spirit of bug-for-bug compatibility, this method ensures we only add new diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index 91af814a0b..aba65918f8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -52,6 +52,7 @@ protected int execute(DriverContext driverContext) { conf, false); Hive db = getHive(); LOG.debug("Exporting data to: {}", exportPaths.getExportRootDir()); + work.acidPostProcess(db); TableExport tableExport = new TableExport( exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf ); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index 7a74a606ad..551b198a53 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -1278,11 +1278,18 @@ public TableSpec(Table table) { public TableSpec(Hive db, String tableName, Map partSpec) throws HiveException { + this(db, tableName, partSpec, false); + } + public TableSpec(Hive db, String tableName, Map partSpec, boolean allowPartialPartitionsSpec) + throws HiveException { Table table = db.getTable(tableName); tableHandle = table; this.tableName = table.getDbName() + "." 
+ table.getTableName(); if (partSpec == null) { specType = SpecType.TABLE_ONLY; + } else if(allowPartialPartitionsSpec) { + partitions = db.getPartitions(table, partSpec); + specType = SpecType.STATIC_PARTITION; } else { Partition partition = db.getPartition(table, partSpec, false); if (partition == null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index ef3e80d2f5..ed22308228 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@ -20,15 +20,22 @@ import org.antlr.runtime.tree.Tree; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; import org.apache.hadoop.hive.ql.plan.ExportWork; +import javax.annotation.Nullable; +import java.util.Set; + /** * ExportSemanticAnalyzer. * @@ -41,6 +48,13 @@ @Override public void analyzeInternal(ASTNode ast) throws SemanticException { + rootTasks.add(analyzeExport(ast, null, db, conf, inputs, outputs)); + } + /** + * @param acidTableName - table name in . format; not NULL if exporting Acid table + */ + static Task analyzeExport(ASTNode ast, @Nullable String acidTableName, Hive db, + HiveConf conf, Set inputs, Set outputs) throws SemanticException { Tree tableTree = ast.getChild(0); Tree toTree = ast.getChild(1); @@ -94,9 +108,8 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { String exportRootDirName = tmpPath; // Configure export work ExportWork exportWork = - new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast)); + new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName); // Create an export task and add it as a root task - Task exportTask = TaskFactory.get(exportWork, conf); - rootTasks.add(exportTask); + return TaskFactory.get(exportWork, conf); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index 78f83ef039..28ba8d6274 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -214,6 +214,9 @@ private static BaseSemanticAnalyzer getInternal(QueryState queryState, ASTNode t case HiveParser.TOK_LOAD: return new LoadSemanticAnalyzer(queryState); case HiveParser.TOK_EXPORT: + if(UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) { + return new UpdateDeleteSemanticAnalyzer(queryState); + } return new ExportSemanticAnalyzer(queryState); case HiveParser.TOK_IMPORT: return new ImportSemanticAnalyzer(queryState); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java index a660747e6a..db3ade19f8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java @@ -17,9 +17,8 @@ */ package org.apache.hadoop.hive.ql.parse; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; - import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -31,25 +30,41 @@ import java.util.Set; import org.antlr.runtime.TokenRewriteStream; +import org.antlr.runtime.tree.Tree; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.DDLTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveExcept; +import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.DropTableDesc; +import org.apache.hadoop.hive.ql.plan.ExportWork; import org.apache.hadoop.hive.ql.session.SessionState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -59,6 +74,7 @@ * updates and deletes instead. */ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer { + private static final Logger LOG = LoggerFactory.getLogger(UpdateDeleteSemanticAnalyzer.class); private boolean useSuper = false; @@ -84,6 +100,9 @@ public void analyzeInternal(ASTNode tree) throws SemanticException { case HiveParser.TOK_MERGE: analyzeMerge(tree); break; + case HiveParser.TOK_EXPORT: + analyzeAcidExport(tree); + break; default: throw new RuntimeException("Asked to parse token " + tree.getName() + " in " + "UpdateDeleteSemanticAnalyzer"); @@ -99,6 +118,169 @@ private boolean deleting() { return currentOperation == Context.Operation.DELETE; } + /** + * Exporting an Acid table is more complicated than exporting a flat table. It may contain delete events, + * which can only be interpreted properly within the context of the table/metastore where they + * were generated. It may also contain insert events that belong to transactions that aborted + * where the same constraints apply. + * In order to make the export artifact free of these constraints, the export does an + * insert into tmpTable select * from acidTable to filter/apply the events in the current + * context and then exports the tmpTable. This export artifact can now be imported into any + * table on any cluster (subject to schema checks etc). 
+ * See {@link #analyzeAcidExport(ASTNode)} + * @param tree Export statement + * @return true if exporting an Acid table. + * @throws SemanticException + */ + public static boolean isAcidExport(ASTNode tree) throws SemanticException { + assert tree != null && tree.getToken() != null && + tree.getToken().getType() == HiveParser.TOK_EXPORT; + Tree tokTab = tree.getChild(0); + assert tokTab != null && tokTab.getType() == HiveParser.TOK_TAB; + Table tableHandle = null; + try { + tableHandle = getTable((ASTNode) tokTab.getChild(0), Hive.get()); + } + catch(HiveException ex) { + throw new SemanticException(ex); + } + return AcidUtils.isFullAcidTable(tableHandle); + } + /** + * See {@link #isAcidExport(ASTNode)} + * 1. create the temp table T + * 2. compile 'insert into T select * from acidTable' + * 3. compile 'export acidTable' (acidTable will be replaced with T during execution) + * 4. create task to drop T + */ + private void analyzeAcidExport(ASTNode ast) throws SemanticException { + //create staging table S + //generate new statement and compile + assert ast != null && ast.getToken() != null && + ast.getToken().getType() == HiveParser.TOK_EXPORT; + ASTNode tableTree = (ASTNode)ast.getChild(0); + assert tableTree != null && tableTree.getType() == HiveParser.TOK_TAB; + ASTNode tokRefOrName = (ASTNode) tableTree.getChild(0); + Table tableHandle = getTargetTable(tokRefOrName); + assert AcidUtils.isFullAcidTable(tableHandle); + + //need to create the table "manually" rather than creating a task since it has to exist to + // compile the insert into T... + // maybe modify TableExport to take table name optionally - then create table could be the root task - not possible see above + //this is db.table + //todo: add a config prop which has the name of this DB. + String newTableName = Warehouse.getQualifiedName(tableHandle.getTTable()) + "_" + System.currentTimeMillis();//todo: this is not sufficiently unique + Map tblProps = new HashMap<>(); + tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.FALSE.toString()); + CreateTableLikeDesc ctlt = new CreateTableLikeDesc(newTableName, + false, false, null, + null, null, null, null, + tblProps, + true,//important so we get an exception on name collision + Warehouse.getQualifiedName(tableHandle.getTTable()), false); + //todo: should this pass in ReadEntity for parent Database to get the lock? + DDLTask createTableTask = (DDLTask) TaskFactory.get(new DDLWork(null, null, ctlt), conf); + createTableTask.setConf(conf);//above get() doesn't set it + Table newTable = null; + try { + //todo: ensure this doesn't get replicated; + //todo: this creates a table for EXPLAIN - perhaps do a drop if isExplainPlan...? + createTableTask.execute(new DriverContext(new Context(conf))); + newTable = db.getTable(newTableName);//todo: deal with duplicate table name + } + catch(IOException|HiveException ex) { + throw new SemanticException(ex); + } + List partCols = newTable.getPartCols(); + //now generate insert statement + //insert into newTableName select * from ts + StringBuilder rewrittenQueryStr = new StringBuilder("insert into ").append(newTableName); + addPartitionColsToInsert(partCols, rewrittenQueryStr); + rewrittenQueryStr.append(" select * from ").append(getFullTableNameForSQL(tokRefOrName)); + //todo: is addPartitionColsToSelect() needed? Does '*' put part cols last? 
+ //todo: this needs some WHERE clause to only get partitons requested + ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd()); + Context rewrittenCtx = rr.rewrittenCtx; + rewrittenCtx.setIsUpdateDeleteMerge(false);//it's set in parseRewrittenQuery() + ASTNode rewrittenTree = rr.rewrittenTree; + try { + useSuper = true; + //newTable has to exist at this point to compile + super.analyze(rewrittenTree, rewrittenCtx); + } finally { + useSuper = false; + } + //now we have the rootTasks set up for Insert ... Select + //now make an ExportTask from temp table + /*analyzeExport() creates TableSpec which in turn tries to build + "public List partitions" by looking in the metastore to find Partitions matching + the partition spec in the Export command. These of course don't exist yet since we've not ran the insert stmt yet!!!!!!! + + */ + Task exportTask = ExportSemanticAnalyzer.analyzeExport(ast, newTableName, db, conf, inputs, outputs); + + /** + ConditionalTask ct = (ConditionalTask) rootTasks.get(0).getDependentTasks().get(0); + {@link ConditionalTask#addDependentTask(Task)} doesn't do the right thing right now. + todo: file bug + So let’s say, the tasks in the ConditionalTask are A,B,C, but they have children. + CondTask + |--A + |--A1 + |-A2 + |--B + |--B1 + |--C + |--C1 + + The way ConditionalTask.addDependent() is implemented, MyTask becomes a sibling of A1, + B1 and C1. So even if only 1 branch of ConditionalTask is executed (and parallel task + execution is enabled), there is no guarantee (as I see) that MyTask runs after A2 or + B1 or C1, which is really what is needed. + ct.addDependentTask(exportTask); + */ + addExportTask(rootTasks, exportTask); + { + /** + * Now make a task to drop temp table + * {@link DDLSemanticAnalyzer#analyzeDropTable(ASTNode ast, TableType expectedType) + * + * Make empty replication spec - since this is effectively a temp table we don't want to + * replicate anything - make empty replication spec + */ + ReplicationSpec replicationSpec = new ReplicationSpec(); + DropTableDesc dropTblDesc = new DropTableDesc(newTableName, TableType.MANAGED_TABLE, + false, true, replicationSpec); + Task dropTask = + TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), dropTblDesc), conf); + exportTask.addDependentTask(dropTask); + } + markReadEntityForUpdate(); + if(ctx.isExplainPlan()) { + try { + db.dropTable(newTable.getDbName(), newTable.getTableName(), true, true, true); + } + catch(HiveException ex) { + LOG.warn("Unable to drop " + newTableName + " due to: " + ex.getMessage(), ex); + } + } + } + + /** + * Makes the exportTask run after all other tasks of the "insert into T ..." 
are done + */ + private void addExportTask(List> rootTasks, Task exportTask) { + for(Task t : rootTasks) { + if(t.getNumChild() <= 0) { + //this is a leaf so add exportTask to follow it + t.addDependentTask(exportTask); + } + else { + addExportTask(t.getDependentTasks(), exportTask); + } + } + } + private void analyzeUpdate(ASTNode tree) throws SemanticException { currentOperation = Context.Operation.UPDATE; reparseAndSuperAnalyze(tree); @@ -219,6 +401,9 @@ private ASTNode findLHSofAssignment(ASTNode assignment) { * @return the Metastore representation of the target table */ private Table getTargetTable(ASTNode tabRef) throws SemanticException { + return getTable(tabRef, db); + } + private static Table getTable(ASTNode tabRef, Hive db) throws SemanticException { String[] tableName; Table mTable; switch (tabRef.getType()) { @@ -581,7 +766,7 @@ private String getMatchedText(ASTNode n) { } /** * Here we take a Merge statement AST and generate a semantically equivalent multi-insert - * statement to exectue. Each Insert leg represents a single WHEN clause. As much as possible, + * statement to execute. Each Insert leg represents a single WHEN clause. As much as possible, * the new SQL statement is made to look like the input SQL statement so that it's easier to map * Query Compiler errors from generated SQL to original one this way. * The generated SQL is a complete representation of the original input for the same reason. diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java index 9093f48223..72ce79836c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java @@ -17,15 +17,20 @@ */ package org.apache.hadoop.hive.ql.plan; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; -import org.apache.hadoop.hive.ql.plan.Explain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; @Explain(displayName = "Export Work", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class ExportWork implements Serializable { + private Logger LOG = LoggerFactory.getLogger(ExportWork.class); private static final long serialVersionUID = 1L; @@ -33,13 +38,18 @@ private TableSpec tableSpec; private ReplicationSpec replicationSpec; private String astRepresentationForErrorMsg; + private String qualifiedTableName; + /** + * @param qualifiedTable if exporting Acid table, this is temp table - null otherwise + */ public ExportWork(String exportRootDirName, TableSpec tableSpec, ReplicationSpec replicationSpec, - String astRepresentationForErrorMsg) { + String astRepresentationForErrorMsg, String qualifiedTable) { this.exportRootDirName = exportRootDirName; this.tableSpec = tableSpec; this.replicationSpec = replicationSpec; this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; + this.qualifiedTableName = qualifiedTable; } public String getExportRootDir() { @@ -70,4 +80,18 @@ public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; } + /** + * For exporting Acid table, change the "pointer" to the temp table. 
+ * This has to be done after the temp table is populated and all necessary Partition objects + * exist in the metastore. + * See {@link org.apache.hadoop.hive.ql.parse.UpdateDeleteSemanticAnalyzer#isAcidExport(ASTNode)} + * for more info. + */ + public void acidPostProcess(Hive db) throws HiveException { + if(qualifiedTableName != null) { + LOG.info("Swapping export of " + tableSpec.tableName + " to " + qualifiedTableName + + " using partSpec=" + tableSpec.partSpec); + tableSpec = new TableSpec(db, qualifiedTableName, tableSpec.partSpec, true); + } + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java new file mode 100644 index 0000000000..0bdf5c7172 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; + +/** + * EXPORT TABLE tablename [PARTITION (part_column=\"value\"[, ...])] TO + * 'export_target_path' [ FOR replication('eventid') ]; + *

+ * todo: where should temp table be located? Security/permissions. + * todo: explain plan creates temp table - should not + * todo: explain plan - in .q file? + * todo: should probably disable stats auto gather for the tmp table + * todo: hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict") - this is required + *

+ * todo: where is acid.version????? + * User Metadata: + * hive.acid.key.index=1,536870912,1; + * hive.acid.stats=2,0,0 + * hive.acid.version= + *

+ * //TODO: note that import from 'hdfs_exports_location/department'; is supported - what does + * // this do? see https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ImportExport + * //import table imported_dept from 'hdfs_exports_location/department'; + */ +public class TestTxnExIm extends TxnCommandsBaseForTests { + static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class); + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestTxnCommands.class.getCanonicalName() + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + + @Override + String getTestDataDir() { + return TEST_DATA_DIR; + } + + @Override + public void setUp() throws Exception { + super.setUp(); + hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true"); + } + + @Test + public void testExport() throws Exception { + int[][] rows1 = {{1, 2}, {3, 4}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table T (a int, b int) stored as ORC"); + runStatementOnDriver("create table TImport (a int, b int) stored as ORC TBLPROPERTIES " + + "('transactional'='false')"); + runStatementOnDriver("insert into T(a,b) " + makeValuesClause(rows1)); + List rs = runStatementOnDriver("select * from T order by a,b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows1), rs); + + String exportStmt = "export table T to '" + getTestDataDir() + "/export'"; + rs = runStatementOnDriver("explain " + exportStmt); + StringBuilder sb = new StringBuilder("*** " + exportStmt); + for (String r : rs) { + sb.append("\n").append(r); + } + LOG.error(sb.toString()); + + runStatementOnDriver(exportStmt); + + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List rs1 = runStatementOnDriver("select * from TImport order by a, b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows1), rs1); + } + + /** + * The update delete cause MergeFileTask to be executed + */ + @Test + public void testExportMerge() throws Exception { + int[][] rows1 = {{1, 2}, {3, 4}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table T (a int, b int) stored as ORC"); + runStatementOnDriver("create table TImport (a int, b int) stored as ORC TBLPROPERTIES " + + "('transactional'='false')"); + runStatementOnDriver("insert into T(a,b) " + makeValuesClause(rows1)); + runStatementOnDriver("update T set b = 17 where a = 1"); + int[][] rows2 = {{1, 17}, {3, 4}}; + List rs = runStatementOnDriver("select * from T order by a,b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows2), rs); + + String exportStmt = "export table T to '" + getTestDataDir() + "/export'"; + rs = runStatementOnDriver("explain " + exportStmt); + StringBuilder sb = new StringBuilder("*** " + exportStmt); + for (String r : rs) { + sb.append("\n").append(r); + } + LOG.error(sb.toString()); + + runStatementOnDriver(exportStmt); + + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List rs1 = runStatementOnDriver("select * from TImport order by a, b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows2), rs1); + } + + @Test + public void testExportPart() throws Exception { + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");//todo: doc this requirement? 
+ int[][] rows1 = {{1, 2, 1}, {3, 4, 2}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table TImport (a int, b int) partitioned by (p int) stored as " + + "ORC TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as ORC"); + runStatementOnDriver("insert into T partition(p)" + makeValuesClause(rows1)); + runStatementOnDriver("export table T partition(p=1) to '" + getTestDataDir() + "/export'"); + /* +target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1519423568221/ +├── export +│   ├── _metadata +│   └── p=1 +│   └── delta_0000001_0000001_0000 +│   └── bucket_00000 +*/ + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List rs1 = runStatementOnDriver("select * from TImport order by a, b"); + int[][] res = {{1, 2, 1}}; + Assert.assertEquals("Content didn't match rs", stringifyValues(res), rs1); + } + + @Test + public void testExportPartPartial() throws Exception { + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + int[][] rows1 = {{1, 2, 1, 1}, {3, 4, 2, 2}, {5, 6, 1, 2}, {7, 8, 2, 2}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table TImport (a int, b int) partitioned by (p int, q int) " + + "stored as ORC TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create table T (a int, b int) partitioned by (p int, q int) stored as " + + "ORC"); + runStatementOnDriver("insert into T partition(p,q)" + makeValuesClause(rows1)); + + runStatementOnDriver("export table T partition(p=1) to '" + getTestDataDir() + "/export'"); + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List rs1 = runStatementOnDriver("select * from TImport order by a, b"); + int[][] res = {{1, 2, 1, 1}, {5, 6, 1, 2}}; + Assert.assertEquals("Content didn't match rs", stringifyValues(res), rs1); + /* Here is the layout we expect +target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/ +├── export +│   ├── _metadata +│   └── p=1 +│   ├── q=1 +│   │   └── 000002_0 +│   └── q=2 +│   └── 000001_0 +└── warehouse + ├── acidtbl + ├── acidtblpart + ├── nonacidnonbucket + ├── nonacidorctbl + ├── nonacidorctbl2 + ├── t + │   ├── p=1 + │   │   ├── q=1 + │   │   │   └── delta_0000001_0000001_0000 + │   │   │   ├── _orc_acid_version + │   │   │   └── bucket_00000 + │   │   └── q=2 + │   │   └── delta_0000001_0000001_0000 + │   │   ├── _orc_acid_version + │   │   └── bucket_00000 + │   └── p=2 + │   └── q=2 + │   └── delta_0000001_0000001_0000 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── timport + └── p=1 + ├── q=1 + │   └── 000002_0 + └── q=2 + └── 000001_0 + +23 directories, 11 files +*/ + } + + @Test + public void testExportBucketed() throws Exception { + int[][] rows1 = {{1, 2}, {1, 3}, {2, 4}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(rows1)); + runStatementOnDriver("export table " + Table.ACIDTBL + " to '" + getTestDataDir() + + "/export'"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table TImport (a int, b int) clustered by (a) into 2 buckets" + + " stored as ORC TBLPROPERTIES ('transactional'='false')"); + + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List rs1 = runStatementOnDriver("select * from TImport order by a, b"); 
+ Assert.assertEquals("Content didn't match rs", stringifyValues(rows1), rs1); + } + + @Test + public void testCtasPartitioned() throws Exception { + runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,2),(1,3)"); + CommandProcessorResponse cpr = runStatementOnDriverNegative("create table myctas partitioned " + + "by (b int) stored as " + + "ORC TBLPROPERTIES ('transactional'='true') as select a, b from " + Table.NONACIDORCTBL); + int j = ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode();//this code doesn't propagate + Assert.assertTrue(cpr.getErrorMessage().contains("CREATE-TABLE-AS-SELECT does not support " + + "partitioning in the target table")); + } + + @Ignore + @Test + public void testCTLT() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T like " + Table.ACIDTBL + " TBLPROPERTIES " + + "('transactional'='true')"); +// runStatementOnDriver("create table T like " + Table.ACIDTBL); + List rs = runStatementOnDriver("show create table T"); + StringBuilder sb = new StringBuilder("*show create table"); + for (String r : rs) { + sb.append("\n").append(r); + } + LOG.error(sb.toString()); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index a4df5099e1..7e84eef538 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -32,8 +32,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -192,7 +190,7 @@ public void testCTAS() throws Exception { runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(values)); runStatementOnDriver("create table myctas2 stored as ORC TBLPROPERTIES ('transactional" + - "'='true', 'transactional_properties'='default') as select a, b from " + Table.ACIDTBL);//todo: try this with acid default - it seem makeing table acid in listener is too late + "'='true', 'transactional_properties'='default') as select a, b from " + Table.ACIDTBL);//todo: try this with acid default - it seem making table acid in listener is too late rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas2 order by ROW__ID"); String expected2[][] = { {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t3\t4", "warehouse/myctas2/delta_0000001_0000001_0000/bucket_00000"}, @@ -234,7 +232,7 @@ public void testCtasEmpty() throws Exception { /** * Insert into unbucketed acid table from union all query - * Union All is flattend so nested subdirs are created and acid move drops them since + * Union All is flattened so nested subdirs are created and acid move drops them since * delta dirs have unique names */ @Test
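
For reference, a rough HiveQL sketch of the end-to-end rewrite this patch performs when EXPORT targets a full ACID table (analyzeAcidExport plus ExportWork.acidPostProcess). The table and path names below are illustrative only; the actual temp table is named <db.table>_<System.currentTimeMillis()> at compile time:

  -- user statement on a full ACID table
  EXPORT TABLE acid_tbl TO '/tmp/acid_export';
  -- is effectively compiled as:
  CREATE TABLE acid_tbl_1234567890 LIKE acid_tbl;           -- created immediately at compile time, with 'transactional'='false'
  INSERT INTO acid_tbl_1234567890 SELECT * FROM acid_tbl;   -- becomes the root task(s); applies delete events and filters aborted txns
  EXPORT TABLE acid_tbl_1234567890 TO '/tmp/acid_export';   -- ExportTask runs after all leaf tasks; acidPostProcess() repoints the TableSpec
  DROP TABLE acid_tbl_1234567890;                           -- dependent DDL task cleans up the temp table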