diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index d26f0ccb17..779240b007 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -195,7 +195,6 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\ metadata_only_queries.q,\ metadata_only_queries_with_filters.q,\ metadataonly1.q,\ - mm_conversions.q,\ mrr.q,\ nonmr_fetch_threshold.q,\ optimize_nullscan.q,\ @@ -582,6 +581,7 @@ minillaplocal.query.files=\ mapjoin_hint.q,\ mapjoin_emit_interval.q,\ mergejoin_3way.q,\ + mm_conversions.q,\ mm_exim.q,\ mrr.q,\ multiMapJoin1.q,\ diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 4acdd9b49b..1ab4e0bee7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -113,6 +113,7 @@ import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory; +import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -1412,8 +1413,9 @@ private void acquireLocks() throws CommandProcessorResponse { if(userFromUGI == null) { throw createProcessorResponse(10); } + // Set the table write id in all of the acid file sinks - if (haveAcidWrite()) { + if (!plan.getAcidSinks().isEmpty()) { List acidSinks = new ArrayList<>(plan.getAcidSinks()); //sorting makes tests easier to write since file names and ROW__IDs depend on statementId //so this makes (file name -> data) mapping stable @@ -1430,6 +1432,18 @@ private void acquireLocks() throws CommandProcessorResponse { desc.setStatementId(queryTxnMgr.getStmtIdAndIncrement()); } } + + // Note: the sinks and DDL cannot coexist at this time; but if they could we would + // need to make sure we don't get two write IDs for the same table. + DDLDescWithWriteId acidDdlDesc = plan.getAcidDdlDesc(); + if (acidDdlDesc != null && acidDdlDesc.mayNeedWriteId()) { + String fqTableName = acidDdlDesc.getFullTableName(); + long writeId = queryTxnMgr.getTableWriteId( + Utilities.getDatabaseName(fqTableName), Utilities.getTableName(fqTableName)); + acidDdlDesc.setWriteId(writeId); + } + + /*It's imperative that {@code acquireLocks()} is called for all commands so that HiveTxnManager can transition its state machine correctly*/ queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState); @@ -1453,10 +1467,6 @@ private void acquireLocks() throws CommandProcessorResponse { } } - private boolean haveAcidWrite() { - return !plan.getAcidSinks().isEmpty(); - } - public void releaseLocksAndCommitOrRollback(boolean commit) throws LockException { releaseLocksAndCommitOrRollback(commit, queryTxnMgr); } @@ -1883,7 +1893,7 @@ private boolean requiresLock() { return false; } // Lock operations themselves don't require the lock. 
- if (isExplicitLockOperation()){ + if (isExplicitLockOperation()) { return false; } if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_LOCK_MAPRED_ONLY)) { @@ -2112,7 +2122,6 @@ private void execute() throws CommandProcessorResponse { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS); // Loop while you either have tasks running, or tasks queued up while (driverCxt.isRunning()) { - // Launch upto maxthreads tasks Task task; while ((task = driverCxt.getRunnable(maxthreads)) != null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java index f53afaff2b..79e938aebd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java +++ ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java @@ -35,8 +35,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import com.google.common.annotations.VisibleForTesting; - import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.ExplainTask; @@ -50,6 +48,8 @@ import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo; import org.apache.hadoop.hive.ql.parse.TableAccessInfo; +import org.apache.hadoop.hive.ql.plan.DDLDesc; +import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -61,8 +61,8 @@ import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TJSONProtocol; import org.apache.thrift.transport.TMemoryBuffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; /** * QueryPlan can be serialized to disk so that we can restart/resume the @@ -112,6 +112,7 @@ private final HiveOperation operation; private final boolean acidResourcesInQuery; private final Set acidSinks; // Note: both full-ACID and insert-only sinks. 
+ private final DDLDesc.DDLDescWithWriteId acidDdlDesc; private Boolean autoCommitValue; public QueryPlan() { @@ -123,6 +124,7 @@ protected QueryPlan(HiveOperation command) { this.operation = command; this.acidResourcesInQuery = false; this.acidSinks = Collections.emptySet(); + this.acidDdlDesc = null; } public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId, @@ -151,8 +153,8 @@ public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, S this.resultSchema = resultSchema; this.acidResourcesInQuery = sem.hasTransactionalInQuery(); this.acidSinks = sem.getAcidFileSinks(); + this.acidDdlDesc = sem.getAcidDdlDesc(); } - private static final Logger LOG = LoggerFactory.getLogger(QueryPlan.class); /** * @return true if any acid resources are read/written @@ -166,6 +168,11 @@ public boolean hasAcidResourcesInQuery() { Set getAcidSinks() { return acidSinks; } + + DDLDescWithWriteId getAcidDdlDesc() { + return acidDdlDesc; + } + public String getQueryStr() { return queryString; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index c8cb8a40b4..4e10649136 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -54,6 +54,7 @@ import java.util.concurrent.ExecutionException; import com.google.common.collect.ImmutableSet; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -148,6 +149,7 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.metadata.CheckConstraint; import org.apache.hadoop.hive.ql.metadata.CheckResult; import org.apache.hadoop.hive.ql.metadata.DefaultConstraint; @@ -4423,27 +4425,17 @@ private static void ensureDelete(FileSystem fs, Path path, String what) throws I } } - private List> generateAddMmTasks(Table tbl) throws HiveException { + private List> generateAddMmTasks(Table tbl, Long writeId) throws HiveException { // We will move all the files in the table/partition directories into the first MM // directory, then commit the first write ID. 
List srcs = new ArrayList<>(), tgts = new ArrayList<>(); - long mmWriteId = 0; - try { - HiveTxnManager txnManager = getTxnMgr(); - if (txnManager.isTxnOpen()) { - mmWriteId = txnManager.getTableWriteId(tbl.getDbName(), tbl.getTableName()); - } else { - txnManager.openTxn(new Context(conf), conf.getUser()); - mmWriteId = txnManager.getTableWriteId(tbl.getDbName(), tbl.getTableName()); - txnManager.commitTxn(); - } - } catch (Exception e) { - String errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); - console.printError(errorMessage, "\n" - + org.apache.hadoop.util.StringUtils.stringifyException(e)); + if (writeId == null) { + throw new HiveException("Internal error - write ID not set for MM conversion"); } + int stmtId = 0; - String mmDir = AcidUtils.deltaSubdir(mmWriteId, mmWriteId, stmtId); + String mmDir = AcidUtils.deltaSubdir(writeId, writeId, stmtId); + Hive db = getHive(); if (tbl.getPartitionKeys().size() > 0) { PartitionIterable parts = new PartitionIterable(db, tbl, null, @@ -4471,10 +4463,7 @@ private static void ensureDelete(FileSystem fs, Path path, String what) throws I // Don't set inputs and outputs - the locks have already been taken so it's pointless. MoveWork mw = new MoveWork(null, null, null, null, false); mw.setMultiFilesDesc(new LoadMultiFilesDesc(srcs, tgts, true, null, null)); - ImportCommitWork icw = new ImportCommitWork(tbl.getDbName(), tbl.getTableName(), mmWriteId, stmtId); - Task mv = TaskFactory.get(mw), ic = TaskFactory.get(icw); - mv.addDependentTask(ic); - return Lists.>newArrayList(mv); + return Lists.>newArrayList(TaskFactory.get(mw)); } private List> alterTableAddProps(AlterTableDesc alterTbl, Table tbl, @@ -4491,7 +4480,7 @@ private static void ensureDelete(FileSystem fs, Path path, String what) throws I Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(tbl, alterTbl.getProps()); if (isToMmTable != null) { if (!isFromMmTable && isToMmTable) { - result = generateAddMmTasks(tbl); + result = generateAddMmTasks(tbl, alterTbl.getWriteId()); } else if (isFromMmTable && !isToMmTable) { throw new HiveException("Cannot convert an ACID table to non-ACID"); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java deleted file mode 100644 index b3c62ad1a8..0000000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.exec; - -import org.apache.hadoop.hive.ql.DriverContext; -import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; -import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.util.StringUtils; - -public class ImportCommitTask extends Task { - - private static final long serialVersionUID = 1L; - - public ImportCommitTask() { - super(); - } - - @Override - public int execute(DriverContext driverContext) { - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Executing ImportCommit for " + work.getWriteId()); - } - - try { - if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) { - return 0; - } - return 0; - } catch (Exception e) { - console.printError("Failed with exception " + e.getMessage(), "\n" - + StringUtils.stringifyException(e)); - setException(e); - return 1; - } - } - - @Override - public StageType getType() { - return StageType.MOVE; // The commit for import is normally done as part of MoveTask. - } - - @Override - public String getName() { - return "IMPORT_COMMIT"; - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java deleted file mode 100644 index a119250464..0000000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.exec; - -import java.io.Serializable; - -import org.apache.hadoop.hive.ql.plan.Explain; -import org.apache.hadoop.hive.ql.plan.Explain.Level; - -@Explain(displayName = "Import Commit", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) -public class ImportCommitWork implements Serializable { - private static final long serialVersionUID = 1L; - private String dbName, tblName; - private long writeId; - private int stmtId; - - public ImportCommitWork(String dbName, String tblName, long writeId, int stmtId) { - this.writeId = writeId; - this.stmtId = stmtId; - this.dbName = dbName; - this.tblName = tblName; - } - - public long getWriteId() { - return writeId; - } - - public int getStmtId() { - return stmtId; - } - - public String getDbName() { - return dbName; - } - - public String getTblName() { - return tblName; - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 10a2ed2663..2da6b0f1fe 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -105,8 +105,6 @@ public TaskTuple(Class workClass, Class> taskClass) { MergeFileTask.class)); taskvec.add(new TaskTuple(DependencyCollectionWork.class, DependencyCollectionTask.class)); - taskvec.add(new TaskTuple(ImportCommitWork.class, - ImportCommitTask.class)); taskvec.add(new TaskTuple(TezWork.class, TezTask.class)); taskvec.add(new TaskTuple(SparkWork.class, SparkTask.class)); taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class)); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 4760b85da0..7d5994c976 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -1475,7 +1475,7 @@ public static boolean isInsertOnlyTable(Properties params) { /** * The method for altering table props; may set the table to MM, non-MM, or not affect MM. * todo: All such validation logic should be TransactionValidationListener - * @param tbl object image before alter table command + * @param tbl object image before alter table command (or null if not retrieved yet). * @param props prop values set in this alter table command */ public static Boolean isToInsertOnlyTable(Table tbl, Map props) { @@ -1491,7 +1491,7 @@ public static Boolean isToInsertOnlyTable(Table tbl, Map props) return null; } - if(transactional == null) { + if (transactional == null && tbl != null) { transactional = tbl.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); } boolean isSetToTxn = "true".equalsIgnoreCase(transactional); diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 009a890888..c90d3cccfa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1134,7 +1134,7 @@ public Table getTable(final String dbName, final String tableName, tTable = getMSC().getTable(dbName, tableName); } catch (NoSuchObjectException e) { if (throwException) { - LOG.error("Table " + tableName + " not found: " + e.getMessage()); + LOG.error("Table " + dbName + "." 
+ tableName + " not found: " + e.getMessage()); throw new InvalidTableException(tableName); } return null; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index 85d1cff320..caf0846052 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils; +import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; @@ -2275,4 +2276,8 @@ public CacheUsage getCacheUsage() { public void setCacheUsage(CacheUsage cacheUsage) { this.cacheUsage = cacheUsage; } + + public DDLDescWithWriteId getAcidDdlDesc() { + return null; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 49a3464776..4d08e35608 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -110,6 +110,7 @@ import org.apache.hadoop.hive.ql.plan.AlterTableAlterPartDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes; +import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId; import org.apache.hadoop.hive.ql.plan.AlterTableExchangePartition; import org.apache.hadoop.hive.ql.plan.AlterTableSimpleDesc; import org.apache.hadoop.hive.ql.plan.AlterWMTriggerDesc; @@ -122,6 +123,7 @@ import org.apache.hadoop.hive.ql.plan.CreateOrDropTriggerToPoolMappingDesc; import org.apache.hadoop.hive.ql.plan.CreateResourcePlanDesc; import org.apache.hadoop.hive.ql.plan.CreateWMTriggerDesc; +import org.apache.hadoop.hive.ql.plan.DDLDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DescDatabaseDesc; import org.apache.hadoop.hive.ql.plan.DescFunctionDesc; @@ -200,6 +202,8 @@ private final Set reservedPartitionValues; private final HiveAuthorizationTaskFactory hiveAuthorizationTaskFactory; private WriteEntity alterTableOutput; + // Equivalent to acidSinks, but for DDL operations that change data. 
+ private DDLDesc.DDLDescWithWriteId ddlDescWithWriteId; static { TokenToTypeName.put(HiveParser.TOK_BOOLEAN, serdeConstants.BOOLEAN_TYPE_NAME); @@ -1579,7 +1583,6 @@ private void analyzeTruncateTable(ASTNode ast) throws SemanticException { truncateTblDesc.setLbCtx(lbCtx); addInputsOutputsAlterTable(tableName, partSpec, AlterTableTypes.TRUNCATE); - ddlWork.setNeedLock(true); TableDesc tblDesc = Utilities.getTableDesc(table); // Write the output to temporary directory and move it to the final location at the end @@ -1752,7 +1755,18 @@ else if(entry.getKey().equals("external") && entry.getValue().equals("true")){ || mapProp.containsKey(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); addInputsOutputsAlterTable(tableName, partSpec, alterTblDesc, isPotentialMmSwitch); - rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc))); + DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), alterTblDesc); + if (isPotentialMmSwitch) { + this.ddlDescWithWriteId = alterTblDesc; + ddlWork.setNeedLock(true); // Hmm... why don't many other operations here need locks? + } + + rootTasks.add(TaskFactory.get(ddlWork)); + } + + @Override + public DDLDescWithWriteId getAcidDdlDesc() { + return ddlDescWithWriteId; } private void analyzeAlterTableSerdeProps(ASTNode ast, String tableName, diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index ac44be5e0b..b850ddc9d0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; -import org.apache.hadoop.hive.ql.exec.ImportCommitWork; import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -480,8 +479,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, private static Task addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, - EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm, - Task commitTask) + EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm) throws MetaException, IOException, HiveException { AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0); if (tblDesc.isExternal() && tblDesc.getLocation() == null) { @@ -540,9 +538,6 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, copyTask.addDependentTask(loadPartTask); addPartTask.addDependentTask(loadPartTask); x.getTasks().add(copyTask); - if (commitTask != null) { - loadPartTask.addDependentTask(commitTask); - } return addPartTask; } } @@ -839,16 +834,13 @@ private static void createRegularImportTasks( if (table != null) { if (table.isPartitioned()) { x.getLOG().debug("table partitioned"); - Task ict = createImportCommitTask(x.getConf(), - table.getDbName(), table.getTableName(), writeId, stmtId, - AcidUtils.isInsertOnlyTable(table.getParameters())); for (AddPartitionDesc addPartitionDesc : partitionDescs) { Map partSpec = addPartitionDesc.getPartition(0).getPartSpec(); org.apache.hadoop.hive.ql.metadata.Partition ptn = null; if ((ptn = 
x.getHive().getPartition(table, partSpec, false)) == null) { x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); } else { throw new SemanticException( ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec))); @@ -878,12 +870,9 @@ private static void createRegularImportTasks( x.getOutputs().add(new WriteEntity(parentDb, WriteEntity.WriteType.DDL_SHARED)); if (isPartitioned(tblDesc)) { - Task ict = createImportCommitTask(x.getConf(), - tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, stmtId, - AcidUtils.isInsertOnlyTable(tblDesc.getTblProps())); for (AddPartitionDesc addPartitionDesc : partitionDescs) { t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, - replicationSpec, x, writeId, stmtId, isSourceMm, ict)); + replicationSpec, x, writeId, stmtId, isSourceMm)); } } else { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); @@ -920,14 +909,6 @@ private static Table createNewTableMetadataObject(ImportTableDesc tblDesk) return newTable; } - private static Task createImportCommitTask( - HiveConf conf, String dbName, String tblName, Long writeId, int stmtId, boolean isMmTable) { - // TODO: noop, remove? - Task ict = (!isMmTable) ? null : TaskFactory.get( - new ImportCommitWork(dbName, tblName, writeId, stmtId), conf); - return ict; - } - /** * Create tasks for repl import */ @@ -1022,13 +1003,10 @@ private static void createReplImportTasks( if (!replicationSpec.isMetadataOnly()) { if (isPartitioned(tblDesc)) { - Task ict = createImportCommitTask(x.getConf(), - tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, stmtId, - AcidUtils.isInsertOnlyTable(tblDesc.getTblProps())); for (AddPartitionDesc addPartitionDesc : partitionDescs) { addPartitionDesc.setReplicationSpec(replicationSpec); t.addDependentTask( - addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict)); + addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); if (updatedMetadata != null) { updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); } @@ -1055,13 +1033,10 @@ private static void createReplImportTasks( addPartitionDesc.setReplicationSpec(replicationSpec); Map partSpec = addPartitionDesc.getPartition(0).getPartSpec(); org.apache.hadoop.hive.ql.metadata.Partition ptn = null; - Task ict = replicationSpec.isMetadataOnly() ? 
null : createImportCommitTask( - x.getConf(), tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, stmtId, - AcidUtils.isInsertOnlyTable(tblDesc.getTblProps())); if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); if (updatedMetadata != null) { updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); } @@ -1078,7 +1053,7 @@ private static void createReplImportTasks( if (replicationSpec.allowReplacementInto(ptn.getParameters())){ if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); } else { x.getTasks().add(alterSinglePartition( fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x)); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index bdecbaf144..05eca1fb6e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -515,7 +515,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { String tableName, Map partSpec, String replState, - Task preCursor) { + Task preCursor) throws SemanticException { HashMap mapProp = new HashMap<>(); mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState); @@ -563,7 +563,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { private List> addUpdateReplStateTasks( boolean isDatabaseLoad, UpdatedMetaDataTracker updatedMetadata, - List> importTasks) { + List> importTasks) throws SemanticException { String replState = updatedMetadata.getReplicationState(); String dbName = updatedMetadata.getDatabase(); String tableName = updatedMetadata.getTable(); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java index d7b224772d..a767796a94 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.plan; +import org.apache.hadoop.hive.ql.io.AcidUtils; + import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; @@ -33,9 +35,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.Explain.Level; - import com.google.common.collect.ImmutableList; - import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; @@ -49,7 +49,7 @@ * */ @Explain(displayName = "Alter Table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) -public class AlterTableDesc extends DDLDesc implements Serializable { +public class AlterTableDesc extends DDLDesc implements Serializable, DDLDesc.DDLDescWithWriteId { private static final long serialVersionUID = 1L; /** @@ -124,7 +124,7 @@ 
boolean isStoredAsSubDirectories = false; List skewedColNames; List> skewedColValues; - Table table; + Table tableForSkewedColValidation; boolean isDropIfExists = false; boolean isTurnOffSorting = false; boolean isCascade = false; @@ -137,6 +137,7 @@ List defaultConstraintsCols; List checkConstraintsCols; ReplicationSpec replicationSpec; + private Long writeId = null; public AlterTableDesc() { } @@ -150,12 +151,13 @@ public AlterTableDesc() { * new column name * @param newComment * @param newType + * @throws SemanticException */ public AlterTableDesc(String tblName, HashMap partSpec, String oldColName, String newColName, String newType, String newComment, - boolean first, String afterCol, boolean isCascade) { + boolean first, String afterCol, boolean isCascade) throws SemanticException { super(); - oldName = tblName; + setOldName(tblName); this.partSpec = partSpec; this.oldColName = oldColName; this.newColName = newColName; @@ -172,9 +174,9 @@ public AlterTableDesc(String tblName, HashMap partSpec, boolean first, String afterCol, boolean isCascade, List primaryKeyCols, List foreignKeyCols, List uniqueConstraintCols, List notNullConstraintCols, List defaultConstraints, - List checkConstraints) { + List checkConstraints) throws SemanticException { super(); - oldName = tblName; + setOldName(tblName); this.partSpec = partSpec; this.oldColName = oldColName; this.newColName = newColName; @@ -201,10 +203,11 @@ public AlterTableDesc(String tblName, HashMap partSpec, * Flag to denote if current table can be a view * @param replicationSpec * Replication specification with current event ID + * @throws SemanticException */ - public AlterTableDesc(String oldName, String newName, boolean expectView, ReplicationSpec replicationSpec) { + public AlterTableDesc(String oldName, String newName, boolean expectView, ReplicationSpec replicationSpec) throws SemanticException { op = AlterTableTypes.RENAME; - this.oldName = oldName; + setOldName(oldName); this.newName = newName; this.expectView = expectView; this.replicationSpec = replicationSpec; @@ -215,11 +218,12 @@ public AlterTableDesc(String oldName, String newName, boolean expectView, Replic * name of the table * @param newCols * new columns to be added + * @throws SemanticException */ public AlterTableDesc(String name, HashMap partSpec, List newCols, - AlterTableTypes alterType, boolean isCascade) { + AlterTableTypes alterType, boolean isCascade) throws SemanticException { op = alterType; - oldName = name; + setOldName(name); this.newCols = new ArrayList(newCols); this.partSpec = partSpec; this.isCascade = isCascade; @@ -267,12 +271,13 @@ public AlterTableDesc(AlterTableTypes alterType, HashMap partSpe * @param outputFormat * new table output format * @param partSpec + * @throws SemanticException */ public AlterTableDesc(String name, String inputFormat, String outputFormat, - String serdeName, String storageHandler, HashMap partSpec) { + String serdeName, String storageHandler, HashMap partSpec) throws SemanticException { super(); op = AlterTableTypes.ADDFILEFORMAT; - oldName = name; + setOldName(name); this.inputFormat = inputFormat; this.outputFormat = outputFormat; this.serdeName = serdeName; @@ -281,8 +286,8 @@ public AlterTableDesc(String name, String inputFormat, String outputFormat, } public AlterTableDesc(String tableName, int numBuckets, - List bucketCols, List sortCols, HashMap partSpec) { - oldName = tableName; + List bucketCols, List sortCols, HashMap partSpec) throws SemanticException { + setOldName(tableName); op = 
AlterTableTypes.ADDCLUSTERSORTCOLUMN; numberBuckets = numBuckets; bucketColumns = new ArrayList(bucketCols); @@ -290,47 +295,47 @@ public AlterTableDesc(String tableName, int numBuckets, this.partSpec = partSpec; } - public AlterTableDesc(String tableName, boolean sortingOff, HashMap partSpec) { - oldName = tableName; + public AlterTableDesc(String tableName, boolean sortingOff, HashMap partSpec) throws SemanticException { + setOldName(tableName); op = AlterTableTypes.ADDCLUSTERSORTCOLUMN; isTurnOffSorting = sortingOff; this.partSpec = partSpec; } public AlterTableDesc(String tableName, String newLocation, - HashMap partSpec) { + HashMap partSpec) throws SemanticException { op = AlterTableTypes.ALTERLOCATION; - this.oldName = tableName; + setOldName(tableName); this.newLocation = newLocation; this.partSpec = partSpec; } public AlterTableDesc(String tableName, Map, String> locations, - HashMap partSpec) { + HashMap partSpec) throws SemanticException { op = AlterTableTypes.ALTERSKEWEDLOCATION; - this.oldName = tableName; + setOldName(tableName); this.skewedLocations = locations; this.partSpec = partSpec; } public AlterTableDesc(String tableName, boolean turnOffSkewed, - List skewedColNames, List> skewedColValues) { - oldName = tableName; + List skewedColNames, List> skewedColValues) throws SemanticException { + setOldName(tableName); op = AlterTableTypes.ADDSKEWEDBY; this.isTurnOffSkewed = turnOffSkewed; this.skewedColNames = new ArrayList(skewedColNames); this.skewedColValues = new ArrayList>(skewedColValues); } - public AlterTableDesc(String tableName, HashMap partSpec, int numBuckets) { + public AlterTableDesc(String tableName, HashMap partSpec, int numBuckets) throws SemanticException { op = AlterTableTypes.ALTERBUCKETNUM; - this.oldName = tableName; + setOldName(tableName); this.partSpec = partSpec; this.numberBuckets = numBuckets; } - public AlterTableDesc(String tableName, String dropConstraintName, ReplicationSpec replicationSpec) { - this.oldName = tableName; + public AlterTableDesc(String tableName, String dropConstraintName, ReplicationSpec replicationSpec) throws SemanticException { + setOldName(tableName); this.dropConstraintName = dropConstraintName; this.replicationSpec = replicationSpec; op = AlterTableTypes.DROPCONSTRAINT; @@ -338,8 +343,8 @@ public AlterTableDesc(String tableName, String dropConstraintName, ReplicationSp public AlterTableDesc(String tableName, List primaryKeyCols, List foreignKeyCols, List uniqueConstraintCols, - ReplicationSpec replicationSpec) { - this.oldName = tableName; + ReplicationSpec replicationSpec) throws SemanticException { + setOldName(tableName); this.primaryKeyCols = primaryKeyCols; this.foreignKeyCols = foreignKeyCols; this.uniqueConstraintCols = uniqueConstraintCols; @@ -350,8 +355,8 @@ public AlterTableDesc(String tableName, List primaryKeyCols, public AlterTableDesc(String tableName, List primaryKeyCols, List foreignKeyCols, List uniqueConstraintCols, List notNullConstraintCols, List defaultConstraints, - List checkConstraints, ReplicationSpec replicationSpec) { - this.oldName = tableName; + List checkConstraints, ReplicationSpec replicationSpec) throws SemanticException { + setOldName(tableName); this.primaryKeyCols = primaryKeyCols; this.foreignKeyCols = foreignKeyCols; this.uniqueConstraintCols = uniqueConstraintCols; @@ -384,8 +389,9 @@ public String getOldName() { * @param oldName * the oldName to set */ - public void setOldName(String oldName) { - this.oldName = oldName; + public void setOldName(String oldName) throws 
SemanticException { + // Make sure we qualify the name from the outset so there's no ambiguity. + this.oldName = String.join(".", Utilities.getDbTableName(oldName)); } /** @@ -848,26 +854,19 @@ public void setSkewedColValues(List> skewedColValues) { * @throws SemanticException */ public void validate() throws SemanticException { - if (null != table) { + if (null != tableForSkewedColValidation) { /* Validate skewed information. */ ValidationUtility.validateSkewedInformation( - ParseUtils.validateColumnNameUniqueness(table.getCols()), this.getSkewedColNames(), - this.getSkewedColValues()); + ParseUtils.validateColumnNameUniqueness(tableForSkewedColValidation.getCols()), + this.getSkewedColNames(), this.getSkewedColValues()); } } /** - * @return the table - */ - public Table getTable() { - return table; - } - - /** * @param table the table to set */ public void setTable(Table table) { - this.table = table; + this.tableForSkewedColValidation = table; } /** @@ -929,4 +928,24 @@ public void setEnvironmentContext(EnvironmentContext environmentContext) { * This can result in a "ALTER IF NEWER THAN" kind of semantic */ public ReplicationSpec getReplicationSpec(){ return this.replicationSpec; } + + @Override + public void setWriteId(long writeId) { + this.writeId = writeId; + } + + @Override + public String getFullTableName() { + return getOldName(); + } + + @Override + public boolean mayNeedWriteId() { + return getOp() == AlterTableDesc.AlterTableTypes.ADDPROPS + && AcidUtils.isToInsertOnlyTable(null, getProps()); + } + + public Long getWriteId() { + return this.writeId; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/DDLDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/DDLDesc.java index 65f4cf233b..8941d97606 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/DDLDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/DDLDesc.java @@ -26,4 +26,10 @@ */ public abstract class DDLDesc implements Serializable { private static final long serialVersionUID = 1L; + + public static interface DDLDescWithWriteId { + void setWriteId(long writeId); + String getFullTableName(); + boolean mayNeedWriteId(); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 0926663eab..3c2f9d42bc 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -2453,4 +2453,22 @@ public void testAddPartitionLocks() throws Exception { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T", null, locks); } + + @Test + public void testMmConversionLocks() throws Exception { + dropTable(new String[] {"T"}); + CommandProcessorResponse cpr = driver.run("create table T (a int, b int) tblproperties('transactional'='false')"); + checkCmdOnDriver(cpr); + cpr = driver.run("insert into T values(0,2),(1,4)"); + checkCmdOnDriver(cpr); + + cpr = driver.compileAndRespond("ALTER TABLE T set tblproperties" + + "('transactional'='true', 'transactional_properties'='insert_only')", true); + checkCmdOnDriver(cpr); + txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets X lock on T + + List locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T", null, locks); + } } diff --git ql/src/test/results/clientpositive/mm_conversions.q.out 
ql/src/test/results/clientpositive/mm_conversions.q.out deleted file mode 100644 index 4754710291..0000000000 --- ql/src/test/results/clientpositive/mm_conversions.q.out +++ /dev/null @@ -1,309 +0,0 @@ -PREHOOK: query: drop table intermediate -PREHOOK: type: DROPTABLE -POSTHOOK: query: drop table intermediate -POSTHOOK: type: DROPTABLE -PREHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc -PREHOOK: type: CREATETABLE -PREHOOK: Output: database:default -PREHOOK: Output: default@intermediate -POSTHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc -POSTHOOK: type: CREATETABLE -POSTHOOK: Output: database:default -POSTHOOK: Output: default@intermediate -PREHOOK: query: insert into table intermediate partition(p='455') select distinct key from src where key >= 0 order by key desc limit 1 -PREHOOK: type: QUERY -PREHOOK: Input: default@src -PREHOOK: Output: default@intermediate@p=455 -POSTHOOK: query: insert into table intermediate partition(p='455') select distinct key from src where key >= 0 order by key desc limit 1 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@src -POSTHOOK: Output: default@intermediate@p=455 -POSTHOOK: Lineage: intermediate PARTITION(p=455).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] -PREHOOK: query: insert into table intermediate partition(p='456') select distinct key from src where key is not null order by key asc limit 1 -PREHOOK: type: QUERY -PREHOOK: Input: default@src -PREHOOK: Output: default@intermediate@p=456 -POSTHOOK: query: insert into table intermediate partition(p='456') select distinct key from src where key is not null order by key asc limit 1 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@src -POSTHOOK: Output: default@intermediate@p=456 -POSTHOOK: Lineage: intermediate PARTITION(p=456).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] -PREHOOK: query: insert into table intermediate partition(p='457') select distinct key from src where key >= 100 order by key asc limit 1 -PREHOOK: type: QUERY -PREHOOK: Input: default@src -PREHOOK: Output: default@intermediate@p=457 -POSTHOOK: query: insert into table intermediate partition(p='457') select distinct key from src where key >= 100 order by key asc limit 1 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@src -POSTHOOK: Output: default@intermediate@p=457 -POSTHOOK: Lineage: intermediate PARTITION(p=457).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] -PREHOOK: query: drop table simple_to_mm -PREHOOK: type: DROPTABLE -POSTHOOK: query: drop table simple_to_mm -POSTHOOK: type: DROPTABLE -PREHOOK: query: create table simple_to_mm(key int) stored as orc -PREHOOK: type: CREATETABLE -PREHOOK: Output: database:default -PREHOOK: Output: default@simple_to_mm -POSTHOOK: query: create table simple_to_mm(key int) stored as orc -POSTHOOK: type: CREATETABLE -POSTHOOK: Output: database:default -POSTHOOK: Output: default@simple_to_mm -PREHOOK: query: insert into table simple_to_mm select key from intermediate -PREHOOK: type: QUERY -PREHOOK: Input: default@intermediate -PREHOOK: Input: default@intermediate@p=455 -PREHOOK: Input: default@intermediate@p=456 -PREHOOK: Input: default@intermediate@p=457 -PREHOOK: Output: default@simple_to_mm -POSTHOOK: query: insert into table simple_to_mm select key from intermediate -POSTHOOK: type: QUERY -POSTHOOK: Input: default@intermediate -POSTHOOK: Input: default@intermediate@p=455 -POSTHOOK: Input: default@intermediate@p=456 
-POSTHOOK: Input: default@intermediate@p=457 -POSTHOOK: Output: default@simple_to_mm -POSTHOOK: Lineage: simple_to_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] -PREHOOK: query: select * from simple_to_mm s1 order by key -PREHOOK: type: QUERY -PREHOOK: Input: default@simple_to_mm -#### A masked pattern was here #### -POSTHOOK: query: select * from simple_to_mm s1 order by key -POSTHOOK: type: QUERY -POSTHOOK: Input: default@simple_to_mm -#### A masked pattern was here #### -0 -98 -100 -PREHOOK: query: alter table simple_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only") -PREHOOK: type: ALTERTABLE_PROPERTIES -PREHOOK: Input: default@simple_to_mm -PREHOOK: Output: default@simple_to_mm -POSTHOOK: query: alter table simple_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only") -POSTHOOK: type: ALTERTABLE_PROPERTIES -POSTHOOK: Input: default@simple_to_mm -POSTHOOK: Output: default@simple_to_mm -PREHOOK: query: select * from simple_to_mm s2 order by key -PREHOOK: type: QUERY -PREHOOK: Input: default@simple_to_mm -#### A masked pattern was here #### -POSTHOOK: query: select * from simple_to_mm s2 order by key -POSTHOOK: type: QUERY -POSTHOOK: Input: default@simple_to_mm -#### A masked pattern was here #### -0 -98 -100 -PREHOOK: query: insert into table simple_to_mm select key from intermediate -PREHOOK: type: QUERY -PREHOOK: Input: default@intermediate -PREHOOK: Input: default@intermediate@p=455 -PREHOOK: Input: default@intermediate@p=456 -PREHOOK: Input: default@intermediate@p=457 -PREHOOK: Output: default@simple_to_mm -POSTHOOK: query: insert into table simple_to_mm select key from intermediate -POSTHOOK: type: QUERY -POSTHOOK: Input: default@intermediate -POSTHOOK: Input: default@intermediate@p=455 -POSTHOOK: Input: default@intermediate@p=456 -POSTHOOK: Input: default@intermediate@p=457 -POSTHOOK: Output: default@simple_to_mm -POSTHOOK: Lineage: simple_to_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] -PREHOOK: query: insert into table simple_to_mm select key from intermediate -PREHOOK: type: QUERY -PREHOOK: Input: default@intermediate -PREHOOK: Input: default@intermediate@p=455 -PREHOOK: Input: default@intermediate@p=456 -PREHOOK: Input: default@intermediate@p=457 -PREHOOK: Output: default@simple_to_mm -POSTHOOK: query: insert into table simple_to_mm select key from intermediate -POSTHOOK: type: QUERY -POSTHOOK: Input: default@intermediate -POSTHOOK: Input: default@intermediate@p=455 -POSTHOOK: Input: default@intermediate@p=456 -POSTHOOK: Input: default@intermediate@p=457 -POSTHOOK: Output: default@simple_to_mm -POSTHOOK: Lineage: simple_to_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] -PREHOOK: query: select * from simple_to_mm s3 order by key -PREHOOK: type: QUERY -PREHOOK: Input: default@simple_to_mm -#### A masked pattern was here #### -POSTHOOK: query: select * from simple_to_mm s3 order by key -POSTHOOK: type: QUERY -POSTHOOK: Input: default@simple_to_mm -#### A masked pattern was here #### -0 -0 -0 -98 -98 -98 -100 -100 -100 -PREHOOK: query: drop table simple_to_mm -PREHOOK: type: DROPTABLE -PREHOOK: Input: default@simple_to_mm -PREHOOK: Output: default@simple_to_mm -POSTHOOK: query: drop table simple_to_mm -POSTHOOK: type: DROPTABLE -POSTHOOK: Input: default@simple_to_mm -POSTHOOK: Output: default@simple_to_mm -PREHOOK: query: drop table part_to_mm -PREHOOK: type: DROPTABLE 
-POSTHOOK: query: drop table part_to_mm -POSTHOOK: type: DROPTABLE -PREHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) stored as orc -PREHOOK: type: CREATETABLE -PREHOOK: Output: database:default -PREHOOK: Output: default@part_to_mm -POSTHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) stored as orc -POSTHOOK: type: CREATETABLE -POSTHOOK: Output: database:default -POSTHOOK: Output: default@part_to_mm -PREHOOK: query: insert into table part_to_mm partition(key_mm='455') select key from intermediate -PREHOOK: type: QUERY -PREHOOK: Input: default@intermediate -PREHOOK: Input: default@intermediate@p=455 -PREHOOK: Input: default@intermediate@p=456 -PREHOOK: Input: default@intermediate@p=457 -PREHOOK: Output: default@part_to_mm@key_mm=455 -POSTHOOK: query: insert into table part_to_mm partition(key_mm='455') select key from intermediate -POSTHOOK: type: QUERY -POSTHOOK: Input: default@intermediate -POSTHOOK: Input: default@intermediate@p=455 -POSTHOOK: Input: default@intermediate@p=456 -POSTHOOK: Input: default@intermediate@p=457 -POSTHOOK: Output: default@part_to_mm@key_mm=455 -POSTHOOK: Lineage: part_to_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] -PREHOOK: query: insert into table part_to_mm partition(key_mm='456') select key from intermediate -PREHOOK: type: QUERY -PREHOOK: Input: default@intermediate -PREHOOK: Input: default@intermediate@p=455 -PREHOOK: Input: default@intermediate@p=456 -PREHOOK: Input: default@intermediate@p=457 -PREHOOK: Output: default@part_to_mm@key_mm=456 -POSTHOOK: query: insert into table part_to_mm partition(key_mm='456') select key from intermediate -POSTHOOK: type: QUERY -POSTHOOK: Input: default@intermediate -POSTHOOK: Input: default@intermediate@p=455 -POSTHOOK: Input: default@intermediate@p=456 -POSTHOOK: Input: default@intermediate@p=457 -POSTHOOK: Output: default@part_to_mm@key_mm=456 -POSTHOOK: Lineage: part_to_mm PARTITION(key_mm=456).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] -PREHOOK: query: select * from part_to_mm s1 order by key, key_mm -PREHOOK: type: QUERY -PREHOOK: Input: default@part_to_mm -PREHOOK: Input: default@part_to_mm@key_mm=455 -PREHOOK: Input: default@part_to_mm@key_mm=456 -#### A masked pattern was here #### -POSTHOOK: query: select * from part_to_mm s1 order by key, key_mm -POSTHOOK: type: QUERY -POSTHOOK: Input: default@part_to_mm -POSTHOOK: Input: default@part_to_mm@key_mm=455 -POSTHOOK: Input: default@part_to_mm@key_mm=456 -#### A masked pattern was here #### -0 455 -0 456 -98 455 -98 456 -100 455 -100 456 -PREHOOK: query: alter table part_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only") -PREHOOK: type: ALTERTABLE_PROPERTIES -PREHOOK: Input: default@part_to_mm -PREHOOK: Output: default@part_to_mm -POSTHOOK: query: alter table part_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only") -POSTHOOK: type: ALTERTABLE_PROPERTIES -POSTHOOK: Input: default@part_to_mm -POSTHOOK: Output: default@part_to_mm -PREHOOK: query: select * from part_to_mm s2 order by key, key_mm -PREHOOK: type: QUERY -PREHOOK: Input: default@part_to_mm -PREHOOK: Input: default@part_to_mm@key_mm=455 -PREHOOK: Input: default@part_to_mm@key_mm=456 -#### A masked pattern was here #### -POSTHOOK: query: select * from part_to_mm s2 order by key, key_mm -POSTHOOK: type: QUERY -POSTHOOK: Input: default@part_to_mm -POSTHOOK: Input: 
default@part_to_mm@key_mm=455 -POSTHOOK: Input: default@part_to_mm@key_mm=456 -#### A masked pattern was here #### -0 455 -0 456 -98 455 -98 456 -100 455 -100 456 -PREHOOK: query: insert into table part_to_mm partition(key_mm='456') select key from intermediate -PREHOOK: type: QUERY -PREHOOK: Input: default@intermediate -PREHOOK: Input: default@intermediate@p=455 -PREHOOK: Input: default@intermediate@p=456 -PREHOOK: Input: default@intermediate@p=457 -PREHOOK: Output: default@part_to_mm@key_mm=456 -POSTHOOK: query: insert into table part_to_mm partition(key_mm='456') select key from intermediate -POSTHOOK: type: QUERY -POSTHOOK: Input: default@intermediate -POSTHOOK: Input: default@intermediate@p=455 -POSTHOOK: Input: default@intermediate@p=456 -POSTHOOK: Input: default@intermediate@p=457 -POSTHOOK: Output: default@part_to_mm@key_mm=456 -POSTHOOK: Lineage: part_to_mm PARTITION(key_mm=456).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] -PREHOOK: query: insert into table part_to_mm partition(key_mm='457') select key from intermediate -PREHOOK: type: QUERY -PREHOOK: Input: default@intermediate -PREHOOK: Input: default@intermediate@p=455 -PREHOOK: Input: default@intermediate@p=456 -PREHOOK: Input: default@intermediate@p=457 -PREHOOK: Output: default@part_to_mm@key_mm=457 -POSTHOOK: query: insert into table part_to_mm partition(key_mm='457') select key from intermediate -POSTHOOK: type: QUERY -POSTHOOK: Input: default@intermediate -POSTHOOK: Input: default@intermediate@p=455 -POSTHOOK: Input: default@intermediate@p=456 -POSTHOOK: Input: default@intermediate@p=457 -POSTHOOK: Output: default@part_to_mm@key_mm=457 -POSTHOOK: Lineage: part_to_mm PARTITION(key_mm=457).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] -PREHOOK: query: select * from part_to_mm s3 order by key, key_mm -PREHOOK: type: QUERY -PREHOOK: Input: default@part_to_mm -PREHOOK: Input: default@part_to_mm@key_mm=455 -PREHOOK: Input: default@part_to_mm@key_mm=456 -PREHOOK: Input: default@part_to_mm@key_mm=457 -#### A masked pattern was here #### -POSTHOOK: query: select * from part_to_mm s3 order by key, key_mm -POSTHOOK: type: QUERY -POSTHOOK: Input: default@part_to_mm -POSTHOOK: Input: default@part_to_mm@key_mm=455 -POSTHOOK: Input: default@part_to_mm@key_mm=456 -POSTHOOK: Input: default@part_to_mm@key_mm=457 -#### A masked pattern was here #### -0 455 -0 456 -0 456 -0 457 -98 455 -98 456 -98 456 -98 457 -100 455 -100 456 -100 456 -100 457 -PREHOOK: query: drop table part_to_mm -PREHOOK: type: DROPTABLE -PREHOOK: Input: default@part_to_mm -PREHOOK: Output: default@part_to_mm -POSTHOOK: query: drop table part_to_mm -POSTHOOK: type: DROPTABLE -POSTHOOK: Input: default@part_to_mm -POSTHOOK: Output: default@part_to_mm -PREHOOK: query: drop table intermediate -PREHOOK: type: DROPTABLE -PREHOOK: Input: default@intermediate -PREHOOK: Output: default@intermediate -POSTHOOK: query: drop table intermediate -POSTHOOK: type: DROPTABLE -POSTHOOK: Input: default@intermediate -POSTHOOK: Output: default@intermediate
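
The core of this patch, for anyone skimming the diff: ACID-affecting DDL (here, ALTER TABLE ... SET TBLPROPERTIES converting a table to insert-only/MM) now receives its table write ID from the Driver instead of opening its own transaction inside DDLTask.generateAddMmTasks(). BaseSemanticAnalyzer exposes getAcidDdlDesc(), QueryPlan carries the DDLDesc.DDLDescWithWriteId, and Driver.acquireLocks() asks HiveTxnManager for a write ID when mayNeedWriteId() returns true, before locks are taken. Below is a minimal, self-contained Java sketch of that hand-off pattern; it is not Hive code, and the type and method names (DdlDescWithWriteId, TxnManager, AlterToMmDesc, assignWriteIdIfNeeded) are simplified stand-ins for the real classes touched by the patch.

// Minimal, self-contained sketch of the write-ID hand-off this patch introduces.
// These are simplified stand-ins, not the real Hive types: the actual interface is
// DDLDesc.DDLDescWithWriteId, the real allocator is HiveTxnManager.getTableWriteId(),
// and the real caller is Driver.acquireLocks().

import java.util.Optional;

interface DdlDescWithWriteId {                  // stand-in for DDLDesc.DDLDescWithWriteId
  boolean mayNeedWriteId();                     // true e.g. for ALTER TABLE ... to insert-only
  String getFullTableName();                    // "db.table", qualified up front
  void setWriteId(long writeId);
}

interface TxnManager {                          // stand-in for HiveTxnManager
  long getTableWriteId(String db, String table);
}

final class AlterToMmDesc implements DdlDescWithWriteId {
  private final String fqTableName;
  private Long writeId;                         // null until the driver assigns it

  AlterToMmDesc(String fqTableName) { this.fqTableName = fqTableName; }

  @Override public boolean mayNeedWriteId() { return true; }
  @Override public String getFullTableName() { return fqTableName; }
  @Override public void setWriteId(long writeId) { this.writeId = writeId; }

  Optional<Long> writeId() { return Optional.ofNullable(writeId); }
}

public final class WriteIdHandOffSketch {

  // Mirrors the logic added to Driver.acquireLocks(): allocate a write ID only when the
  // plan carries an ACID DDL descriptor that actually needs one.
  static void assignWriteIdIfNeeded(DdlDescWithWriteId desc, TxnManager txnMgr) {
    if (desc == null || !desc.mayNeedWriteId()) {
      return;
    }
    String[] dbTable = desc.getFullTableName().split("\\.", 2);
    desc.setWriteId(txnMgr.getTableWriteId(dbTable[0], dbTable[1]));
  }

  public static void main(String[] args) {
    AlterToMmDesc desc = new AlterToMmDesc("default.simple_to_mm");
    TxnManager txnMgr = (db, table) -> 1L;      // pretend the metastore hands out write ID 1
    assignWriteIdIfNeeded(desc, txnMgr);
    // In the patch, DDLTask.generateAddMmTasks(tbl, writeId) uses this ID (via
    // AcidUtils.deltaSubdir) to name the first MM delta directory.
    System.out.println("write id = " + desc.writeId().orElseThrow(IllegalStateException::new));
  }
}

The design point the sketch illustrates: because the write ID is handed out while the Driver still holds the query's transaction and is about to acquire locks, the MM conversion no longer needs the separate open/commit sequence, or the ImportCommitTask follow-up task, that the deleted DDLTask and ImportSemanticAnalyzer code relied on.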