diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java index c27943b..39f98e4 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java @@ -104,7 +104,7 @@ protected void setUp() { db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, src, true, true); db.createTable(src, cols, null, TextInputFormat.class, IgnoreKeyTextOutputFormat.class); - db.loadTable(hadoopDataFile[i], src, false, false, false, false, false, null, 0); + db.loadTable(hadoopDataFile[i], src, false, false, false, false, false, null, 0, false); i++; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java index 7ef4f49..70e764e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java @@ -89,7 +89,7 @@ public void initializeOp(Configuration hconf) throws HiveException { .isListBucketingAlterTableConcatenate(); listBucketingDepth = conf.getListBucketingDepth(); Path specPath = conf.getOutputPath(); - isMmTable = conf.getTxnId() != null; + isMmTable = conf.getIsMmTable(); if (isMmTable) { updatePaths(specPath, null); } else { @@ -282,7 +282,7 @@ public void jobCloseOp(Configuration hconf, boolean success) FileSystem fs = outputDir.getFileSystem(hconf); Long mmWriteId = conf.getTxnId(); int stmtId = conf.getStmtId(); - if (mmWriteId == null) { + if (!isMmTable) { Path backupPath = backupOutputPath(fs, outputDir); Utilities.mvFileToFinalPath( outputDir, hconf, success, LOG, conf.getDpCtx(), null, reporter); @@ -298,7 +298,7 @@ public void jobCloseOp(Configuration hconf, boolean success) // We don't expect missing buckets from mere (actually there should be no buckets), // so just pass null as bucketing context. Union suffix should also be accounted for. 
Utilities.handleMmTableFinalPath(outputDir.getParent(), null, hconf, success, - dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, false); + dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false); } } catch (IOException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index b0a731e..77e664f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader; @@ -277,7 +278,8 @@ private boolean getNextPath() throws Exception { } FileSystem fs = currPath.getFileSystem(job); if (fs.exists(currPath)) { - if (extractValidTxnList() != null) { + if (extractValidTxnList() != null && + MetaStoreUtils.isInsertOnlyTable(currDesc.getTableDesc().getProperties())) { return true; } for (FileStatus fStat : listStatusUnderPath(fs, currPath)) { @@ -411,7 +413,12 @@ private String processCurrPathForMmWriteIds(InputFormat inputFormat) throws IOEx if (inputFormat instanceof HiveInputFormat) { return StringUtils.escapeString(currPath.toString()); // No need to process here. } - ValidTxnList validTxnList = extractValidTxnList(); + ValidTxnList validTxnList; + if (MetaStoreUtils.isInsertOnlyTable(currDesc.getTableDesc().getProperties())) { + validTxnList = extractValidTxnList(); + } else { + validTxnList = null; // non-MM case + } if (validTxnList != null) { Utilities.LOG14535.info("Observing " + currDesc.getTableName() + ": " + validTxnList); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index b430f23..351059b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -1198,9 +1198,9 @@ public void closeOp(boolean abort) throws HiveException { fsp.commit(fs, commitPaths); } } - if (conf.getMmWriteId() != null) { + if (conf.isMmTable()) { Utilities.writeMmCommitManifest( - commitPaths, specPath, fs, taskId, conf.getMmWriteId(), conf.getStatementId(), unionPath); + commitPaths, specPath, fs, taskId, conf.getTransactionId(), conf.getStatementId(), unionPath); } // Only publish stats if this operator's flag was set to gather stats if (conf.isGatherStats()) { @@ -1256,7 +1256,8 @@ public void jobCloseOp(Configuration hconf, boolean success) MissingBucketsContext mbc = new MissingBucketsContext( conf.getTableInfo(), numBuckets, conf.getCompressed()); Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success, - dpLevels, lbLevels, mbc, conf.getMmWriteId(), conf.getStatementId(), reporter, conf.isMmCtas()); + dpLevels, lbLevels, mbc, conf.getTransactionId(), conf.getStatementId(), reporter, + conf.isMmTable(), conf.isMmCtas()); } } } catch (IOException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index c68fc0e..e86ca3a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -339,11 +339,8 @@ 
public int execute(DriverContext driverContext) { checkFileFormats(db, tbd, table); - boolean isFullAcidOp = work.getLoadTableWork().getWriteType() == AcidUtils.Operation.UPDATE || - work.getLoadTableWork().getWriteType() == AcidUtils.Operation.DELETE; - if (tbd.isMmTable() && isFullAcidOp) { - throw new HiveException("UPDATE and DELETE operations are not supported for MM table"); - } + boolean isFullAcidOp = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID + && !tbd.isMmTable(); // Create a data container DataContainer dc = null; @@ -356,7 +353,7 @@ public int execute(DriverContext driverContext) { } db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getReplace(), work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(), - tbd.getTxnId(), tbd.getStmtId()); + tbd.getTxnId(), tbd.getStmtId(), tbd.isMmTable()); if (work.getOutputs() != null) { DDLTask.addIfAbsentByName(new WriteEntity(table, getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs()); @@ -416,8 +413,8 @@ private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd, db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getPartitionSpec(), tbd.getReplace(), tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(), - work.getLoadTableWork().getWriteType() == AcidUtils.Operation.UPDATE || - work.getLoadTableWork().getWriteType() == AcidUtils.Operation.DELETE, + work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID && + !tbd.isMmTable(), hasFollowingStatsTask(), tbd.getTxnId(), tbd.getStmtId()); Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false); @@ -462,8 +459,8 @@ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd, tbd.getReplace(), dpCtx.getNumDPCols(), (tbd.getLbCtx() == null) ? 0 : tbd.getLbCtx().calculateListBucketingLevel(), - work.getLoadTableWork().getWriteType() == AcidUtils.Operation.UPDATE || - work.getLoadTableWork().getWriteType() == AcidUtils.Operation.DELETE, + work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID && + !tbd.isMmTable(), SessionState.get().getTxnMgr().getCurrentTxnId(), tbd.getStmtId(), hasFollowingStatsTask(), work.getLoadTableWork().getWriteType()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index b755e2d..fa58329 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1538,7 +1538,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(), numBuckets = (conf != null && conf.getTable() != null) ? 
conf.getTable().getNumBuckets() : 0; - return removeTempOrDuplicateFiles(fs, fileStats, dpLevels, numBuckets, hconf, null, 0); + return removeTempOrDuplicateFiles(fs, fileStats, dpLevels, numBuckets, hconf, null, 0, false); } private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException { @@ -1554,7 +1554,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I } public static List removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, - int dpLevels, int numBuckets, Configuration hconf, Long txnId, int stmtId) throws IOException { + int dpLevels, int numBuckets, Configuration hconf, Long txnId, int stmtId, boolean isMmTable) throws IOException { if (fileStats == null) { return null; } @@ -1573,7 +1573,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I } FileStatus[] items = fs.listStatus(path); - if (txnId != null) { + if (isMmTable) { Path mmDir = parts[i].getPath(); if (!mmDir.getName().equals(AcidUtils.deltaSubdir(txnId, txnId, stmtId))) { throw new IOException("Unexpected non-MM directory name " + mmDir); @@ -1590,7 +1590,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I if (items.length == 0) { return result; } - if (txnId == null) { + if (!isMmTable) { taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs); } else { if (items.length > 1) { @@ -4120,7 +4120,7 @@ public MissingBucketsContext(TableDesc tableInfo, int numBuckets, boolean isComp public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf, boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long txnId, int stmtId, - Reporter reporter, boolean isMmCtas) throws IOException, HiveException { + Reporter reporter, boolean isMmTable, boolean isMmCtas) throws IOException, HiveException { FileSystem fs = specPath.getFileSystem(hconf); Path manifestDir = getManifestDir(specPath, txnId, stmtId, unionSuffix); if (!success) { @@ -4213,7 +4213,7 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con finalResults[i] = new PathOnlyFileStatus(mmDirectories.get(i)); } List emptyBuckets = Utilities.removeTempOrDuplicateFiles( - fs, finalResults, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, txnId, stmtId); + fs, finalResults, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, txnId, stmtId, isMmTable); // create empty buckets if necessary if (emptyBuckets.size() > 0) { assert mbc != null; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 98ea141..71d01ed 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -428,9 +429,13 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job InputFormat inputFormat, Class inputFormatClass, int splits, TableDesc table, List result) throws IOException { - String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); - ValidTxnList validTxnList = txnString == null ? 
new ValidReadTxnList() : - new ValidReadTxnList(txnString); + ValidTxnList validTxnList; + if (MetaStoreUtils.isInsertOnlyTable(table.getProperties())) { + String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); + validTxnList = txnString == null ? new ValidReadTxnList() : new ValidReadTxnList(txnString); + } else { + validTxnList = null; // for non-MM case + } try { Utilities.copyTablePropertiesToConf(table, conf); diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index d793ccf..a16bf91 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1623,8 +1623,6 @@ public void loadPartition(Path loadPath, String tableName, boolean hasFollowingStatsTask, Long txnId, int stmtId) throws HiveException { Table tbl = getTable(tableName); - boolean isMmTableWrite = (txnId != null); - Preconditions.checkState(isMmTableWrite == MetaStoreUtils.isInsertOnlyTable(tbl.getParameters())); loadPartition(loadPath, tbl, partSpec, replace, inheritTableSpecs, isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, txnId, stmtId); } @@ -1659,6 +1657,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, Long txnId, int stmtId) throws HiveException { Path tblDataLocationPath = tbl.getDataLocation(); + boolean isMmTableWrite = MetaStoreUtils.isInsertOnlyTable(tbl.getParameters()); try { // Get the partition object if it already exists Partition oldPart = getPartition(tbl, partSpec, false); @@ -1705,7 +1704,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par newFiles = Collections.synchronizedList(new ArrayList()); } // TODO: this assumes both paths are qualified; which they are, currently. - if (txnId != null && loadPath.equals(newPartPath)) { + if (isMmTableWrite && loadPath.equals(newPartPath)) { // MM insert query, move itself is a no-op. Utilities.LOG14535.info("not moving " + loadPath + " to " + newPartPath + " (MM)"); assert !isAcid; @@ -1723,7 +1722,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par // Either a non-MM query, or a load into MM table from an external source. PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER; Path destPath = newPartPath; - if (txnId != null) { + if (isMmTableWrite) { // We will load into MM directory, and delete from the parent if needed. destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId)); filter = replace ? 
new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter; @@ -1732,7 +1731,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par if (replace || (oldPart == null && !isAcid)) { boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), - isSrcLocal, isAutoPurge, newFiles, filter, txnId != null); + isSrcLocal, isAutoPurge, newFiles, filter, isMmTableWrite); } else { FileSystem fs = tbl.getDataLocation().getFileSystem(conf); Hive.copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcid, newFiles); @@ -1982,11 +1981,11 @@ private void constructOneLBLocationMap(FileStatus fSta, * @throws HiveException */ private Set getValidPartitionsInPath( - int numDP, int numLB, Path loadPath, Long txnId, int stmtId) throws HiveException { + int numDP, int numLB, Path loadPath, Long txnId, int stmtId, boolean isMmTable) throws HiveException { Set validPartitions = new HashSet(); try { FileSystem fs = loadPath.getFileSystem(conf); - if (txnId == null) { + if (!isMmTable) { FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs); // Check for empty partitions for (FileStatus s : leafStatus) { @@ -2063,7 +2062,8 @@ private void constructOneLBLocationMap(FileStatus fSta, // Get all valid partition paths and existing partitions for them (if any) final Table tbl = getTable(tableName); - final Set validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, txnId, stmtId); + final Set validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, txnId, stmtId, + MetaStoreUtils.isInsertOnlyTable(tbl.getParameters())); final int partsToLoad = validPartitions.size(); final AtomicInteger partitionsLoaded = new AtomicInteger(0); @@ -2179,7 +2179,7 @@ public Void call() throws Exception { */ public void loadTable(Path loadPath, String tableName, boolean replace, boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask, - Long txnId, int stmtId) throws HiveException { + Long txnId, int stmtId, boolean isMmTable) throws HiveException { List newFiles = null; Table tbl = getTable(tableName); HiveConf sessionConf = SessionState.getSessionConf(); @@ -2187,7 +2187,7 @@ public void loadTable(Path loadPath, String tableName, boolean replace, boolean newFiles = Collections.synchronizedList(new ArrayList()); } // TODO: this assumes both paths are qualified; which they are, currently. - if (txnId != null && loadPath.equals(tbl.getPath())) { + if (isMmTable && loadPath.equals(tbl.getPath())) { Utilities.LOG14535.info("not moving " + loadPath + " to " + tbl.getPath()); if (replace) { Path tableDest = tbl.getPath(); @@ -2201,7 +2201,7 @@ public void loadTable(Path loadPath, String tableName, boolean replace, boolean // Either a non-MM query, or a load into MM table from an external source. Path tblPath = tbl.getPath(), destPath = tblPath; PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER; - if (txnId != null) { + if (isMmTable) { // We will load into MM directory, and delete from the parent if needed. destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId)); filter = replace ? 
new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter; @@ -2210,7 +2210,7 @@ public void loadTable(Path loadPath, String tableName, boolean replace, boolean if (replace) { boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); replaceFiles(tblPath, loadPath, destPath, tblPath, - sessionConf, isSrcLocal, isAutopurge, newFiles, filter, txnId != null); + sessionConf, isSrcLocal, isAutopurge, newFiles, filter, isMmTable); } else { try { FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 204e67d..d739709 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1265,7 +1265,8 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, // FileSinkDesc fsInputDesc = fsInput.getConf(); Utilities.LOG14535.info("Creating merge work from " + System.identityHashCode(fsInput) - + " with write ID " + (fsInputDesc.isMmTable() ? fsInputDesc.getMmWriteId() : null) + " into " + finalName); + + " with write ID " + (fsInputDesc.isMmTable() ? fsInputDesc.getTransactionId() : null) + + " into " + finalName); boolean isBlockMerge = (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) && fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) || @@ -1273,7 +1274,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, fsInputDesc.getTableInfo().getInputFileFormatClass().equals(OrcInputFormat.class)); RowSchema inputRS = fsInput.getSchema(); - Long srcMmWriteId = fsInputDesc.isMmTable() ? fsInputDesc.getMmWriteId() : null; + Long srcMmWriteId = fsInputDesc.isMmTable() ? fsInputDesc.getTransactionId() : null; FileSinkDesc fsOutputDesc = null; TableScanOperator tsMerge = null; if (!isBlockMerge) { @@ -1636,7 +1637,8 @@ public static MapWork createMergeTask(FileSinkDesc fsInputDesc, Path finalName, } else { fmd = new OrcFileMergeDesc(); } - fmd.setTxnId(fsInputDesc.getMmWriteId()); + fmd.setIsMmTable(fsInputDesc.isMmTable()); + fmd.setTxnId(fsInputDesc.getTransactionId()); int stmtId = fsInputDesc.getStatementId(); fmd.setStmtId(stmtId == -1 ? 0 : stmtId); fmd.setDpCtx(fsInputDesc.getDynPartCtx()); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java index b50f664..2f9783e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java @@ -86,7 +86,7 @@ public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) ParseContext pc = physicalContext.getParseContext(); if (pc.getLoadTableWork() != null) { for (LoadTableDesc ltd : pc.getLoadTableWork()) { - if (ltd.getTxnId() == null) continue; + if (!ltd.isMmTable()) continue; // See the path in FSOP that calls fs.exists on finalPath. 
LOG.debug("Not using skew join because the destination table " + ltd.getTable().getTableName() + " is an insert_only table"); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index 5ef77f5..9b39cd0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.StatsWork; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapred.InputFormat; import com.google.common.collect.Lists; @@ -273,7 +274,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { int stmtId = 0; Table tbl = ts.tableHandle; if (MetaStoreUtils.isInsertOnlyTable(tbl.getParameters())) { - txnId = 0l; //todo to be replaced with txnId in Driver + txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); } LoadTableDesc loadTableWork; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 01dead5..ed1417b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7076,7 +7076,6 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) genPartnCols(dest, input, qb, table_desc, dest_tab, rsCtx); } - assert isMmTable == (txnId != null); FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, table_desc, dest_part, dest_path, currentTableId, destTableIsAcid, destTableIsTemporary, destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS, diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java index 8f6166a..80f7c16 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java @@ -30,6 +30,7 @@ private boolean isListBucketingAlterTableConcatenate; private Long txnId; private int stmtId; + private boolean isMmTable; public FileMergeDesc(DynamicPartitionCtx dynPartCtx, Path outputDir) { this.dpCtx = dynPartCtx; @@ -91,4 +92,12 @@ public int getStmtId() { public void setStmtId(int stmtId) { this.stmtId = stmtId; } + + public boolean getIsMmTable() { + return isMmTable; + } + + public void setIsMmTable(boolean isMmTable) { + this.isMmTable = isMmTable; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 7f4cabe..a26047d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -275,11 +276,11 @@ public void setTemporary(boolean temporary) { } public boolean isMmTable() { - return mmWriteId != null; - } - - public Long getMmWriteId() { - return mmWriteId; + if (getTable() != null) { + return MetaStoreUtils.isInsertOnlyTable(table.getParameters()); + } else { // Dynamic Partition Insert case + return 
MetaStoreUtils.isInsertOnlyTable(getTableInfo().getProperties()); + } } public boolean isMaterialization() { @@ -475,7 +476,6 @@ public String getWriteTypeString() { } public void setTransactionId(long id) { txnId = id; - setMmWriteId(id); } public long getTransactionId() { return txnId; diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index 5bb52b4..3201dc9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -159,11 +160,11 @@ public boolean getReplace() { @Explain(displayName = "micromanaged table") public Boolean isMmTableExplain() { - return txnId != null? true : null; + return isMmTable() ? true : null; } public boolean isMmTable() { - return txnId != null; + return MetaStoreUtils.isInsertOnlyTable(table.getProperties()); } public void setReplace(boolean replace) { diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java index d7adf95..61eab3d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java @@ -140,7 +140,7 @@ db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, src, true, true); db.createTable(src, cols, null, TextInputFormat.class, HiveIgnoreKeyTextOutputFormat.class); - db.loadTable(hadoopDataFile[i], src, false, true, false, false, false, null, 0); + db.loadTable(hadoopDataFile[i], src, false, true, false, false, false, null, 0, false); i++; } diff --git ql/src/test/results/clientpositive/llap/mm_all.q.out ql/src/test/results/clientpositive/llap/mm_all.q.out index 62ad7b6..f184ba6 100644 --- ql/src/test/results/clientpositive/llap/mm_all.q.out +++ ql/src/test/results/clientpositive/llap/mm_all.q.out @@ -68,14 +68,14 @@ STAGE PLANS: Map Operator Tree: TableScan alias: intermediate - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 72 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: key (type: int) outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 72 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 72 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat @@ -161,6 +161,24 @@ POSTHOOK: Input: default@part_mm POSTHOOK: Input: default@part_mm@key_mm=455 POSTHOOK: Input: default@part_mm@key_mm=456 #### A masked pattern was here #### +0 455 +0 455 +0 456 +10 455 +10 455 +10 456 +97 455 +97 455 +97 456 +98 455 +98 455 +98 456 +100 455 +100 455 +100 456 +103 455 +103 455 +103 456 PREHOOK: query: select * from part_mm order by key, key_mm PREHOOK: type: QUERY PREHOOK: Input: default@part_mm @@ -173,6 +191,24 @@ POSTHOOK: Input: default@part_mm POSTHOOK: Input: default@part_mm@key_mm=455 POSTHOOK: Input: default@part_mm@key_mm=456 #### A 
masked pattern was here #### +0 455 +0 455 +0 456 +10 455 +10 455 +10 456 +97 455 +97 455 +97 456 +98 455 +98 455 +98 456 +100 455 +100 455 +100 456 +103 455 +103 455 +103 456 PREHOOK: query: truncate table part_mm PREHOOK: type: TRUNCATETABLE PREHOOK: Output: default@part_mm@key_mm=455 @@ -236,6 +272,12 @@ POSTHOOK: query: select * from simple_mm order by key POSTHOOK: type: QUERY POSTHOOK: Input: default@simple_mm #### A masked pattern was here #### +0 +10 +97 +98 +100 +103 PREHOOK: query: insert into table simple_mm select key from intermediate PREHOOK: type: QUERY PREHOOK: Input: default@intermediate @@ -259,6 +301,18 @@ POSTHOOK: query: select * from simple_mm order by key POSTHOOK: type: QUERY POSTHOOK: Input: default@simple_mm #### A masked pattern was here #### +0 +0 +10 +10 +97 +97 +98 +98 +100 +100 +103 +103 PREHOOK: query: truncate table simple_mm PREHOOK: type: TRUNCATETABLE PREHOOK: Output: default@simple_mm @@ -308,14 +362,44 @@ POSTHOOK: Input: default@intermediate POSTHOOK: Input: default@intermediate@p=455 POSTHOOK: Input: default@intermediate@p=456 POSTHOOK: Input: default@intermediate@p=457 +POSTHOOK: Output: default@dp_mm@key1=123/key2=0 +POSTHOOK: Output: default@dp_mm@key1=123/key2=10 +POSTHOOK: Output: default@dp_mm@key1=123/key2=100 +POSTHOOK: Output: default@dp_mm@key1=123/key2=103 +POSTHOOK: Output: default@dp_mm@key1=123/key2=97 +POSTHOOK: Output: default@dp_mm@key1=123/key2=98 +POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=0).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=100).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=103).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=10).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=97).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=98).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] PREHOOK: query: select * from dp_mm order by key PREHOOK: type: QUERY PREHOOK: Input: default@dp_mm +PREHOOK: Input: default@dp_mm@key1=123/key2=0 +PREHOOK: Input: default@dp_mm@key1=123/key2=10 +PREHOOK: Input: default@dp_mm@key1=123/key2=100 +PREHOOK: Input: default@dp_mm@key1=123/key2=103 +PREHOOK: Input: default@dp_mm@key1=123/key2=97 +PREHOOK: Input: default@dp_mm@key1=123/key2=98 #### A masked pattern was here #### POSTHOOK: query: select * from dp_mm order by key POSTHOOK: type: QUERY POSTHOOK: Input: default@dp_mm -#### A masked pattern was here #### +POSTHOOK: Input: default@dp_mm@key1=123/key2=0 +POSTHOOK: Input: default@dp_mm@key1=123/key2=10 +POSTHOOK: Input: default@dp_mm@key1=123/key2=100 +POSTHOOK: Input: default@dp_mm@key1=123/key2=103 +POSTHOOK: Input: default@dp_mm@key1=123/key2=97 +POSTHOOK: Input: default@dp_mm@key1=123/key2=98 +#### A masked pattern was here #### +0 123 0 +10 123 10 +97 123 97 +98 123 98 +100 123 100 +103 123 103 PREHOOK: query: drop table dp_mm PREHOOK: type: DROPTABLE PREHOOK: Input: default@dp_mm @@ -363,6 +447,18 @@ POSTHOOK: query: select * from union_mm order by id POSTHOOK: type: QUERY POSTHOOK: Input: default@union_mm #### A masked pattern was here #### +0 +1 +10 +11 +97 +98 +98 +99 +100 +101 +103 +104 
PREHOOK: query: insert into table union_mm select p from ( @@ -402,6 +498,23 @@ POSTHOOK: query: select * from union_mm order by id POSTHOOK: type: QUERY POSTHOOK: Input: default@union_mm #### A masked pattern was here #### +0 +1 +2 +10 +11 +12 +97 +98 +99 +99 +100 +100 +101 +102 +103 +104 +105 PREHOOK: query: insert into table union_mm SELECT p FROM ( @@ -455,6 +568,38 @@ POSTHOOK: query: select * from union_mm order by id POSTHOOK: type: QUERY POSTHOOK: Input: default@union_mm #### A masked pattern was here #### +0 +0 +1 +1 +2 +2 +10 +10 +11 +11 +12 +12 +97 +97 +98 +98 +99 +99 +99 +100 +100 +100 +101 +101 +102 +102 +103 +103 +104 +104 +105 +105 PREHOOK: query: drop table union_mm PREHOOK: type: DROPTABLE PREHOOK: Input: default@union_mm @@ -545,6 +690,12 @@ POSTHOOK: query: select * from skew_mm order by k2, k1, k4 POSTHOOK: type: QUERY POSTHOOK: Input: default@skew_mm #### A masked pattern was here #### +0 0 0 +10 10 10 +97 97 97 +98 98 98 +100 100 100 +103 103 103 PREHOOK: query: drop table skew_mm PREHOOK: type: DROPTABLE PREHOOK: Input: default@skew_mm @@ -629,6 +780,12 @@ POSTHOOK: query: select * from merge0_mm POSTHOOK: type: QUERY POSTHOOK: Input: default@merge0_mm #### A masked pattern was here #### +98 +97 +100 +103 +0 +10 PREHOOK: query: insert into table merge0_mm select key from intermediate PREHOOK: type: QUERY PREHOOK: Input: default@intermediate @@ -652,6 +809,18 @@ POSTHOOK: query: select * from merge0_mm POSTHOOK: type: QUERY POSTHOOK: Input: default@merge0_mm #### A masked pattern was here #### +98 +97 +100 +103 +0 +10 +98 +97 +100 +103 +0 +10 PREHOOK: query: drop table merge0_mm PREHOOK: type: DROPTABLE PREHOOK: Input: default@merge0_mm @@ -691,6 +860,12 @@ POSTHOOK: query: select * from merge2_mm POSTHOOK: type: QUERY POSTHOOK: Input: default@merge2_mm #### A masked pattern was here #### +98 +97 +100 +103 +0 +10 PREHOOK: query: insert into table merge2_mm select key from intermediate PREHOOK: type: QUERY PREHOOK: Input: default@intermediate @@ -714,6 +889,18 @@ POSTHOOK: query: select * from merge2_mm POSTHOOK: type: QUERY POSTHOOK: Input: default@merge2_mm #### A masked pattern was here #### +98 +97 +100 +103 +0 +10 +98 +97 +100 +103 +0 +10 PREHOOK: query: drop table merge2_mm PREHOOK: type: DROPTABLE PREHOOK: Input: default@merge2_mm @@ -743,14 +930,44 @@ POSTHOOK: Input: default@intermediate POSTHOOK: Input: default@intermediate@p=455 POSTHOOK: Input: default@intermediate@p=456 POSTHOOK: Input: default@intermediate@p=457 +POSTHOOK: Output: default@merge1_mm@key=0 +POSTHOOK: Output: default@merge1_mm@key=10 +POSTHOOK: Output: default@merge1_mm@key=100 +POSTHOOK: Output: default@merge1_mm@key=103 +POSTHOOK: Output: default@merge1_mm@key=97 +POSTHOOK: Output: default@merge1_mm@key=98 +POSTHOOK: Lineage: merge1_mm PARTITION(key=0).id SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: merge1_mm PARTITION(key=100).id SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: merge1_mm PARTITION(key=103).id SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: merge1_mm PARTITION(key=10).id SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: merge1_mm PARTITION(key=97).id SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: merge1_mm PARTITION(key=98).id SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, 
comment:null), ] PREHOOK: query: select * from merge1_mm order by id, key PREHOOK: type: QUERY PREHOOK: Input: default@merge1_mm +PREHOOK: Input: default@merge1_mm@key=0 +PREHOOK: Input: default@merge1_mm@key=10 +PREHOOK: Input: default@merge1_mm@key=100 +PREHOOK: Input: default@merge1_mm@key=103 +PREHOOK: Input: default@merge1_mm@key=97 +PREHOOK: Input: default@merge1_mm@key=98 #### A masked pattern was here #### POSTHOOK: query: select * from merge1_mm order by id, key POSTHOOK: type: QUERY POSTHOOK: Input: default@merge1_mm -#### A masked pattern was here #### +POSTHOOK: Input: default@merge1_mm@key=0 +POSTHOOK: Input: default@merge1_mm@key=10 +POSTHOOK: Input: default@merge1_mm@key=100 +POSTHOOK: Input: default@merge1_mm@key=103 +POSTHOOK: Input: default@merge1_mm@key=97 +POSTHOOK: Input: default@merge1_mm@key=98 +#### A masked pattern was here #### +0 0 +10 10 +97 97 +98 98 +100 100 +103 103 PREHOOK: query: insert into table merge1_mm partition (key) select key, key from intermediate PREHOOK: type: QUERY PREHOOK: Input: default@intermediate @@ -764,14 +981,50 @@ POSTHOOK: Input: default@intermediate POSTHOOK: Input: default@intermediate@p=455 POSTHOOK: Input: default@intermediate@p=456 POSTHOOK: Input: default@intermediate@p=457 +POSTHOOK: Output: default@merge1_mm@key=0 +POSTHOOK: Output: default@merge1_mm@key=10 +POSTHOOK: Output: default@merge1_mm@key=100 +POSTHOOK: Output: default@merge1_mm@key=103 +POSTHOOK: Output: default@merge1_mm@key=97 +POSTHOOK: Output: default@merge1_mm@key=98 +POSTHOOK: Lineage: merge1_mm PARTITION(key=0).id SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: merge1_mm PARTITION(key=100).id SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: merge1_mm PARTITION(key=103).id SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: merge1_mm PARTITION(key=10).id SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: merge1_mm PARTITION(key=97).id SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: merge1_mm PARTITION(key=98).id SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] PREHOOK: query: select * from merge1_mm order by id, key PREHOOK: type: QUERY PREHOOK: Input: default@merge1_mm +PREHOOK: Input: default@merge1_mm@key=0 +PREHOOK: Input: default@merge1_mm@key=10 +PREHOOK: Input: default@merge1_mm@key=100 +PREHOOK: Input: default@merge1_mm@key=103 +PREHOOK: Input: default@merge1_mm@key=97 +PREHOOK: Input: default@merge1_mm@key=98 #### A masked pattern was here #### POSTHOOK: query: select * from merge1_mm order by id, key POSTHOOK: type: QUERY POSTHOOK: Input: default@merge1_mm -#### A masked pattern was here #### +POSTHOOK: Input: default@merge1_mm@key=0 +POSTHOOK: Input: default@merge1_mm@key=10 +POSTHOOK: Input: default@merge1_mm@key=100 +POSTHOOK: Input: default@merge1_mm@key=103 +POSTHOOK: Input: default@merge1_mm@key=97 +POSTHOOK: Input: default@merge1_mm@key=98 +#### A masked pattern was here #### +0 0 +0 0 +10 10 +10 10 +97 97 +97 97 +98 98 +98 98 +100 100 +100 100 +103 103 +103 103 PREHOOK: query: drop table merge1_mm PREHOOK: type: DROPTABLE PREHOOK: Input: default@merge1_mm @@ -842,7 +1095,7 @@ POSTHOOK: query: select count(1) from load0_mm POSTHOOK: type: QUERY POSTHOOK: Input: default@load0_mm #### A masked pattern was here #### -1000 +500 PREHOOK: query: drop 
table load0_mm PREHOOK: type: DROPTABLE PREHOOK: Input: default@load0_mm @@ -993,7 +1246,7 @@ POSTHOOK: query: select count(1) from load1_mm POSTHOOK: type: QUERY POSTHOOK: Input: default@load1_mm #### A masked pattern was here #### -1050 +500 PREHOOK: query: drop table load1_mm PREHOOK: type: DROPTABLE PREHOOK: Input: default@load1_mm @@ -1155,6 +1408,7 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@parquet1_mm POSTHOOK: Input: default@parquet2_mm #### A masked pattern was here #### +1 value1 value2 PREHOOK: query: drop table parquet1_mm PREHOOK: type: DROPTABLE PREHOOK: Input: default@parquet1_mm diff --git ql/src/test/results/clientpositive/mm_all.q.out ql/src/test/results/clientpositive/mm_all.q.out index 71826df..dacd984 100644 --- ql/src/test/results/clientpositive/mm_all.q.out +++ ql/src/test/results/clientpositive/mm_all.q.out @@ -1284,7 +1284,7 @@ POSTHOOK: query: select count(1) from load0_mm POSTHOOK: type: QUERY POSTHOOK: Input: default@load0_mm #### A masked pattern was here #### -1000 +500 PREHOOK: query: drop table load0_mm PREHOOK: type: DROPTABLE PREHOOK: Input: default@load0_mm @@ -1435,7 +1435,7 @@ POSTHOOK: query: select count(1) from load1_mm POSTHOOK: type: QUERY POSTHOOK: Input: default@load1_mm #### A masked pattern was here #### -1050 +500 PREHOOK: query: drop table load1_mm PREHOOK: type: DROPTABLE PREHOOK: Input: default@load1_mm
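
Editor's note (not part of the patch): the recurring change throughout these hunks is to stop inferring "is this an MM (insert-only) table?" from `txnId != null` and instead derive it from the table's parameters via `MetaStoreUtils.isInsertOnlyTable(...)`, threading an explicit `isMmTable` flag through descriptors and utility methods. The standalone sketch below illustrates that property-based check outside of Hive's own classes. It is an assumption-labeled illustration, not Hive code: the class `MmTableCheckSketch`, its method name, and the property keys `transactional` / `transactional_properties` with the value `insert_only` are stated here as assumptions about Hive's insert-only table convention and are not taken from this diff.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: mirrors the "derive MM-ness from table properties"
// pattern the patch uses instead of relying on a write/transaction id being non-null.
public class MmTableCheckSketch {

  // Assumed property keys stored in the table parameters map.
  private static final String TRANSACTIONAL = "transactional";
  private static final String TRANSACTIONAL_PROPERTIES = "transactional_properties";

  // Returns true when the parameters mark the table as transactional with insert-only semantics.
  static boolean isInsertOnlyTable(Map<String, String> params) {
    if (params == null) {
      return false;
    }
    boolean isTransactional = "true".equalsIgnoreCase(params.get(TRANSACTIONAL));
    return isTransactional
        && "insert_only".equalsIgnoreCase(params.get(TRANSACTIONAL_PROPERTIES));
  }

  public static void main(String[] args) {
    Map<String, String> mmParams = new HashMap<>();
    mmParams.put(TRANSACTIONAL, "true");
    mmParams.put(TRANSACTIONAL_PROPERTIES, "insert_only");

    Map<String, String> plainParams = new HashMap<>();

    System.out.println(isInsertOnlyTable(mmParams));    // true
    System.out.println(isInsertOnlyTable(plainParams)); // false
  }
}

The design point the patch makes is that a null transaction/write id is not a reliable proxy for the table type (for example, loads into MM tables from external sources, or non-MM paths that still carry a txn id), so callers such as MoveTask, FileSinkOperator, and Hive.loadTable/loadPartition now receive or compute the table-property-based flag directly.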