diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 4fc0a93b61..2bfbebed60 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -3857,7 +3857,7 @@ private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException { * how this is desirable. * * As of HIVE-14993, WriteEntity with different WriteType must be considered different. - * So WriteEntity create in DDLTask cause extra output in golden files, but only because + * So WriteEntity created in DDLTask cause extra output in golden files, but only because * DDLTask sets a different WriteType for the same Entity. * * In the spirit of bug-for-bug compatibility, this method ensures we only add new diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index 91af814a0b..d0af931814 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -52,6 +52,9 @@ protected int execute(DriverContext driverContext) { conf, false); Hive db = getHive(); LOG.debug("Exporting data to: {}", exportPaths.getExportRootDir()); + //todo: I suspect this screws up Security since tableExport.getAuthEntities() from + // BaseSemanticAnalzyer.analyzeExport() will see empty TableSpec.partition list.... + work.acidPostProcess(db); TableExport tableExport = new TableExport( exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf ); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index 7a74a606ad..222dc420d1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; @@ -76,6 +77,8 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils; +import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; +import org.apache.hadoop.hive.ql.plan.ExportWork; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; @@ -1265,6 +1268,14 @@ private static String getUnionTypeStringFromAST(ASTNode typeNode) public static enum SpecType {TABLE_ONLY, STATIC_PARTITION, DYNAMIC_PARTITION}; public SpecType specType; + public List recomputePartitions(Hive db) throws HiveException { + assert specType == SpecType.STATIC_PARTITION : "unexpected specType: " + specType; + assert tableHandle != null && tableHandle.isPartitioned() : "expected partitioned table"; + assert partSpec != null && !partSpec.isEmpty() : "expected non null && not empty partSpec"; + partitions = db.getPartitions(tableHandle, partSpec); + return partitions; + } + public TableSpec(Hive db, HiveConf conf, ASTNode ast) throws SemanticException { this(db, conf, ast, true, false); @@ -1323,7 +1334,7 @@ 
public TableSpec(Hive db, HiveConf conf, ASTNode ast, boolean allowDynamicPartit try { // get table metadata - tableName = getUnescapedName((ASTNode)ast.getChild(0)); + tableName = getUnescapedName((ASTNode) ast.getChild(0)); boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE); if (testMode) { tableName = conf.getVar(HiveConf.ConfVars.HIVETESTMODEPREFIX) @@ -1341,12 +1352,22 @@ public TableSpec(Hive db, HiveConf conf, ASTNode ast, boolean allowDynamicPartit throw new SemanticException(ErrorMsg.CANNOT_RETRIEVE_TABLE_METADATA.getMsg(ast .getChild(childIndex), e.getMessage()), e); } + handlePartitionSpec(db, conf, ast, allowDynamicPartitionsSpec, allowPartialPartitionsSpec); + } + public TableSpec(Hive db, HiveConf conf, Table tableHandle, ASTNode ast, boolean allowDynamicPartitionsSpec, + boolean allowPartialPartitionsSpec) throws SemanticException { + this.tableHandle = tableHandle; + this.tableName = Warehouse.getQualifiedName(tableHandle.getTTable()); + handlePartitionSpec(db, conf, ast, allowDynamicPartitionsSpec, allowPartialPartitionsSpec); + } + private void handlePartitionSpec(Hive db, HiveConf conf, ASTNode ast, boolean allowDynamicPartitionsSpec, + boolean allowPartialPartitionsSpec) throws SemanticException { // get partition metadata if partition specified if (ast.getChildCount() == 2 && ast.getToken().getType() != HiveParser.TOK_CREATETABLE && ast.getToken().getType() != HiveParser.TOK_CREATE_MATERIALIZED_VIEW && ast.getToken().getType() != HiveParser.TOK_ALTER_MATERIALIZED_VIEW) { - childIndex = 1; + int childIndex = 1; ASTNode partspec = (ASTNode) ast.getChild(1); partitions = new ArrayList(); // partSpec is a mapping from partition column name to its value. @@ -2099,4 +2120,69 @@ public CacheUsage getCacheUsage() { public void setCacheUsage(CacheUsage cacheUsage) { this.cacheUsage = cacheUsage; } + + final Task analyzeExport(ASTNode ast, Table newTable) throws SemanticException { + Tree tableTree = ast.getChild(0); + Tree toTree = ast.getChild(1); + + ReplicationSpec replicationSpec; + if (ast.getChildCount() > 2) { + // Replication case: export table to for replication + replicationSpec = new ReplicationSpec((ASTNode) ast.getChild(2)); + } else { + // Export case + replicationSpec = new ReplicationSpec(); + } + if (replicationSpec.getCurrentReplicationState() == null) { + try { + long currentEventId = db.getMSC().getCurrentNotificationEventId().getEventId(); + replicationSpec.setCurrentReplicationState(String.valueOf(currentEventId)); + } catch (Exception e) { + throw new SemanticException("Error when getting current notification event ID", e); + } + } + + // initialize source table/partition + TableSpec ts; + + try { + if(newTable != null) { + ts = new TableSpec(db, conf, newTable, (ASTNode) tableTree, false, true); + } + else { + ts = new TableSpec(db, conf, (ASTNode) tableTree, false, true); + } + + } catch (SemanticException sme){ + if ((replicationSpec.isInReplicationScope()) && + ((sme.getCause() instanceof InvalidTableException) + || (sme instanceof Table.ValidationFailureSemanticException) + ) + ){ + // If we're in replication scope, it's possible that we're running the export long after + // the table was dropped, so the table not existing currently or being a different kind of + // table is not an error - it simply means we should no-op, and let a future export + // capture the appropriate state + ts = null; + } else { + throw sme; + } + } + + // initialize export path + String tmpPath = stripQuotes(toTree.getText()); + // All parsing is done, we're 
now good to start the export process + TableExport.Paths exportPaths = + new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf, false); + TableExport tableExport = new TableExport(exportPaths, ts, replicationSpec, db, null, conf); + TableExport.AuthEntities authEntities = tableExport.getAuthEntities(); + inputs.addAll(authEntities.inputs); + outputs.addAll(authEntities.outputs); + String exportRootDirName = tmpPath; + // Configure export work + ExportWork exportWork = + new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast), newTable != null); + // Create an export task and add it as a root task + return TaskFactory.get(exportWork, conf); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index ef3e80d2f5..ec33726db9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@ -19,15 +19,7 @@ package org.apache.hadoop.hive.ql.parse; -import org.antlr.runtime.tree.Tree; -import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.metadata.InvalidTableException; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; -import org.apache.hadoop.hive.ql.plan.ExportWork; /** * ExportSemanticAnalyzer. @@ -41,62 +33,6 @@ @Override public void analyzeInternal(ASTNode ast) throws SemanticException { - Tree tableTree = ast.getChild(0); - Tree toTree = ast.getChild(1); - - ReplicationSpec replicationSpec; - if (ast.getChildCount() > 2) { - // Replication case: export table to for replication - replicationSpec = new ReplicationSpec((ASTNode) ast.getChild(2)); - } else { - // Export case - replicationSpec = new ReplicationSpec(); - } - if (replicationSpec.getCurrentReplicationState() == null) { - try { - long currentEventId = db.getMSC().getCurrentNotificationEventId().getEventId(); - replicationSpec.setCurrentReplicationState(String.valueOf(currentEventId)); - } catch (Exception e) { - throw new SemanticException("Error when getting current notification event ID", e); - } - } - - // initialize source table/partition - TableSpec ts; - - try { - ts = new TableSpec(db, conf, (ASTNode) tableTree, false, true); - } catch (SemanticException sme){ - if ((replicationSpec.isInReplicationScope()) && - ((sme.getCause() instanceof InvalidTableException) - || (sme instanceof Table.ValidationFailureSemanticException) - ) - ){ - // If we're in replication scope, it's possible that we're running the export long after - // the table was dropped, so the table not existing currently or being a different kind of - // table is not an error - it simply means we should no-op, and let a future export - // capture the appropriate state - ts = null; - } else { - throw sme; - } - } - - // initialize export path - String tmpPath = stripQuotes(toTree.getText()); - // All parsing is done, we're now good to start the export process - TableExport.Paths exportPaths = - new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf, false); - TableExport tableExport = new TableExport(exportPaths, ts, replicationSpec, db, null, conf); - TableExport.AuthEntities authEntities = tableExport.getAuthEntities(); - inputs.addAll(authEntities.inputs); - 
outputs.addAll(authEntities.outputs); - String exportRootDirName = tmpPath; - // Configure export work - ExportWork exportWork = - new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast)); - // Create an export task and add it as a root task - Task exportTask = TaskFactory.get(exportWork, conf); - rootTasks.add(exportTask); + rootTasks.add(analyzeExport(ast, null)); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index 78f83ef039..28ba8d6274 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -214,6 +214,9 @@ private static BaseSemanticAnalyzer getInternal(QueryState queryState, ASTNode t case HiveParser.TOK_LOAD: return new LoadSemanticAnalyzer(queryState); case HiveParser.TOK_EXPORT: + if(UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) { + return new UpdateDeleteSemanticAnalyzer(queryState); + } return new ExportSemanticAnalyzer(queryState); case HiveParser.TOK_IMPORT: return new ImportSemanticAnalyzer(queryState); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java index a660747e6a..5a36d8e4dc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hive.ql.parse; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; - import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -31,25 +29,40 @@ import java.util.Set; import org.antlr.runtime.TokenRewriteStream; +import org.antlr.runtime.tree.Tree; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.DDLTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.DropTableDesc; +import org.apache.hadoop.hive.ql.plan.ExportWork; import org.apache.hadoop.hive.ql.session.SessionState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -59,6 +72,7 @@ * 
updates and deletes instead. */ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer { + private static final Logger LOG = LoggerFactory.getLogger(UpdateDeleteSemanticAnalyzer.class); private boolean useSuper = false; @@ -70,6 +84,7 @@ public void analyzeInternal(ASTNode tree) throws SemanticException { if (useSuper) { super.analyzeInternal(tree); + postProcess(tree); } else { if (!getTxnMgr().supportsAcid()) { throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getMsg()); @@ -84,6 +99,9 @@ public void analyzeInternal(ASTNode tree) throws SemanticException { case HiveParser.TOK_MERGE: analyzeMerge(tree); break; + case HiveParser.TOK_EXPORT: + analyzeAcidExport(tree); + break; default: throw new RuntimeException("Asked to parse token " + tree.getName() + " in " + "UpdateDeleteSemanticAnalyzer"); @@ -99,6 +117,148 @@ private boolean deleting() { return currentOperation == Context.Operation.DELETE; } + /** + * Exporting an Acid table is more complicated than exporting a flat table. It may contain delete events, + * which can only be interpreted properly within the context of the table/metastore where they + * were generated. It may also contain insert events that belong to aborted transactions, + * to which the same constraints apply. + * In order to make the export artifact free of these constraints, the export does an + * insert into tmpTable select * from the source table to filter/apply the events in the current + * context and then exports the tmpTable. This export artifact can now be imported into any + * table on any cluster (subject to schema checks, etc.). + * See {@link #analyzeAcidExport(ASTNode)} + * @param tree Export statement + * @return true if exporting an Acid table. + * @throws SemanticException + */ + public static boolean isAcidExport(ASTNode tree) throws SemanticException { + assert tree != null && tree.getToken() != null && + tree.getToken().getType() == HiveParser.TOK_EXPORT; + Tree tokTab = tree.getChild(0); + assert tokTab != null && tokTab.getType() == HiveParser.TOK_TAB; + Table tableHandle = null; + try { + tableHandle = getTable((ASTNode) tokTab.getChild(0), Hive.get()); + } + catch(HiveException ex) { + throw new SemanticException(ex); + } + return AcidUtils.isFullAcidTable(tableHandle); + } + /** + * See {@link #isAcidExport(ASTNode)} + */ + private void analyzeAcidExport(ASTNode ast) throws SemanticException { + //create staging table S + //generate new statement and compile + assert ast != null && ast.getToken() != null && + ast.getToken().getType() == HiveParser.TOK_EXPORT; + ASTNode tableTree = (ASTNode)ast.getChild(0); + assert tableTree != null && tableTree.getType() == HiveParser.TOK_TAB; + ASTNode tokRefOrName = (ASTNode) tableTree.getChild(0); + Table tableHandle = getTargetTable(tokRefOrName); + assert AcidUtils.isFullAcidTable(tableHandle); + + //need to create the table "manually" rather than creating a task since it has to exist to compile + // maybe modify TableExport to take table name optionally - then create table could be the root task - not possible see above + //this is db.table - maybe need special DB where everyone has rights to create/drop tables //hmm this seems to be just table rather than db.table + //todo: add a config prop which has the name of this DB. 
+ String newTableName = Warehouse.getQualifiedName(tableHandle.getTTable()) + "_" + System.currentTimeMillis();//todo: this is not sufficiently unique + Map tblProps = new HashMap<>(); + tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.FALSE.toString()); + CreateTableLikeDesc ctlt = new CreateTableLikeDesc(newTableName, + false, false, null, + null, null, null, null, + tblProps, + true,//important so we get an exception on name collision + Warehouse.getQualifiedName(tableHandle.getTTable()), false); + //todo: what's in getInputs/getOutputs??? + //should this pass in ReadEntity for parent Database to get the lock? + DDLTask createTableTask = (DDLTask) TaskFactory.get(new DDLWork(null, null, ctlt), conf); + createTableTask.setConf(conf);//above get() doesn't set it + Table newTable = null; + try { + //todo: ensure this doesn't get replicated; + //todo: this creates a table for EXPLAIN + createTableTask.execute(new DriverContext(new Context(conf))); + newTable = db.getTable(newTableName);//todo: deal with duplicate table name + } + catch(IOException|HiveException ex) { + throw new SemanticException(ex); + } + List partCols = newTable.getPartCols(); + + //now generate insert statement + //insert into newTableName select * from ts + StringBuilder rewrittenQueryStr = new StringBuilder("insert into ").append(newTableName); + addPartitionColsToInsert(partCols, rewrittenQueryStr); + rewrittenQueryStr.append(" select * from ").append(getFullTableNameForSQL(tokRefOrName)); + //todo: is addPartitionColsToSelect() needed? Does '*' put part cols last? + ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd()); + Context rewrittenCtx = rr.rewrittenCtx; + rewrittenCtx.setIsUpdateDeleteMerge(false);//it's set in parseRewrittenQuery() + ASTNode rewrittenTree = rr.rewrittenTree; + try { + useSuper = true; + super.analyze(rewrittenTree, rewrittenCtx); + } finally { + useSuper = false; + } + //now we have the rootTasks set up for Insert ... Select + //now make an ExportTask from temp table + /*analyzeExport() creates TableSpec which in turn tries to build + "public List partitions" by looking in the metastore to find Partitions matching + the partition spec in the Export command. These of course don't exist yet since we've not ran the insert stmt yet!!!!!!! + + */ + Task exportTask = analyzeExport(ast, newTable); + ConditionalTask ct = (ConditionalTask) rootTasks.get(0).getDependentTasks().get(0); + /** + {@link ConditionalTask#addDependentTask(Task)} doesn't do the right thing right now. + todo: file bug + So let’s say, the tasks in the ConditionalTask are A,B,C, but they have children. + CondTask + |--A + |--A1 + |-A2 + |--B + |--B1 + |--C + |--C1 + + The way ConditionalTask.addDependent() is implemented, MyTask becomes a sibling of A1, + B1 and C1. So even if only 1 branch of ConditionalTask is executed (and parallel task + execution is enabled), there is no guarantee (as I see) that MyTask runs after A2 or + B1 or C1, which is really what is needed. 
+ */ + ct.addDependentTask(exportTask); + { + /** + * Now make a task to drop temp table + * {@link DDLSemanticAnalyzer#analyzeDropTable(ASTNode ast, TableType expectedType) + * + * Make empty replication spec - since this is effectively a temp table we don't want to + * replicate anything - make empty replication spec + */ + ReplicationSpec replicationSpec = new ReplicationSpec(); + DropTableDesc dropTblDesc = new DropTableDesc(newTableName, TableType.MANAGED_TABLE, + false, true, replicationSpec); + Task dropTask = + TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), dropTblDesc), conf); + exportTask.addDependentTask(dropTask); + } + markReadEntityForUpdate(); + } + private void postProcess(ASTNode tree) { + switch (tree.getToken().getType()) { + case HiveParser.TOK_EXPORT: + //now that staging table S is created and insert into as select* from S + //is fully compiled we need to add hooks (tasks) to actually export and then drop S + + break; + } + } + private void analyzeUpdate(ASTNode tree) throws SemanticException { currentOperation = Context.Operation.UPDATE; reparseAndSuperAnalyze(tree); @@ -219,6 +379,9 @@ private ASTNode findLHSofAssignment(ASTNode assignment) { * @return the Metastore representation of the target table */ private Table getTargetTable(ASTNode tabRef) throws SemanticException { + return getTable(tabRef, db); + } + private static Table getTable(ASTNode tabRef, Hive db) throws SemanticException { String[] tableName; Table mTable; switch (tabRef.getType()) { @@ -581,7 +744,7 @@ private String getMatchedText(ASTNode n) { } /** * Here we take a Merge statement AST and generate a semantically equivalent multi-insert - * statement to exectue. Each Insert leg represents a single WHEN clause. As much as possible, + * statement to execute. Each Insert leg represents a single WHEN clause. As much as possible, * the new SQL statement is made to look like the input SQL statement so that it's easier to map * Query Compiler errors from generated SQL to original one this way. * The generated SQL is a complete representation of the original input for the same reason. diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java index 98da309094..178e3e2026 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java @@ -104,6 +104,7 @@ /** * ReadEntitites that are passed to the hooks. + * todo: why is this here? 
it's never read */ protected HashSet inputs; /** diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java index 9093f48223..52c859b98d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.plan; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.plan.Explain; @@ -33,13 +35,15 @@ private TableSpec tableSpec; private ReplicationSpec replicationSpec; private String astRepresentationForErrorMsg; + private final boolean isAcidExport; public ExportWork(String exportRootDirName, TableSpec tableSpec, ReplicationSpec replicationSpec, - String astRepresentationForErrorMsg) { + String astRepresentationForErrorMsg, boolean isAcidExport) { this.exportRootDirName = exportRootDirName; this.tableSpec = tableSpec; this.replicationSpec = replicationSpec; this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; + this.isAcidExport = isAcidExport; } public String getExportRootDir() { @@ -69,5 +73,9 @@ public String getAstRepresentationForErrorMsg() { public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) { this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; } - + public void acidPostProcess(Hive db) throws HiveException { + if(isAcidExport && tableSpec != null) { + tableSpec.recomputePartitions(db); + } + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java new file mode 100644 index 0000000000..99d7feb3bc --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java @@ -0,0 +1,212 @@ +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.glassfish.jersey.message.internal.StringBuilderUtils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; + +public class TestTxnExIm extends TxnCommandsBaseForTests { + static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class); + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestTxnCommands.class.getCanonicalName() + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + @Override + String getTestDataDir() { + return TEST_DATA_DIR; + } + @Override + public void setUp() throws Exception { + super.setUp(); + hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true"); + } + + + @Test + public void testExport() throws Exception { + int[][] rows1 = {{1,2},{3,4}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table T (a int, b int) stored as ORC"); + runStatementOnDriver("create table TImport (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("insert into T(a,b) " + makeValuesClause(rows1)); + List rs = runStatementOnDriver("select * from T order by a,b"); + Assert.assertEquals("Content didn't match rs", 
stringifyValues(rows1), rs); + +// "EXPORT TABLE tablename [PARTITION (part_column=\"value\"[, ...])]\n" + +// " TO 'export_target_path' [ FOR replication('eventid') ]"; + String exportStmt = "export table T to '" + getTestDataDir() + "/export'"; + rs = runStatementOnDriver("explain " + exportStmt); + StringBuilder sb = new StringBuilder("*** " + exportStmt); + for (String r : rs) { + sb.append("\n").append(r); + } + LOG.error(sb.toString()); + + runStatementOnDriver(exportStmt); + + //TODO: note that import from 'hdfs_exports_location/department'; is supported - what does + // this do? see https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ImportExport + //import table imported_dept from 'hdfs_exports_location/department'; + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List rs1 = runStatementOnDriver("select * from TImport order by a, b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows1), rs1); + } + + /** + * todo: where should temp table be located? Security/permissions. + * todo: explain plan creates temp table - should not + * todo: explain plan - in .q file? + * todo: should probably disable stats auto gather for the tmp table + * + todo: where is acid.version????? + User Metadata: + hive.acid.key.index=1,536870912,1; + hive.acid.stats=2,0,0 + hive.acid.version= + + + * The update delete cause MergeFileTask to be executed + */ + @Test + public void testExportMerge() throws Exception { + int[][] rows1 = {{1,2},{3,4}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table T (a int, b int) stored as ORC"); + runStatementOnDriver("create table TImport (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("insert into T(a,b) " + makeValuesClause(rows1)); + runStatementOnDriver("update T set b = 17 where a = 1"); + int[][] rows2 = {{1,17},{3,4}}; + List rs = runStatementOnDriver("select * from T order by a,b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows2), rs); + +// "EXPORT TABLE tablename [PARTITION (part_column=\"value\"[, ...])]\n" + +// " TO 'export_target_path' [ FOR replication('eventid') ]"; + String exportStmt = "export table T to '" + getTestDataDir() + "/export'"; + rs = runStatementOnDriver("explain " + exportStmt); + StringBuilder sb = new StringBuilder("*** " + exportStmt); + for (String r : rs) { + sb.append("\n").append(r); + } + LOG.error(sb.toString()); + + runStatementOnDriver(exportStmt); + + //TODO: note that import from 'hdfs_exports_location/department'; is supported - what does this do? see https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ImportExport + //import table imported_dept from 'hdfs_exports_location/department'; + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List rs1 = runStatementOnDriver("select * from TImport order by a, b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows2), rs1); + } + + @Test + public void testExportPart() throws Exception { + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");//todo: doc this requirement? 
+ int[][] rows1 = {{1,2,1},{3,4,2}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table TImport (a int, b int) partitioned by (p int) stored as ORC TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as ORC"); + runStatementOnDriver("insert into T partition(p)" + makeValuesClause(rows1)); + // "EXPORT TABLE tablename [PARTITION (part_column=\"value\"[, ...])]\n" + + // " TO 'export_target_path' [ FOR replication('eventid') ]"; + runStatementOnDriver("export table T partition(p=1) to '" + getTestDataDir() + "/export'"); + /* +target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1519423568221/ +├── export +│   ├── _metadata +│   └── p=1 +│   └── delta_0000001_0000001_0000 +│   └── bucket_00000 +*/ + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List rs1 = runStatementOnDriver("select * from TImport order by a, b"); + int[][] res = {{1,2,1}}; + Assert.assertEquals("Content didn't match rs", stringifyValues(res), rs1); + } + + //create table like + @Test + public void testCTLT() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T like " + Table.ACIDTBL + " TBLPROPERTIES ('transactional'='true')"); +// runStatementOnDriver("create table T like " + Table.ACIDTBL); + List rs = runStatementOnDriver("show create table T"); + StringBuilder sb = new StringBuilder("*show create table"); + for (String r : rs) { + sb.append("\n").append(r); + } + LOG.error(sb.toString()); + } + @Test + public void testExportBuckets() throws Exception { + int[][] rows1 = {{1,2},{2,4}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(rows1)); + runStatementOnDriver("export table " + Table.ACIDTBL + " to '" + getTestDataDir() + "/export'"); + int ti = 1; + /* + target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1518808273004/ +├── export +│   ├── _metadata +│   └── data +│   └── delta_0000013_0000013_0000 +│   ├── bucket_00000 +│   └── bucket_00001 +└── warehouse + ├── acidtbl + │   └── delta_0000013_0000013_0000 + │   ├── bucket_00000 + │   └── bucket_00001 + */ + } + @Test + public void testExportPartPartial() throws Exception { + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + int[][] rows1 = {{1,2,1,1},{3,4,2,2}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) partitioned by (p int, q int) stored as ORC"); + runStatementOnDriver("insert into T partition(p,q)" + makeValuesClause(rows1)); + // "EXPORT TABLE tablename [PARTITION (part_column=\"value\"[, ...])]\n" + + // " TO 'export_target_path' [ FOR replication('eventid') ]"; + runStatementOnDriver("export table T partition(p=1) to '" + getTestDataDir() + "/export'"); + runStatementOnDriver("select * from T"); + /* + * target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1518808733048/ +├── export +│   ├── _metadata +│   └── p=1 +│   └── q=1 +│   └── delta_0000015_0000015_0000 +│   └── bucket_00000 +└── warehouse + ├── acidtbl + ├── acidtblpart + ├── nonacidnonbucket + ├── nonacidorctbl + ├── nonacidorctbl2 + └── t + ├── p=1 + │   └── q=1 + │   └── delta_0000015_0000015_0000 + │   └── bucket_00000 + └── p=2 + └── q=2 + └── delta_0000015_0000015_0000 + └── bucket_00000 +*/ + } + @Test + public void testCtasPartitioned() throws Exception { + runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + 
"(a,b) values(1,2),(1,3)"); + runStatementOnDriver("create table myctas partitioned by (b int) stored as " + + "ORC TBLPROPERTIES ('transactional'='true') as select a, b from " + Table.NONACIDORCTBL); + int j = ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode();//this code doesn't propagate + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index a4df5099e1..7e84eef538 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -32,8 +32,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -192,7 +190,7 @@ public void testCTAS() throws Exception { runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(values)); runStatementOnDriver("create table myctas2 stored as ORC TBLPROPERTIES ('transactional" + - "'='true', 'transactional_properties'='default') as select a, b from " + Table.ACIDTBL);//todo: try this with acid default - it seem makeing table acid in listener is too late + "'='true', 'transactional_properties'='default') as select a, b from " + Table.ACIDTBL);//todo: try this with acid default - it seem making table acid in listener is too late rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas2 order by ROW__ID"); String expected2[][] = { {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t3\t4", "warehouse/myctas2/delta_0000001_0000001_0000/bucket_00000"}, @@ -234,7 +232,7 @@ public void testCtasEmpty() throws Exception { /** * Insert into unbucketed acid table from union all query - * Union All is flattend so nested subdirs are created and acid move drops them since + * Union All is flattened so nested subdirs are created and acid move drops them since * delta dirs have unique names */ @Test