diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 06efd02253..4fd8338747 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2065,6 +2065,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Set to true to ensure that each SQL Merge statement ensures that for each row in the target\n" + "table there is at most 1 matching row in the source table per SQL Specification."), + TXN_EXPORT_TEMP_DB("hive.txn.export.temp.db", "default", + "Name of an existing database in Hive. Export of a transactional table will create and\n" + + "then drop a temporary table in this database. Every user executing such Export\n" + + "should have permissions to create/drop tables here. Make sure this database is not\n" + + "configured for replication."), + // For Druid storage handler HIVE_DRUID_INDEXING_GRANULARITY("hive.druid.indexer.segments.granularity", "DAY", new PatternSet("YEAR", "MONTH", "WEEK", "DAY", "HOUR", "MINUTE", "SECOND"), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 4fc0a93b61..2bfbebed60 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -3857,7 +3857,7 @@ private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException { * how this is desirable. * * As of HIVE-14993, WriteEntity with different WriteType must be considered different. - * So WriteEntity create in DDLTask cause extra output in golden files, but only because + * So WriteEntity created in DDLTask cause extra output in golden files, but only because * DDLTask sets a different WriteType for the same Entity. * * In the spirit of bug-for-bug compatibility, this method ensures we only add new diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index 91af814a0b..aba65918f8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -52,6 +52,7 @@ protected int execute(DriverContext driverContext) { conf, false); Hive db = getHive(); LOG.debug("Exporting data to: {}", exportPaths.getExportRootDir()); + work.acidPostProcess(db); TableExport tableExport = new TableExport( exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf ); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index 7a74a606ad..551b198a53 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -1278,11 +1278,18 @@ public TableSpec(Table table) { public TableSpec(Hive db, String tableName, Map partSpec) throws HiveException { + this(db, tableName, partSpec, false); + } + public TableSpec(Hive db, String tableName, Map partSpec, boolean allowPartialPartitionsSpec) + throws HiveException { Table table = db.getTable(tableName); tableHandle = table; this.tableName = table.getDbName() + "." 
+ table.getTableName(); if (partSpec == null) { specType = SpecType.TABLE_ONLY; + } else if(allowPartialPartitionsSpec) { + partitions = db.getPartitions(table, partSpec); + specType = SpecType.STATIC_PARTITION; } else { Partition partition = db.getPartition(table, partSpec, false); if (partition == null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index ef3e80d2f5..ed22308228 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@ -20,15 +20,22 @@ import org.antlr.runtime.tree.Tree; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; import org.apache.hadoop.hive.ql.plan.ExportWork; +import javax.annotation.Nullable; +import java.util.Set; + /** * ExportSemanticAnalyzer. * @@ -41,6 +48,13 @@ @Override public void analyzeInternal(ASTNode ast) throws SemanticException { + rootTasks.add(analyzeExport(ast, null, db, conf, inputs, outputs)); + } + /** + * @param acidTableName - table name in db.table format; not null if exporting an Acid table + */ + static Task analyzeExport(ASTNode ast, @Nullable String acidTableName, Hive db, + HiveConf conf, Set inputs, Set outputs) throws SemanticException { Tree tableTree = ast.getChild(0); Tree toTree = ast.getChild(1); @@ -94,9 +108,8 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { String exportRootDirName = tmpPath; // Configure export work ExportWork exportWork = - new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast)); + new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName); // Create an export task and add it as a root task - Task exportTask = TaskFactory.get(exportWork, conf); - rootTasks.add(exportTask); + return TaskFactory.get(exportWork, conf); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index 78f83ef039..28ba8d6274 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -214,6 +214,9 @@ private static BaseSemanticAnalyzer getInternal(QueryState queryState, ASTNode t case HiveParser.TOK_LOAD: return new LoadSemanticAnalyzer(queryState); case HiveParser.TOK_EXPORT: + if(UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) { + return new UpdateDeleteSemanticAnalyzer(queryState); + } return new ExportSemanticAnalyzer(queryState); case HiveParser.TOK_IMPORT: return new ImportSemanticAnalyzer(queryState); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java index a660747e6a..77eb4d5f26 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java @@ -17,9 +17,8 @@ */ package org.apache.hadoop.hive.ql.parse; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; - import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -29,27 +28,46 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.antlr.runtime.TokenRewriteStream; +import org.antlr.runtime.tree.Tree; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.DDLTask; +import org.apache.hadoop.hive.ql.exec.StatsTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.DropTableDesc; +import org.apache.hadoop.hive.ql.plan.ExportWork; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.txn.compactor.Worker; +import org.json4s.DefaultWriters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -59,6 +77,7 @@ * updates and deletes instead. */ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer { + private static final Logger LOG = LoggerFactory.getLogger(UpdateDeleteSemanticAnalyzer.class); private boolean useSuper = false; @@ -84,6 +103,9 @@ public void analyzeInternal(ASTNode tree) throws SemanticException { case HiveParser.TOK_MERGE: analyzeMerge(tree); break; + case HiveParser.TOK_EXPORT: + analyzeAcidExport(tree); + break; default: throw new RuntimeException("Asked to parse token " + tree.getName() + " in " + "UpdateDeleteSemanticAnalyzer"); @@ -99,6 +121,215 @@ private boolean deleting() { return currentOperation == Context.Operation.DELETE; } + /** + * Exporting an Acid table is more complicated than exporting a flat table. It may contain delete events, + * which can only be interpreted properly within the context of the table/metastore where they + * were generated. It may also contain insert events that belong to transactions that aborted, + * where the same constraints apply. + * In order to make the export artifact free of these constraints, the export does an + * insert into tmpTable select * from the source table to filter/apply the events in the current + * context and then exports the tmpTable. 
This export artifact can now be imported into any + * table on any cluster (subject to schema checks etc). + * See {@link #analyzeAcidExport(ASTNode)} + * @param tree Export statement + * @return true if exporting an Acid table. + * @throws SemanticException + */ + public static boolean isAcidExport(ASTNode tree) throws SemanticException { + assert tree != null && tree.getToken() != null && + tree.getToken().getType() == HiveParser.TOK_EXPORT; + Tree tokTab = tree.getChild(0); + assert tokTab != null && tokTab.getType() == HiveParser.TOK_TAB; + Table tableHandle = null; + try { + tableHandle = getTable((ASTNode) tokTab.getChild(0), Hive.get()); + } + catch(HiveException ex) { + throw new SemanticException(ex); + } + return AcidUtils.isFullAcidTable(tableHandle); + } + private static String getTmptTableNameForExport(Table exportTable, HiveConf conf) { + String tmpTableDb = HiveConf.getVar(conf, HiveConf.ConfVars.TXN_EXPORT_TEMP_DB, "default"); + String tmpTableName = exportTable.getTableName() + "_" + + UUID.randomUUID().toString().replace('-', '_'); + return Warehouse.getQualifiedName(tmpTableDb, tmpTableName); + } + + /** + * See {@link #isAcidExport(ASTNode)} + * 1. create the temp table T + * 2. compile 'insert into T select * from acidTable' + * 3. compile 'export acidTable' (acidTable will be replaced with T during execution) + * 4. create task to drop T + */ + private void analyzeAcidExport(ASTNode ast) throws SemanticException { + assert ast != null && ast.getToken() != null && + ast.getToken().getType() == HiveParser.TOK_EXPORT; + ASTNode tableTree = (ASTNode)ast.getChild(0); + assert tableTree != null && tableTree.getType() == HiveParser.TOK_TAB; + ASTNode tokRefOrNameExportTable = (ASTNode) tableTree.getChild(0); + Table exportTable = getTargetTable(tokRefOrNameExportTable); + assert AcidUtils.isFullAcidTable(exportTable); + + //need to create the table "manually" rather than creating a task since it has to exist to + // compile the insert into T... + String newTableName = getTmptTableNameForExport(exportTable, conf);//this is db.table + Map tblProps = new HashMap<>(); + tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.FALSE.toString()); + CreateTableLikeDesc ctlt = new CreateTableLikeDesc(newTableName, + false, false, null, + null, null, null, null, + tblProps, + true,//important so we get an exception on name collision + Warehouse.getQualifiedName(exportTable.getTTable()), false); + Table newTable; + try { + ReadEntity dbFroTmpTable = new ReadEntity(db.getDatabase( + HiveConf.getVar(conf, HiveConf.ConfVars.TXN_EXPORT_TEMP_DB, "default"))); + inputs.add(dbFroTmpTable);//so the plan knows we are 'reading' this db - locks, security... 
+ DDLTask createTableTask = (DDLTask) TaskFactory.get( + new DDLWork(new HashSet<>(), new HashSet<>(), ctlt), conf); + createTableTask.setConf(conf);//above get() doesn't set it + //ensure this doesn't get replicated - by making sure TXN_EXPORT_TEMP_DB isn't replicated + createTableTask.execute(new DriverContext(new Context(conf))); + newTable = db.getTable(newTableName); + } + catch(IOException|HiveException ex) { + throw new SemanticException(ex); + } + List partCols = newTable.getPartCols(); + //now generate insert statement + //insert into newTableName select * from ts + StringBuilder rewrittenQueryStr = generateExportQuery(newTable.getPartCols(), + tokRefOrNameExportTable, tableTree, newTableName); + ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd()); + Context rewrittenCtx = rr.rewrittenCtx; + rewrittenCtx.setIsUpdateDeleteMerge(false);//it's set in parseRewrittenQuery() + ASTNode rewrittenTree = rr.rewrittenTree; + try { + useSuper = true; + //newTable has to exist at this point to compile + super.analyze(rewrittenTree, rewrittenCtx); + } finally { + useSuper = false; + } + //now we have the rootTasks set up for Insert ... Select + removeStatsTasks(rootTasks); + //now make an ExportTask from the temp table + /*analyzeExport() creates TableSpec which in turn tries to build + "public List partitions" by looking in the metastore to find Partitions matching + the partition spec in the Export command. These of course don't exist yet since we haven't run the insert statement yet. + + */ + Task exportTask = ExportSemanticAnalyzer.analyzeExport(ast, newTableName, + db, conf, inputs, outputs); + addExportTask(rootTasks, exportTask); + { + /** + * Now make a task to drop the temp table + * {@link DDLSemanticAnalyzer#analyzeDropTable(ASTNode ast, TableType expectedType)} + * + * Make an empty replication spec - since this is effectively a temp table we don't want to + * replicate anything + */ + ReplicationSpec replicationSpec = new ReplicationSpec(); + DropTableDesc dropTblDesc = new DropTableDesc(newTableName, TableType.MANAGED_TABLE, + false, true, replicationSpec); + Task dropTask = + TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), dropTblDesc), conf); + exportTask.addDependentTask(dropTask); + } + markReadEntityForUpdate(); + if(ctx.isExplainPlan()) { + try { + //so that "explain" doesn't "leak" tmp tables + db.dropTable(newTable.getDbName(), newTable.getTableName(), true, true, true); + } + catch(HiveException ex) { + LOG.warn("Unable to drop " + newTableName + " due to: " + ex.getMessage(), ex); + } + } + } + /** + * Generates + * insert into newTableName select * from ts + * for the EXPORT command + */ + private StringBuilder generateExportQuery(List partCols, + ASTNode tokRefOrNameExportTable, ASTNode tableTree, String newTableName) + throws SemanticException { + StringBuilder rewrittenQueryStr = new StringBuilder("insert into ").append(newTableName); + addPartitionColsToInsert(partCols, rewrittenQueryStr); + rewrittenQueryStr.append(" select * from ").append(getFullTableNameForSQL(tokRefOrNameExportTable)); + //builds partition spec so we can build suitable WHERE clause + TableSpec exportTableSpec = new TableSpec(db, conf, tableTree, false, true); + if(exportTableSpec.getPartSpec() != null) { + StringBuilder whereClause = null; + for(Map.Entry ent : exportTableSpec.getPartSpec().entrySet()) { + if(ent.getValue() == null) { + continue;//partial spec + } + if(whereClause == null) { + whereClause = new StringBuilder(" WHERE "); + } + 
if(whereClause.length() > " WHERE ".length()) { + whereClause.append(" AND "); + } + whereClause.append(HiveUtils.unparseIdentifier(ent.getKey(), conf)) + .append(" = ").append(ent.getValue()); + } + if(whereClause != null) { + rewrittenQueryStr.append(whereClause); + } + } + return rewrittenQueryStr; + } + /** + * Makes the exportTask run after all other tasks of the "insert into T ..." are done + */ + private void addExportTask(List> rootTasks, + Task exportTask) { + for(Task t : rootTasks) { + if(t.getNumChild() <= 0) { + //todo: ConditionalTask#addDependentTask(Task) doesn't do the right thing: HIVE-18978 + //this is a leaf so add exportTask to follow it + t.addDependentTask(exportTask); + } + else { + addExportTask(t.getDependentTasks(), exportTask); + } + } + } + private List> findStatsTasks( + List> rootTasks, List> statsTasks) { + for(Task t : rootTasks) { + if (t instanceof StatsTask) { + if(statsTasks == null) { + statsTasks = new ArrayList<>(); + } + statsTasks.add(t); + } + if(t.getDependentTasks() != null) { + statsTasks = findStatsTasks(t.getDependentTasks(), statsTasks); + } + } + return statsTasks; + } + private void removeStatsTasks(List> rootTasks) { + List> statsTasks = findStatsTasks(rootTasks, null); + if(statsTasks == null) { + return; + } + for(Task statsTask : statsTasks) { + if(statsTask.getParentTasks() == null) { + continue;//should never happen + } + for(Task t : new ArrayList<>(statsTask.getParentTasks())) { + t.removeDependentTask(statsTask); + } + } + } private void analyzeUpdate(ASTNode tree) throws SemanticException { currentOperation = Context.Operation.UPDATE; reparseAndSuperAnalyze(tree); @@ -219,6 +450,9 @@ private ASTNode findLHSofAssignment(ASTNode assignment) { * @return the Metastore representation of the target table */ private Table getTargetTable(ASTNode tabRef) throws SemanticException { + return getTable(tabRef, db); + } + private static Table getTable(ASTNode tabRef, Hive db) throws SemanticException { String[] tableName; Table mTable; switch (tabRef.getType()) { @@ -300,6 +534,7 @@ private ReparseResult parseRewrittenQuery(StringBuilder rewrittenQueryStr, Strin throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg()); } rewrittenCtx.setExplainConfig(ctx.getExplainConfig()); + rewrittenCtx.setExplainPlan(ctx.isExplainPlan()); rewrittenCtx.setIsUpdateDeleteMerge(true); rewrittenCtx.setCmd(rewrittenQueryStr.toString()); @@ -581,7 +816,7 @@ private String getMatchedText(ASTNode n) { } /** * Here we take a Merge statement AST and generate a semantically equivalent multi-insert - * statement to exectue. Each Insert leg represents a single WHEN clause. As much as possible, + * statement to execute. Each Insert leg represents a single WHEN clause. As much as possible, * the new SQL statement is made to look like the input SQL statement so that it's easier to map * Query Compiler errors from generated SQL to original one this way. * The generated SQL is a complete representation of the original input for the same reason. 
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java index 9093f48223..72ce79836c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java @@ -17,15 +17,20 @@ */ package org.apache.hadoop.hive.ql.plan; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; -import org.apache.hadoop.hive.ql.plan.Explain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; @Explain(displayName = "Export Work", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class ExportWork implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(ExportWork.class); private static final long serialVersionUID = 1L; @@ -33,13 +38,18 @@ private TableSpec tableSpec; private ReplicationSpec replicationSpec; private String astRepresentationForErrorMsg; + private String qualifiedTableName; + /** + * @param qualifiedTable if exporting an Acid table, this is the temp table; null otherwise + */ public ExportWork(String exportRootDirName, TableSpec tableSpec, ReplicationSpec replicationSpec, - String astRepresentationForErrorMsg) { + String astRepresentationForErrorMsg, String qualifiedTable) { this.exportRootDirName = exportRootDirName; this.tableSpec = tableSpec; this.replicationSpec = replicationSpec; this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; + this.qualifiedTableName = qualifiedTable; } public String getExportRootDir() { @@ -70,4 +80,18 @@ public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; } + /** + * When exporting an Acid table, change the "pointer" to the temp table. + * This has to be done after the temp table is populated and all necessary Partition objects + * exist in the metastore. + * See {@link org.apache.hadoop.hive.ql.parse.UpdateDeleteSemanticAnalyzer#isAcidExport(ASTNode)} + * for more info. + */ + public void acidPostProcess(Hive db) throws HiveException { + if(qualifiedTableName != null) { + LOG.info("Swapping export of " + tableSpec.tableName + " to " + qualifiedTableName + + " using partSpec=" + tableSpec.partSpec); + tableSpec = new TableSpec(db, qualifiedTableName, tableSpec.partSpec, true); + } + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java new file mode 100644 index 0000000000..3b3a2a6b6f --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; + +/** + * EXPORT TABLE tablename [PARTITION (part_column=\"value\"[, ...])] TO + * 'export_target_path' [ FOR replication('eventid') ]; + * todo: hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict") - this is required + * todo: where is acid.version????? + * User Metadata: + * hive.acid.key.index=1,536870912,1; + * hive.acid.stats=2,0,0 + * hive.acid.version= + *

+ * //TODO: note that import from 'hdfs_exports_location/department'; is supported - what does + * // this do? see https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ImportExport + * //import table imported_dept from 'hdfs_exports_location/department'; + */ +public class TestTxnExIm extends TxnCommandsBaseForTests { + static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class); + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestTxnCommands.class.getCanonicalName() + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + + @Override + String getTestDataDir() { + return TEST_DATA_DIR; + } + + @Override + public void setUp() throws Exception { + super.setUp(); + hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true"); + } + + /** + * simplest export test + */ + @Test + public void testExport() throws Exception { + int[][] rows1 = {{1, 2}, {3, 4}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table T (a int, b int) stored as ORC"); + runStatementOnDriver("create table TImport (a int, b int) stored as ORC TBLPROPERTIES " + + "('transactional'='false')"); + runStatementOnDriver("insert into T(a,b) " + makeValuesClause(rows1)); + List rs = runStatementOnDriver("select * from T order by a,b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows1), rs); + + String exportStmt = "export table T to '" + getTestDataDir() + "/export'"; + rs = runStatementOnDriver("explain " + exportStmt); + StringBuilder sb = new StringBuilder("*** " + exportStmt); + for (String r : rs) { + sb.append("\n").append(r); + } + LOG.error(sb.toString()); + + runStatementOnDriver(exportStmt); + //verify data + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List rs1 = runStatementOnDriver("select * from TImport order by a, b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows1), rs1); + } + + /** + * The update delete cause MergeFileTask to be executed + */ + @Test + public void testExportMerge() throws Exception { + int[][] rows1 = {{1, 2}, {3, 4}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table T (a int, b int) stored as ORC"); + runStatementOnDriver("create table TImport (a int, b int) stored as ORC TBLPROPERTIES " + + "('transactional'='false')"); + runStatementOnDriver("insert into T(a,b) " + makeValuesClause(rows1)); + runStatementOnDriver("update T set b = 17 where a = 1"); + int[][] rows2 = {{1, 17}, {3, 4}}; + List rs = runStatementOnDriver("select * from T order by a,b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows2), rs); + + String exportStmt = "export table T to '" + getTestDataDir() + "/export'"; + rs = runStatementOnDriver("explain " + exportStmt); + StringBuilder sb = new StringBuilder("*** " + exportStmt); + for (String r : rs) { + sb.append("\n").append(r); + } + LOG.error(sb.toString()); + + runStatementOnDriver(exportStmt); + //verify data + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List rs1 = runStatementOnDriver("select * from TImport order by a, b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows2), rs1); + } + + /** + * export partitioned table with full partition spec + */ + @Test + public void 
testExportPart() throws Exception { + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");//todo: doc this requirement? + int[][] rows1 = {{1, 2, 1}, {3, 4, 2}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table TImport (a int, b int) partitioned by (p int) stored as " + + "ORC TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as ORC"); + runStatementOnDriver("insert into T partition(p)" + makeValuesClause(rows1)); + runStatementOnDriver("export table T partition(p=1) to '" + getTestDataDir() + "/export'"); + /* +target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1519423568221/ +├── export +│   ├── _metadata +│   └── p=1 +│   └── delta_0000001_0000001_0000 +│   └── bucket_00000 +*/ + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List rs1 = runStatementOnDriver("select * from TImport order by a, b"); + int[][] res = {{1, 2, 1}}; + Assert.assertEquals("Content didn't match rs", stringifyValues(res), rs1); + } + + /** + * Export partitioned table with partial partition spec + */ + @Test + public void testExportPartPartial() throws Exception { + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + int[][] rows1 = {{1, 2, 1, 1}, {3, 4, 2, 2}, {5, 6, 1, 2}, {7, 8, 2, 2}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table TImport (a int, b int) partitioned by (p int, q int) " + + "stored as ORC TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create table T (a int, b int) partitioned by (p int, q int) stored as " + + "ORC"); + runStatementOnDriver("insert into T partition(p,q)" + makeValuesClause(rows1)); + + runStatementOnDriver("export table T partition(p=1) to '" + getTestDataDir() + "/export'"); + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List rs1 = runStatementOnDriver("select * from TImport order by a, b"); + int[][] res = {{1, 2, 1, 1}, {5, 6, 1, 2}}; + Assert.assertEquals("Content didn't match rs", stringifyValues(res), rs1); + /* Here is the layout we expect +target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/ +├── export +│   ├── _metadata +│   └── p=1 +│   ├── q=1 +│   │   └── 000002_0 +│   └── q=2 +│   └── 000001_0 +└── warehouse + ├── acidtbl + ├── acidtblpart + ├── nonacidnonbucket + ├── nonacidorctbl + ├── nonacidorctbl2 + ├── t + │   ├── p=1 + │   │   ├── q=1 + │   │   │   └── delta_0000001_0000001_0000 + │   │   │   ├── _orc_acid_version + │   │   │   └── bucket_00000 + │   │   └── q=2 + │   │   └── delta_0000001_0000001_0000 + │   │   ├── _orc_acid_version + │   │   └── bucket_00000 + │   └── p=2 + │   └── q=2 + │   └── delta_0000001_0000001_0000 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── timport + └── p=1 + ├── q=1 + │   └── 000002_0 + └── q=2 + └── 000001_0 + +23 directories, 11 files +*/ + } + + @Test + public void testExportBucketed() throws Exception { + int[][] rows1 = {{1, 2}, {1, 3}, {2, 4}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(rows1)); + runStatementOnDriver("export table " + Table.ACIDTBL + " to '" + getTestDataDir() + + "/export'"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table TImport (a int, b int) clustered by (a) into 2 buckets" + + " stored as 
ORC TBLPROPERTIES ('transactional'='false')"); + + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List rs1 = runStatementOnDriver("select * from TImport order by a, b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows1), rs1); + } + + @Test + public void testCtasPartitioned() throws Exception { + runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,2),(1,3)"); + CommandProcessorResponse cpr = runStatementOnDriverNegative("create table myctas partitioned " + + "by (b int) stored as " + + "ORC TBLPROPERTIES ('transactional'='true') as select a, b from " + Table.NONACIDORCTBL); + int j = ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode();//this code doesn't propagate + Assert.assertTrue(cpr.getErrorMessage().contains("CREATE-TABLE-AS-SELECT does not support " + + "partitioning in the target table")); + } + + @Ignore + @Test + public void testCTLT() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T like " + Table.ACIDTBL + " TBLPROPERTIES " + + "('transactional'='true')"); +// runStatementOnDriver("create table T like " + Table.ACIDTBL); + List rs = runStatementOnDriver("show create table T"); + StringBuilder sb = new StringBuilder("*show create table"); + for (String r : rs) { + sb.append("\n").append(r); + } + LOG.error(sb.toString()); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index a4df5099e1..7e84eef538 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -32,8 +32,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -192,7 +190,7 @@ public void testCTAS() throws Exception { runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(values)); runStatementOnDriver("create table myctas2 stored as ORC TBLPROPERTIES ('transactional" + - "'='true', 'transactional_properties'='default') as select a, b from " + Table.ACIDTBL);//todo: try this with acid default - it seem makeing table acid in listener is too late + "'='true', 'transactional_properties'='default') as select a, b from " + Table.ACIDTBL);//todo: try this with acid default - it seem making table acid in listener is too late rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas2 order by ROW__ID"); String expected2[][] = { {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t3\t4", "warehouse/myctas2/delta_0000001_0000001_0000/bucket_00000"}, @@ -234,7 +232,7 @@ public void testCtasEmpty() throws Exception { /** * Insert into unbucketed acid table from union all query - * Union All is flattend so nested subdirs are created and acid move drops them since + * Union All is flattened so nested subdirs are created and acid move drops them since * delta dirs have unique names */ @Test
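Illustrative sketch only, not part of the patch above: written in the style of the TestTxnExIm tests, the method below spells out the HiveQL sequence that analyzeAcidExport() effectively performs for an EXPORT of a full Acid table with a partition spec. The table T, the temp table name T_export_tmp, and the target directory are hypothetical; the real temp table name is generated by getTmptTableNameForExport() as the exported table name plus a random UUID suffix, created in the database named by hive.txn.export.temp.db (default "default").

@Test
public void testAcidExportSketch() throws Exception {
  // the generated "insert ... select" writes dynamic partitions, hence nonstrict mode
  hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
  runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as ORC");
  runStatementOnDriver("insert into T partition(p) values(1,2,1),(3,4,2)");
  // 1. create a flat (non-transactional) copy of the Acid table's shape
  runStatementOnDriver("create table T_export_tmp like T TBLPROPERTIES ('transactional'='false')");
  // 2. materialize the current snapshot: delete events applied, aborted transactions filtered out
  runStatementOnDriver("insert into T_export_tmp partition(p) select * from T where p = 1");
  // 3. export the flat copy (at run time ExportWork.acidPostProcess() points the TableSpec at it)
  runStatementOnDriver("export table T_export_tmp partition(p=1) to '" + getTestDataDir() + "/export'");
  // 4. drop the temp table
  runStatementOnDriver("drop table T_export_tmp");
}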