diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index 2036f2932d..d9be6ffbfe 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -45,6 +45,9 @@ minitez.query.files=acid_vectorization_original_tez.q,\ minillap.shared.query.files=acid_direct_insert_insert_overwrite.q,\ + acid_direct_update_delete.q,\ + acid_direct_update_delete_with_merge.q,\ + acid_direct_update_delete_partitions.q,\ acid_multiinsert_dyn_part.q,\ insert_into1.q,\ insert_into2.q,\ diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java index d8f8e72efa..44f7d65513 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java @@ -258,7 +258,7 @@ public void closeOp(boolean abort) throws HiveException { // There's always just one file that we have merged. // The union/DP/etc. should already be account for in the path. Utilities.writeCommitManifest(Lists.newArrayList(outPath), tmpPath.getParent(), fs, taskId, conf.getWriteId(), - conf.getStmtId(), null, false, hasDynamicPartitions, new HashSet<>()); + conf.getStmtId(), null, false, hasDynamicPartitions, new HashSet<>(), false); LOG.info("Merged into " + finalPath + "(" + fss.getLen() + " bytes)."); } } @@ -337,8 +337,8 @@ public void jobCloseOp(Configuration hconf, boolean success) lbLevels = conf.getListBucketingDepth(); // We don't expect missing buckets from mere (actually there should be no buckets), // so just pass null as bucketing context. Union suffix should also be accounted for. - Utilities.handleDirectInsertTableFinalPath(outputDir.getParent(), null, hconf, success, - dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false, false, false); + Utilities.handleDirectInsertTableFinalPath(outputDir.getParent(), null, hconf, success, dpLevels, lbLevels, + null, mmWriteId, stmtId, reporter, isMmTable, false, false, false, false); } } catch (IOException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 082f1cbc09..56d414e07c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -173,12 +173,15 @@ int acidFileOffset = -1; private boolean isMmTable; private boolean isDirectInsert; + private AcidUtils.Operation acidOperation; private boolean isInsertOverwrite; String dpDirForCounters; - public FSPaths(Path specPath, boolean isMmTable, boolean isDirectInsert, boolean isInsertOverwrite) { + public FSPaths(Path specPath, boolean isMmTable, boolean isDirectInsert, boolean isInsertOverwrite, + AcidUtils.Operation acidOperation) { this.isMmTable = isMmTable; this.isDirectInsert = isDirectInsert; + this.acidOperation = acidOperation; this.isInsertOverwrite = isInsertOverwrite; if (!isMmTable && !isDirectInsert) { tmpPathRoot = Utilities.toTempPath(specPath); @@ -229,8 +232,10 @@ public void closeWriters(boolean abort) throws HiveException { for (int i = 0; i < updaters.length; i++) { if (updaters[i] != null) { SerDeStats stats = updaters[i].getStats(); - // Ignore 0 row files except in case of insert overwrite - if (isDirectInsert && (stats.getRowCount() > 0 || isInsertOverwrite)) { + // Ignore 0 row files except in case of insert 
overwrite or delete or update + if (isDirectInsert + && (stats.getRowCount() > 0 || isInsertOverwrite || AcidUtils.Operation.DELETE.equals(acidOperation) + || AcidUtils.Operation.UPDATE.equals(acidOperation))) { outPathsCommitted[i] = updaters[i].getUpdatedFilePath(); } updaters[i].close(abort); @@ -271,7 +276,7 @@ private void commitOneOutPath(int idx, FileSystem fs, List commitPaths) if (isMmTable) { assert outPaths[idx].equals(finalPaths[idx]); commitPaths.add(outPaths[idx]); - } else if (isDirectInsert && (outPathsCommitted[idx] != null)) { + } else if (isDirectInsert && outPathsCommitted[idx] != null) { if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER .trace("committing " + outPathsCommitted[idx] + " (direct insert = " + isDirectInsert + ")"); @@ -365,7 +370,7 @@ public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeT // affects some less obscure scenario. try { FileSystem fpfs = finalPath.getFileSystem(hconf); - if ((!isDirectInsert) && fpfs.exists(finalPath)) { + if (!isDirectInsert && fpfs.exists(finalPath)) { throw new RuntimeException(finalPath + " already exists"); } } catch (IOException e) { @@ -619,7 +624,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { } if (!bDynParts) { - fsp = new FSPaths(specPath, conf.isMmTable(), conf.isDirectInsert(), conf.getInsertOverwrite()); + fsp = new FSPaths(specPath, conf.isMmTable(), conf.isDirectInsert(), conf.getInsertOverwrite(), + conf.getAcidOperation()); fsp.subdirAfterTxn = combinePathFragments(generateListBucketingDirName(null), unionPath); if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("creating new paths " + System.identityHashCode(fsp) @@ -856,10 +862,7 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) // the row. ObjectInspector inspector = bDynParts ? 
subSetOI : outputObjInspector; int acidBucketNum = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId)); - String attemptId = null; - if (conf.isDirectInsert()) { - attemptId = taskId.split("_")[1]; - } + String attemptId = getAttemptIdFromTaskId(taskId); fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(), acidBucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter, -1, attemptId); // outPath.getParent() } @@ -1092,9 +1095,9 @@ public void process(Object row, int tag) throws HiveException { writerOffset = fpaths.createDynamicBucket(bucketNum); } if (fpaths.updaters[writerOffset] == null) { - fpaths.updaters[writerOffset] = HiveFileFormatUtils.getAcidRecordUpdater( - jc, conf.getTableInfo(), bucketNum, conf, - fpaths.outPaths[writerOffset], rowInspector, reporter, 0); + String attemptId = getAttemptIdFromTaskId(taskId); + fpaths.updaters[writerOffset] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(), bucketNum, + conf, fpaths.outPaths[writerOffset], rowInspector, reporter, 0, attemptId); if (LOG.isDebugEnabled()) { LOG.debug("Created updater for bucket number " + bucketNum + " using file " + fpaths.outPaths[writerOffset]); @@ -1157,7 +1160,8 @@ assert getConf().getWriteType() != AcidUtils.Operation.DELETE && * @throws HiveException */ private FSPaths createNewPaths(String dpDir, String lbDir) throws HiveException { - FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable(), conf.isDirectInsert(), conf.getInsertOverwrite()); + FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable(), conf.isDirectInsert(), conf.getInsertOverwrite(), + conf.getAcidOperation()); fsp2.subdirAfterTxn = combinePathFragments(lbDir, unionPath); fsp2.subdirBeforeTxn = dpDir; String pathKey = combinePathFragments(dpDir, lbDir); @@ -1410,8 +1414,9 @@ public void closeOp(boolean abort) throws HiveException { } } if (conf.isMmTable() || conf.isDirectInsert()) { - Utilities.writeCommitManifest(commitPaths, specPath, fs, originalTaskId, conf.getTableWriteId(), conf - .getStatementId(), unionPath, conf.getInsertOverwrite(), bDynParts, dynamicPartitionSpecs); + boolean isDelete = AcidUtils.Operation.DELETE.equals(conf.getAcidOperation()); + Utilities.writeCommitManifest(commitPaths, specPath, fs, originalTaskId, conf.getTableWriteId(), + conf.getStatementId(), unionPath, conf.getInsertOverwrite(), bDynParts, dynamicPartitionSpecs, isDelete); } // Only publish stats if this operator's flag was set to gather stats if (conf.isGatherStats()) { @@ -1473,9 +1478,10 @@ public void jobCloseOp(Configuration hconf, boolean success) : (dpCtx != null ? 
dpCtx.getNumBuckets() : 0); MissingBucketsContext mbc = new MissingBucketsContext( conf.getTableInfo(), numBuckets, conf.getCompressed()); + boolean isDelete = AcidUtils.Operation.DELETE.equals(conf.getAcidOperation()); Utilities.handleDirectInsertTableFinalPath(specPath, unionSuffix, hconf, success, dpLevels, lbLevels, mbc, - conf.getTableWriteId(), conf.getStatementId(), reporter, conf.isMmTable(), conf.isMmCtas(), conf - .getInsertOverwrite(), conf.isDirectInsert()); + conf.getTableWriteId(), conf.getStatementId(), reporter, conf.isMmTable(), conf.isMmCtas(), + conf.getInsertOverwrite(), conf.isDirectInsert(), isDelete); } } } catch (IOException e) { @@ -1689,4 +1695,11 @@ public void configureJobConf(JobConf job) { job.setBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, true); } } + + private String getAttemptIdFromTaskId(String taskId) { + if (!conf.isDirectInsert() || taskId == null || taskId.split("_").length < 2) { + return null; + } + return taskId.split("_")[1]; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 5d244ecf47..1d661413a6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -4239,10 +4239,10 @@ private static void tryDelete(FileSystem fs, Path path) { public static Path[] getDirectInsertDirectoryCandidates(FileSystem fs, Path path, int dpLevels, PathFilter filter, long writeId, int stmtId, Configuration conf, - Boolean isBaseDir) throws IOException { + Boolean isBaseDir, boolean isDelete) throws IOException { int skipLevels = dpLevels; if (filter == null) { - filter = new AcidUtils.IdPathFilter(writeId, stmtId); + filter = new AcidUtils.IdPathFilter(writeId, stmtId, isDelete); } if (skipLevels == 0) { return statusToPath(fs.listStatus(path, filter)); @@ -4255,7 +4255,8 @@ private static void tryDelete(FileSystem fs, Path path) { || (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs))) { return getDirectInsertDirectoryCandidatesRecursive(fs, path, skipLevels, filter); } - return getDirectInsertDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, writeId, stmtId, isBaseDir); + return getDirectInsertDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, writeId, stmtId, isBaseDir, + isDelete); } private static boolean isS3(FileSystem fs) { @@ -4332,7 +4333,7 @@ private static boolean isS3(FileSystem fs) { } private static Path[] getDirectInsertDirectoryCandidatesGlobStatus(FileSystem fs, Path path, int skipLevels, - PathFilter filter, long writeId, int stmtId, boolean isBaseDir) throws IOException { + PathFilter filter, long writeId, int stmtId, boolean isBaseDir, boolean isDelete) throws IOException { StringBuilder sb = new StringBuilder(path.toUri().getPath()); for (int i = 0; i < skipLevels; i++) { sb.append(Path.SEPARATOR).append('*'); @@ -4342,7 +4343,11 @@ private static boolean isS3(FileSystem fs) { // sb.append(Path.SEPARATOR).append(AcidUtils.deltaSubdir(writeId, writeId)).append("_*"); throw new AssertionError("GlobStatus should not be called without a statement ID"); } else { - sb.append(Path.SEPARATOR).append(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId)); + String deltaSubDir = AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId); + if (isDelete) { + deltaSubDir = AcidUtils.deleteDeltaSubdir(writeId, writeId, stmtId); + } + sb.append(Path.SEPARATOR).append(deltaSubDir); } Path pathPattern = new Path(path, sb.toString()); 
return statusToPath(fs.globStatus(pathPattern, filter)); @@ -4350,9 +4355,9 @@ private static boolean isS3(FileSystem fs) { private static void tryDeleteAllDirectInsertFiles(FileSystem fs, Path specPath, Path manifestDir, int dpLevels, int lbLevels, AcidUtils.IdPathFilter filter, long writeId, int stmtId, - Configuration conf) throws IOException { + Configuration conf, boolean isDelete) throws IOException { Path[] files = getDirectInsertDirectoryCandidates( - fs, specPath, dpLevels, filter, writeId, stmtId, conf, null); + fs, specPath, dpLevels, filter, writeId, stmtId, conf, null, isDelete); if (files != null) { for (Path path : files) { Utilities.FILE_OP_LOGGER.info("Deleting {} on failure", path); @@ -4366,7 +4371,7 @@ private static void tryDeleteAllDirectInsertFiles(FileSystem fs, Path specPath, public static void writeCommitManifest(List commitPaths, Path specPath, FileSystem fs, String taskId, Long writeId, int stmtId, String unionSuffix, boolean isInsertOverwrite, - boolean hasDynamicPartitions, Set dynamicPartitionSpecs) throws HiveException { + boolean hasDynamicPartitions, Set dynamicPartitionSpecs, boolean isDelete) throws HiveException { // When doing a multi-statement insert overwrite with dynamic partitioning, // the partition information will be written to the manifest file. @@ -4382,7 +4387,7 @@ public static void writeCommitManifest(List commitPaths, Path specPath, Fi return; } // We assume one FSOP per task (per specPath), so we create it in specPath. - Path manifestPath = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite); + Path manifestPath = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite, isDelete); manifestPath = new Path(manifestPath, taskId + MANIFEST_EXTENSION); Utilities.FILE_OP_LOGGER.info("Writing manifest to {} with {}", manifestPath, commitPaths); try { @@ -4409,10 +4414,13 @@ public static void writeCommitManifest(List commitPaths, Path specPath, Fi } } - private static Path getManifestDir( - Path specPath, long writeId, int stmtId, String unionSuffix, boolean isInsertOverwrite) { - Path manifestPath = new Path(specPath, "_tmp." + - AcidUtils.baseOrDeltaSubdir(isInsertOverwrite, writeId, writeId, stmtId)); + private static Path getManifestDir(Path specPath, long writeId, int stmtId, String unionSuffix, + boolean isInsertOverwrite, boolean isDelete) { + String deltaDir = AcidUtils.baseOrDeltaSubdir(isInsertOverwrite, writeId, writeId, stmtId); + if (isDelete) { + deltaDir = AcidUtils.deleteDeltaSubdir(writeId, writeId, stmtId); + } + Path manifestPath = new Path(specPath, "_tmp." + deltaDir); if (isInsertOverwrite) { // When doing a multi-statement insert overwrite query with dynamic partitioning, the // generated manifest directory is the same for each FileSinkOperator. 
@@ -4436,14 +4444,14 @@ public MissingBucketsContext(TableDesc tableInfo, int numBuckets, boolean isComp public static void handleDirectInsertTableFinalPath(Path specPath, String unionSuffix, Configuration hconf, boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long writeId, int stmtId, - Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite, boolean isDirectInsert) - throws IOException, HiveException { + Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite, boolean isDirectInsert, + boolean isDelete) throws IOException, HiveException { FileSystem fs = specPath.getFileSystem(hconf); - Path manifestDir = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite); + Path manifestDir = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite, isDelete); if (!success) { - AcidUtils.IdPathFilter filter = new AcidUtils.IdPathFilter(writeId, stmtId); + AcidUtils.IdPathFilter filter = new AcidUtils.IdPathFilter(writeId, stmtId, isDelete); tryDeleteAllDirectInsertFiles(fs, specPath, manifestDir, dpLevels, lbLevels, - filter, writeId, stmtId, hconf); + filter, writeId, stmtId, hconf, isDelete); return; } @@ -4492,7 +4500,8 @@ public static void handleDirectInsertTableFinalPath(Path specPath, String unionS } Utilities.FILE_OP_LOGGER.debug("Looking for files in: {}", specPath); - AcidUtils.IdPathFilter filter = new AcidUtils.IdPathFilter(writeId, stmtId, dynamicPartitionSpecs, dpLevels); + AcidUtils.IdPathFilter filter = + new AcidUtils.IdPathFilter(writeId, stmtId, dynamicPartitionSpecs, dpLevels, isDelete); if (!fs.exists(specPath)) { Utilities.FILE_OP_LOGGER.info("Creating directory with no output at {}", specPath); FileUtils.mkdir(fs, specPath, hconf); @@ -4501,7 +4510,7 @@ public static void handleDirectInsertTableFinalPath(Path specPath, String unionS Path[] files = null; if (!isInsertOverwrite || dpLevels == 0 || !dynamicPartitionSpecs.isEmpty()) { files = getDirectInsertDirectoryCandidates( - fs, specPath, dpLevels, filter, writeId, stmtId, hconf, isInsertOverwrite); + fs, specPath, dpLevels, filter, writeId, stmtId, hconf, isInsertOverwrite, isDelete); } ArrayList<Path> directInsertDirectories = new ArrayList<>(); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java index 034d2d3320..11d6af87f1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -33,6 +34,7 @@ import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -113,19 +115,25 @@ private long minWriteId; private long maxWriteId; private List<Integer> stmtIds; + private Map<Integer, Map<Integer, String>> statementAndBucketToAttemptId; /** * {@link AcidUtils#?} */ private long visibilityTxnId; public DeltaMetaData() { - this(0,0,new ArrayList<Integer>(), 0); + this(0, 0, new ArrayList<Integer>(), 0, new HashMap<Integer, Map<Integer, String>>()); + } + + public DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId) { + this(minWriteId, maxWriteId, stmtIds, visibilityTxnId, new HashMap<Integer, Map<Integer, String>>()); + } /** * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition * @param visibilityTxnId maybe 0, if the dir name didn't have it. txnid:0 is always visible */ - DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId) { + DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId, + Map<Integer, Map<Integer, String>> statementAndBucketToAttemptId) { this.minWriteId = minWriteId; this.maxWriteId = maxWriteId; if (stmtIds == null) { @@ -133,6 +141,7 @@ public DeltaMetaData() { } this.stmtIds = stmtIds; this.visibilityTxnId = visibilityTxnId; + this.statementAndBucketToAttemptId = statementAndBucketToAttemptId; } long getMinWriteId() { return minWriteId; @@ -146,6 +155,21 @@ long getMaxWriteId() { long getVisibilityTxnId() { return visibilityTxnId; } + + public String getAttemptId(int statementId, int bucketId) { + if (statementAndBucketToAttemptId == null) { + return null; + } + Map<Integer, String> bucketToAttempId = + statementAndBucketToAttemptId.getOrDefault(statementId, new HashMap<Integer, String>()); + return bucketToAttempId.get(bucketId); + } + + public void mergeDeltaMetaData(ParsedDelta parsedDelta) { + this.stmtIds.add(parsedDelta.getStatementId()); + this.statementAndBucketToAttemptId.put(parsedDelta.getStatementId(), parsedDelta.getBucketIdToAttemptId()); + } + @Override public void write(DataOutput out) throws IOException { out.writeLong(minWriteId); @@ -155,6 +179,17 @@ public void write(DataOutput out) throws IOException { out.writeInt(id); } out.writeLong(visibilityTxnId); + out.writeInt(statementAndBucketToAttemptId == null ? 0 : statementAndBucketToAttemptId.size()); + for (Map.Entry<Integer, Map<Integer, String>> statementAndBucketToAttemptIdEntry : statementAndBucketToAttemptId + .entrySet()) { + out.writeInt(statementAndBucketToAttemptIdEntry.getKey()); + Map<Integer, String> bucketToAttemptId = statementAndBucketToAttemptIdEntry.getValue(); + out.writeInt(bucketToAttemptId == null ? 0 : bucketToAttemptId.size()); + for (Map.Entry<Integer, String> bucketToAttemptIdEntry : bucketToAttemptId.entrySet()) { + String output = bucketToAttemptIdEntry.getKey() + "=" + bucketToAttemptIdEntry.getValue(); + out.writeUTF(output); + } + } } @Override public void readFields(DataInput in) throws IOException { @@ -166,6 +201,22 @@ public void readFields(DataInput in) throws IOException { stmtIds.add(in.readInt()); } visibilityTxnId = in.readLong(); + int statementAndBucketToAttemptIdSize = in.readInt(); + if (statementAndBucketToAttemptId == null) { + statementAndBucketToAttemptId = new HashMap<Integer, Map<Integer, String>>(); + } + for (int i = 0; i < statementAndBucketToAttemptIdSize; i++) { + int statementId = in.readInt(); + int bucketToAttemptIdSize = in.readInt(); + Map<Integer, String> bucketToAttemptId = + statementAndBucketToAttemptId.getOrDefault(statementId, new HashMap<Integer, String>()); + for (int j = 0; j < bucketToAttemptIdSize; j++) { + String bucketIdToAttemptIdEntry = in.readUTF(); + String[] bucketIdToAttemptIdSplit = bucketIdToAttemptIdEntry.split("="); + bucketToAttemptId.put(Integer.parseInt(bucketIdToAttemptIdSplit[0]), bucketIdToAttemptIdSplit[1]); + } + statementAndBucketToAttemptId.put(statementId, bucketToAttemptId); + } } String getName() { assert stmtIds.isEmpty() : "use getName(int)"; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 66404ab9b8..37f30fc8f4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -549,6 +549,19 @@ else if (filename.startsWith(BUCKET_PREFIX)) { return result; } + public static Map<String, String> getDeltaToAttemptIdMap( + Map<String, AcidInputFormat.DeltaMetaData> pathToDeltaMetaData, Path[] deleteDeltaDirs, int bucket) { + Map<String, String> deltaToAttemptId = new HashMap<>(); + for (Path delta : deleteDeltaDirs) { + AcidInputFormat.DeltaMetaData deltaMetaData = pathToDeltaMetaData.get(delta.getName()); + String[] deltaParts = delta.getName().split("_"); + int statementId = Integer.parseInt(deltaParts[deltaParts.length - 1]); + String attemptId = deltaMetaData.getAttemptId(statementId, bucket); + deltaToAttemptId.put(delta.getName(), attemptId); + } + return deltaToAttemptId; + } + public static final class DirectoryImpl implements Directory { private final List<Path> abortedDirectories; private final boolean isBaseInRawFormat; @@ -957,9 +970,15 @@ private ParsedDelta(long min, long max, Path path, boolean isDeleteDelta, boolean isRawFormat, long visibilityTxnId) { this(min, max, path, -1, isDeleteDelta, isRawFormat, visibilityTxnId); } - private ParsedDelta(long min, long max, Path path, int statementId, - boolean isDeleteDelta, boolean isRawFormat, long visibilityTxnId) { - super(min, max, path, statementId, isDeleteDelta, visibilityTxnId); + + private ParsedDelta(long min, long max, Path path, int statementId, boolean isDeleteDelta, boolean isRawFormat, + long visibilityTxnId) { + this(min, max, path, statementId, isDeleteDelta, isRawFormat, visibilityTxnId, new HashMap<Integer, String>()); + } + + private ParsedDelta(long min, long max, Path path, int statementId, boolean isDeleteDelta, boolean isRawFormat, + long visibilityTxnId, Map<Integer, String> bucketIdToAttemptId) { + super(min, max, path, statementId, isDeleteDelta, visibilityTxnId, bucketIdToAttemptId); this.isRawFormat = isRawFormat; } /** @@ -982,6 +1001,7 @@ public boolean isRawFormat() { //had no statement ID final int statementId; final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
+ final Map bucketIdToAttemptId; /** * transaction Id of txn which created this delta. This dir should be considered * invisible unless this txn is committed @@ -1000,12 +1020,18 @@ public static ParsedDeltaLight parse(Path deltaDir) { private ParsedDeltaLight(long min, long max, Path path, int statementId, boolean isDeleteDelta, long visibilityTxnId) { + this(min, max, path, statementId, isDeleteDelta, visibilityTxnId, new HashMap()); + } + + private ParsedDeltaLight(long min, long max, Path path, int statementId, boolean isDeleteDelta, + long visibilityTxnId, Map bucketIdToAttemptId) { this.minWriteId = min; this.maxWriteId = max; this.path = path; this.statementId = statementId; this.isDeleteDelta = isDeleteDelta; this.visibilityTxnId = visibilityTxnId; + this.bucketIdToAttemptId = bucketIdToAttemptId; } public long getMinWriteId() { @@ -1030,6 +1056,11 @@ public boolean isDeleteDelta() { public long getVisibilityTxnId() { return visibilityTxnId; } + + public Map getBucketIdToAttemptId() { + return bucketIdToAttemptId; + } + /** * Only un-compacted delta_x_y (x != y) (created by streaming ingest with batch size > 1) * may contain a {@link OrcAcidUtils#getSideFile(Path)}. @@ -1108,14 +1139,14 @@ else if(statementId != parsedDelta.statementId) { if ((last != null) && (last.getMinWriteId() == parsedDelta.getMinWriteId()) && (last.getMaxWriteId() == parsedDelta.getMaxWriteId())) { - last.getStmtIds().add(parsedDelta.getStatementId()); + last.mergeDeltaMetaData(parsedDelta); continue; } - last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinWriteId(), - parsedDelta.getMaxWriteId(), new ArrayList<>(), parsedDelta.getVisibilityTxnId()); + last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinWriteId(), parsedDelta.getMaxWriteId(), + new ArrayList<>(), parsedDelta.getVisibilityTxnId()); result.add(last); if (parsedDelta.statementId >= 0) { - last.getStmtIds().add(parsedDelta.getStatementId()); + last.mergeDeltaMetaData(parsedDelta); } } return result; @@ -1130,15 +1161,24 @@ else if(statementId != parsedDelta.statementId) { * @param deleteDeltas list of begin/end write id pairs * @return the list of delta paths */ - public static Path[] deserializeDeleteDeltas(Path root, final List deleteDeltas) throws IOException { + public static Path[] deserializeDeleteDeltas(Path root, final List deleteDeltas, + Map pathToDeltaMetaData) throws IOException { List results = new ArrayList<>(deleteDeltas.size()); for(AcidInputFormat.DeltaMetaData dmd : deleteDeltas) { if(dmd.getStmtIds().isEmpty()) { - results.add(new Path(root, dmd.getName())); + Path delta = new Path(root, dmd.getName()); + results.add(delta); + if (pathToDeltaMetaData != null) { + pathToDeltaMetaData.put(delta.getName(), dmd); + } continue; } for(Integer stmtId : dmd.getStmtIds()) { - results.add(new Path(root, dmd.getName(stmtId))); + Path delta = new Path(root, dmd.getName(stmtId)); + results.add(delta); + if (pathToDeltaMetaData != null) { + pathToDeltaMetaData.put(delta.getName(), dmd); + } } } return results.toArray(new Path[results.size()]); @@ -1161,8 +1201,32 @@ private static ParsedDelta parseDelta(Path path, String deltaPrefix, FileSystem throws IOException { ParsedDelta p = parsedDelta(path, deltaPrefix, fs, dirSnapshot); boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX); - return new ParsedDelta(p.getMinWriteId(), - p.getMaxWriteId(), path, p.statementId, isDeleteDelta, p.isRawFormat(), p.visibilityTxnId); + Map bucketIdToAttemptId = new HashMap(); + if (isDeleteDelta && dirSnapshot != null) { + for 
(FileStatus fileStatus: dirSnapshot.getFiles()) { + int bucket = parseBucketId(fileStatus.getPath()); + String attemptId = parseAttemptId(fileStatus.getPath()); + bucketIdToAttemptId.put(bucket, attemptId); + } + } + return new ParsedDelta(p.getMinWriteId(), p.getMaxWriteId(), path, p.statementId, isDeleteDelta, p.isRawFormat(), + p.visibilityTxnId, bucketIdToAttemptId); + } + + private static ParsedDelta parseDelta(Path path, String deltaPrefix, FileSystem fs, FileStatus[] files) + throws IOException { + ParsedDelta p = parsedDelta(path, deltaPrefix, fs, null); + boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX); + Map bucketIdToAttemptId = new HashMap(); + if (isDeleteDelta && files != null) { + for (FileStatus fileStatus : files) { + int bucket = parseBucketId(fileStatus.getPath()); + String attemptId = parseAttemptId(fileStatus.getPath()); + bucketIdToAttemptId.put(bucket, attemptId); + } + } + return new ParsedDelta(p.getMinWriteId(), p.getMaxWriteId(), path, p.statementId, isDeleteDelta, p.isRawFormat(), + p.visibilityTxnId, bucketIdToAttemptId); } public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs, HdfsDirSnapshot dirSnapshot) @@ -1764,7 +1828,14 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi } } else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) { String deltaPrefix = fn.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX; - ParsedDelta delta = parseDelta(child.getPath(), deltaPrefix, fs, null); + FileStatus[] files = null; + if (fn.startsWith(DELETE_DELTA_PREFIX)) { + PathFilter f = (Path path) -> { + return !p.getName().equals(AcidUtils.OrcAcidVersion.ACID_FORMAT); + }; + files = fs.listStatus(child.getPath(), f); + } + ParsedDelta delta = parseDelta(child.getPath(), deltaPrefix, fs, files); if(!isDirUsable(child.getPath(), delta.getVisibilityTxnId(), aborted, validTxnList)) { return; } @@ -2808,14 +2879,17 @@ public boolean accept(Path path) { private final Set dpSpecs; private final int dpLevel; - public IdPathFilter(long writeId, int stmtId) { - this(writeId, stmtId, null, 0); + public IdPathFilter(long writeId, int stmtId, boolean isDelete) { + this(writeId, stmtId, null, 0, isDelete); } - public IdPathFilter(long writeId, int stmtId, Set dpSpecs, int dpLevel) { - String deltaDirName = null; - deltaDirName = DELTA_PREFIX + String.format(DELTA_DIGITS, writeId) + "_" + - String.format(DELTA_DIGITS, writeId); + public IdPathFilter(long writeId, int stmtId, Set dpSpecs, int dpLevel, boolean isDelete) { + String prefix = DELTA_PREFIX; + if (isDelete) { + prefix = DELETE_DELTA_PREFIX ; + } + String deltaDirName = prefix + String.format(DELTA_DIGITS, writeId) + "_" + + String.format(DELTA_DIGITS, writeId); isDeltaPrefix = (stmtId < 0); if (!isDeltaPrefix) { deltaDirName += "_" + String.format(STATEMENT_DIGITS, stmtId); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 1059cb227f..6f138eb6b5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -2127,7 +2127,8 @@ public float getProgress() throws IOException { throw new IllegalStateException("Expected SpliUpdate table: " + split.getPath()); } - final Path[] deltas = VectorizedOrcAcidRowBatchReader.getDeleteDeltaDirsFromSplit(split); + Map pathToDeltaMetaData = new HashMap<>(); + final Path[] deltas = 
VectorizedOrcAcidRowBatchReader.getDeleteDeltaDirsFromSplit(split, pathToDeltaMetaData); final Configuration conf = options.getConfiguration(); final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split); @@ -2161,9 +2162,10 @@ public float getProgress() throws IOException { + " isTransactionalTable: " + HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN)); LOG.debug("Creating merger for {} and {}", split.getPath(), Arrays.toString(deltas)); } - final OrcRawRecordMerger records = - new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket, - validWriteIdList, readOptions, deltas, mergerOptions); + + Map deltaToAttemptId = AcidUtils.getDeltaToAttemptIdMap(pathToDeltaMetaData, deltas, bucket); + final OrcRawRecordMerger records = new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket, + validWriteIdList, readOptions, deltas, mergerOptions, deltaToAttemptId); return new RowReader() { OrcStruct innerRecord = records.createValue(); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index f543418179..0ce50587df 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -1079,7 +1079,11 @@ public Options clone() { assert mergerOptions.getBaseDir() != null : "no baseDir?: " + mergerOptions.getRootPath(); //we are compacting and it's acid schema so create a reader for the 1st bucket file that is not empty FileSystem fs = mergerOptions.getBaseDir().getFileSystem(conf); - Path bucketPath = AcidUtils.createBucketFile(mergerOptions.getBaseDir(), bucket); + String attemptId = null; + if (deltasToAttemptId != null) { + attemptId = deltasToAttemptId.get(mergerOptions.getBaseDir().getName()); + } + Path bucketPath = AcidUtils.createBucketFile(mergerOptions.getBaseDir(), bucket, attemptId); if(fs.exists(bucketPath) && fs.getFileStatus(bucketPath).getLen() > 0) { //doing major compaction - it's possible where full compliment of bucket files is not //required (on Tez) that base_x/ doesn't have a file for 'bucket' @@ -1144,7 +1148,7 @@ public Options clone() { String attemptId = null; if (deltasToAttemptId != null) { - attemptId = deltasToAttemptId.get(delta.toString()); + attemptId = deltasToAttemptId.get(delta.getName()); } for (Path deltaFile : getDeltaFiles(delta, bucket, mergerOptions, attemptId)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 4b339a6c58..f92d202f05 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -814,6 +814,9 @@ private int setBucket(int bucketProperty, int operation) { @Override public Path getUpdatedFilePath() { + if (operation.get() == DELETE_OPERATION) { + return deleteEventPath; + } return path; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 598220b0c4..6b8e210f5a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import 
org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.BucketCodec; @@ -60,7 +61,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; /** @@ -396,7 +399,7 @@ public void setBaseAndInnerReader( private OrcRawRecordMerger.KeyInterval findMinMaxKeys( OrcSplit orcSplit, Configuration conf, Reader.Options deleteEventReaderOptions) throws IOException { - final boolean noDeleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit).length == 0; + final boolean noDeleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit, null).length == 0; if(!HiveConf.getBoolVar(conf, ConfVars.FILTER_DELETE_EVENTS) || noDeleteDeltas) { LOG.debug("findMinMaxKeys() " + ConfVars.FILTER_DELETE_EVENTS + "=false"); return new OrcRawRecordMerger.KeyInterval(null, null); @@ -784,7 +787,9 @@ private static boolean areRowIdsProjected(VectorizedRowBatchCtx rbCtx) { } return false; } - static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException { + + static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit, + Map pathToDeltaMetaData) throws IOException { Path path = orcSplit.getPath(); Path root; if (orcSplit.hasBase()) { @@ -798,7 +803,7 @@ private static boolean areRowIdsProjected(VectorizedRowBatchCtx rbCtx) { } else { throw new IllegalStateException("Split w/o base w/Acid 2.0??: " + path); } - return AcidUtils.deserializeDeleteDeltas(root, orcSplit.getDeltas()); + return AcidUtils.deserializeDeleteDeltas(root, orcSplit.getDeltas(), pathToDeltaMetaData); } /** @@ -1123,18 +1128,19 @@ DeleteEventRegistry getDeleteEventRegistry() { SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions) throws IOException { - final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit); + Map pathToDeltaMetaData = new HashMap<>(); + final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit, pathToDeltaMetaData); if (deleteDeltas.length > 0) { int bucket = AcidUtils.parseBucketId(orcSplit.getPath()); String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); this.validWriteIdList = (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString); LOG.debug("Using SortMergedDeleteEventRegistry"); + Map deltaToAttemptId = AcidUtils.getDeltaToAttemptIdMap(pathToDeltaMetaData, deleteDeltas, bucket); OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isDeleteReader(true); assert !orcSplit.isOriginal() : "If this now supports Original splits, set up mergeOptions properly"; - this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket, - validWriteIdList, readerOptions, deleteDeltas, - mergerOptions, null); + this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket, validWriteIdList, readerOptions, + deleteDeltas, mergerOptions, deltaToAttemptId); this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey(); this.deleteRecordValue = this.deleteRecords.createValue(); // Initialize the first value in the delete reader. 
@@ -1559,13 +1565,15 @@ public int compareTo(CompressedOwid other) { this.orcSplit = orcSplit; try { - final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit); + Map pathToDeltaMetaData = new HashMap<>(); + final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit, pathToDeltaMetaData); if (deleteDeltaDirs.length > 0) { + Map deltaToAttemptId = AcidUtils.getDeltaToAttemptIdMap(pathToDeltaMetaData, deleteDeltaDirs, bucket); int totalDeleteEventCount = 0; for (Path deleteDeltaDir : deleteDeltaDirs) { FileSystem fs = deleteDeltaDir.getFileSystem(conf); Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket, - new OrcRawRecordMerger.Options().isCompacting(false), null); + new OrcRawRecordMerger.Options().isCompacting(false), deltaToAttemptId.get(deleteDeltaDir.getName())); for (Path deleteDeltaFile : deleteDeltaFiles) { // NOTE: Calling last flush length below is more for future-proofing when we have // streaming deletes. But currently we don't support streaming deletes, and this can diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index f6a5207ddb..4728ffc0bc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2797,7 +2797,7 @@ private void constructOneLBLocationMap(FileStatus fSta, Utilities.FILE_OP_LOGGER.trace( "Looking for dynamic partitions in {} ({} levels)", loadPath, numDP); Path[] leafStatus = Utilities.getDirectInsertDirectoryCandidates( - fs, loadPath, numDP, null, writeId, -1, conf, isInsertOverwrite); + fs, loadPath, numDP, null, writeId, -1, conf, isInsertOverwrite, false); for (Path p : leafStatus) { Path dpPath = p.getParent(); // Skip the MM directory that we have found. if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 1ea3bd357f..992521116b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1993,7 +1993,7 @@ public static Path createMoveTask(Task currTask, boolean chDir, * 1. MM Tables * 2. 
INSERT operation on full ACID table */ - if ((!isMmTable) && (!isDirectInsert)) { + if (!isMmTable && !isDirectInsert) { // generate the temporary file // it must be on the same file system as the current destination Context baseCtx = parseCtx.getContext(); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index aa8d84ec9c..6a9b40b99b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7191,6 +7191,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) boolean destTableIsTransactional; // true for full ACID table and MM table boolean destTableIsFullAcid; // should the destination table be written to using ACID boolean isDirectInsert = false; // should we add files directly to the final path + AcidUtils.Operation acidOperation = null; boolean destTableIsTemporary = false; boolean destTableIsMaterialization = false; Partition destinationPartition = null;// destination partition if any @@ -7260,6 +7261,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) } } isDirectInsert = isDirectInsert(destTableIsFullAcid, acidOp); + acidOperation = acidOp; queryTmpdir = getTmpDir(isNonNativeTable, isMmTable, isDirectInsert, destinationPath); if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_TABLE specifying " + queryTmpdir @@ -7409,6 +7411,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) } } isDirectInsert = isDirectInsert(destTableIsFullAcid, acidOp); + acidOperation = acidOp; queryTmpdir = getTmpDir(isNonNativeTable, isMmTable, isDirectInsert, destinationPath); if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_PARTITION specifying " @@ -7795,7 +7798,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, tableDescriptor, destinationPartition, destinationPath, currentTableId, destTableIsFullAcid, destTableIsTemporary,//this was 1/4 acid destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS, - canBeMerged, destinationTable, writeId, isMmCreate, destType, qb, isDirectInsert); + canBeMerged, destinationTable, writeId, isMmCreate, destType, qb, isDirectInsert, acidOperation); if (isMmCreate) { // Add FSD so that the LoadTask compilation could fix up its path to avoid the move. 
if (tableDesc != null) { @@ -7853,7 +7856,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) private boolean isDirectInsert(boolean destTableIsFullAcid, AcidUtils.Operation acidOp) { boolean directInsertEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_ACID_DIRECT_INSERT_ENABLED); - boolean directInsert = directInsertEnabled && destTableIsFullAcid && (acidOp == AcidUtils.Operation.INSERT); + boolean directInsert = directInsertEnabled && destTableIsFullAcid && acidOp != AcidUtils.Operation.NOT_ACID; if (LOG.isDebugEnabled() && directInsert) { LOG.debug("Direct insert for ACID tables is enabled."); } @@ -8029,7 +8032,7 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc, boolean destTableIsMaterialization, Path queryTmpdir, SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx, RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas, - Integer dest_type, QB qb, boolean isDirectInsert) throws SemanticException { + Integer dest_type, QB qb, boolean isDirectInsert, AcidUtils.Operation acidOperation) throws SemanticException { boolean isInsertOverwrite = false; switch (dest_type) { case QBMetaData.DEST_PARTITION: @@ -8054,7 +8057,7 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc, conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(), canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx, dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery(), - qb.isCTAS() || qb.isMaterializedView(), isDirectInsert); + qb.isCTAS() || qb.isMaterializedView(), isDirectInsert, acidOperation); boolean isHiveServerQuery = SessionState.get().isHiveServerQuery(); fileSinkDesc.setHiveServerQuery(isHiveServerQuery); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 619f68e22d..3e993b2cd4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -114,6 +114,8 @@ private boolean isDirectInsert = false; + private AcidUtils.Operation acidOperation = null; + private boolean isQuery = false; private boolean isCTASorCM = false; @@ -127,7 +129,8 @@ public FileSinkDesc() { public FileSinkDesc(final Path dirName, final TableDesc tableInfo, final boolean compressed, final int destTableId, final boolean multiFileSpray, final boolean canBeMerged, final int numFiles, final int totalFiles, final List partitionCols, final DynamicPartitionCtx dpCtx, Path destPath, Long mmWriteId, - boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery, boolean isCTASorCM, boolean isDirectInsert) { + boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery, boolean isCTASorCM, boolean isDirectInsert, + AcidUtils.Operation acidOperation) { this.dirName = dirName; setTableInfo(tableInfo); this.compressed = compressed; @@ -146,6 +149,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, final boolean this.isQuery = isQuery; this.isCTASorCM = isCTASorCM; this.isDirectInsert = isDirectInsert; + this.acidOperation = acidOperation; } public FileSinkDesc(final Path dirName, final TableDesc tableInfo, @@ -167,7 +171,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, public Object clone() throws CloneNotSupportedException { FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed, destTableId, multiFileSpray, canBeMerged, numFiles, 
totalFiles, partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery, - isCTASorCM, isDirectInsert); + isCTASorCM, isDirectInsert, acidOperation); ret.setCompressCodec(compressCodec); ret.setCompressType(compressType); ret.setGatherStats(gatherStats); @@ -186,6 +190,7 @@ public Object clone() throws CloneNotSupportedException { ret.setIsQuery(isQuery); ret.setIsCTASorCM(isCTASorCM); ret.setIsDirectInsert(isDirectInsert); + ret.setAcidOperation(acidOperation); return ret; } @@ -233,6 +238,14 @@ public boolean isDirectInsert() { return this.isDirectInsert; } + public void setAcidOperation(AcidUtils.Operation acidOperation) { + this.acidOperation = acidOperation; + } + + public AcidUtils.Operation getAcidOperation() { + return acidOperation; + } + @Explain(displayName = "directory", explainLevels = { Level.EXTENDED }) public Path getDirName() { return dirName; diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index a62b3cc850..3ecdce0f5c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -85,6 +85,7 @@ public LoadTableDesc(final LoadTableDesc o) { this.currentWriteId = o.currentWriteId; this.table = o.table; this.partitionSpec = o.partitionSpec; + this.isDirectInsert = o.isDirectInsert; } public LoadTableDesc(final Path sourcePath, diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index f26d4bb2c3..dae95d7bfd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -467,12 +467,19 @@ public void write(DataOutput dataOutput) throws IOException { } else { dataOutput.writeInt(base.toString().length()); dataOutput.writeBytes(base.toString()); + String attemptId = deltasToAttemptId.get(base.getName()); + if (attemptId == null) { + dataOutput.writeInt(0); + } else { + dataOutput.writeInt(attemptId.length()); + dataOutput.writeBytes(attemptId.toString()); + } } dataOutput.writeInt(deltas.length); for (int i = 0; i < deltas.length; i++) { dataOutput.writeInt(deltas[i].toString().length()); dataOutput.writeBytes(deltas[i].toString()); - String attemptId = deltasToAttemptId.get(deltas[i].toString()); + String attemptId = deltasToAttemptId.get(deltas[i].getName()); if (attemptId == null) { dataOutput.writeInt(0); } else { @@ -504,14 +511,21 @@ public void readFields(DataInput dataInput) throws IOException { LOG.debug("Read bucket number of " + bucketNum); len = dataInput.readInt(); LOG.debug("Read base path length of " + len); + String baseAttemptId = null; if (len > 0) { buf = new byte[len]; dataInput.readFully(buf); base = new Path(new String(buf)); + len = dataInput.readInt(); + if (len > 0) { + buf = new byte[len]; + dataInput.readFully(buf); + baseAttemptId = new String(buf); + } } numElements = dataInput.readInt(); - deltas = new Path[numElements]; deltasToAttemptId = new HashMap<>(); + deltas = new Path[numElements]; for (int i = 0; i < numElements; i++) { len = dataInput.readInt(); buf = new byte[len]; @@ -524,7 +538,10 @@ public void readFields(DataInput dataInput) throws IOException { dataInput.readFully(buf); attemptId = new String(buf); } - deltasToAttemptId.put(deltas[i].toString(), attemptId); + deltasToAttemptId.put(deltas[i].getName(), attemptId); + if (baseAttemptId != null) { + 
deltasToAttemptId.put(base.getName(), baseAttemptId); + } } } @@ -683,7 +700,7 @@ private void addFileToMap(Matcher matcher, Path file, boolean sawBase, LOG.debug("Adding " + file.toString() + " to list of files for splits"); bt.buckets.add(file); bt.sawBase |= sawBase; - bt.deltasToAttemptId.put(file.getParent().toString(), attemptId); + bt.deltasToAttemptId.put(file.getParent().getName(), attemptId); } private static class BucketTracker { diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java index ebb51c447b..4689398910 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -286,7 +286,7 @@ private FileSinkOperator getFileSink(AcidUtils.Operation writeType, DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap, "Sunday", 100); //todo: does this need the finalDestination? desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, - false, 1, 1, partCols, dpCtx, null, null, false, false, false, false, false); + false, 1, 1, partCols, dpCtx, null, null, false, false, false, false, false, writeType); } else { desc = new FileSinkDesc(basePath, tableDesc, false); } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidInputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidInputFormat.java index dc281d2c59..1974b4492f 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidInputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidInputFormat.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.io; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -39,16 +40,16 @@ @Mock private DataInput mockDataInput; - +//TODO: write a test here
@Test public void testDeltaMetaDataReadFieldsNoStatementIds() throws Exception { when(mockDataInput.readLong()).thenReturn(1L, 2L); - when(mockDataInput.readInt()).thenReturn(0); + when(mockDataInput.readInt()).thenReturn(0, 0); DeltaMetaData deltaMetaData = new AcidInputFormat.DeltaMetaData(); deltaMetaData.readFields(mockDataInput); - verify(mockDataInput, times(1)).readInt(); + verify(mockDataInput, times(2)).readInt(); assertThat(deltaMetaData.getMinWriteId(), is(1L)); assertThat(deltaMetaData.getMaxWriteId(), is(2L)); assertThat(deltaMetaData.getStmtIds().isEmpty(), is(true)); @@ -57,12 +58,12 @@ public void testDeltaMetaDataReadFieldsNoStatementIds() throws Exception { @Test public void testDeltaMetaDataReadFieldsWithStatementIds() throws Exception { when(mockDataInput.readLong()).thenReturn(1L, 2L); - when(mockDataInput.readInt()).thenReturn(2, 100, 101); + when(mockDataInput.readInt()).thenReturn(2, 100, 101, 0); DeltaMetaData deltaMetaData = new AcidInputFormat.DeltaMetaData(); deltaMetaData.readFields(mockDataInput); - verify(mockDataInput, times(3)).readInt(); + verify(mockDataInput, times(4)).readInt(); assertThat(deltaMetaData.getMinWriteId(), is(1L)); assertThat(deltaMetaData.getMaxWriteId(), is(2L)); assertThat(deltaMetaData.getStmtIds().size(), is(2)); @@ -73,7 +74,7 @@ public void testDeltaMetaDataReadFieldsWithStatementIds() throws Exception { @Test public void testDeltaMetaConstructWithState() throws Exception { DeltaMetaData deltaMetaData = new AcidInputFormat - .DeltaMetaData(2000L, 2001L, Arrays.asList(97, 98, 99), 0); + .DeltaMetaData(2000L, 2001L, Arrays.asList(97, 98, 99), 0, null); assertThat(deltaMetaData.getMinWriteId(), is(2000L)); assertThat(deltaMetaData.getMaxWriteId(), is(2001L)); @@ -86,17 +87,17 @@ public void testDeltaMetaConstructWithState() throws Exception { @Test public void testDeltaMetaDataReadFieldsWithStatementIdsResetsState() throws Exception { when(mockDataInput.readLong()).thenReturn(1L, 2L); - when(mockDataInput.readInt()).thenReturn(2, 100, 101); + when(mockDataInput.readInt()).thenReturn(2, 100, 101, 0); List statementIds = new ArrayList<>(); statementIds.add(97); statementIds.add(98); statementIds.add(99); DeltaMetaData deltaMetaData = new AcidInputFormat - .DeltaMetaData(2000L, 2001L, statementIds, 0); + .DeltaMetaData(2000L, 2001L, statementIds, 0, null); deltaMetaData.readFields(mockDataInput); - verify(mockDataInput, times(3)).readInt(); + verify(mockDataInput, times(4)).readInt(); assertThat(deltaMetaData.getMinWriteId(), is(1L)); assertThat(deltaMetaData.getMaxWriteId(), is(2L)); assertThat(deltaMetaData.getStmtIds().size(), is(2)); @@ -104,4 +105,72 @@ public void testDeltaMetaDataReadFieldsWithStatementIdsResetsState() throws Exce assertThat(deltaMetaData.getStmtIds().get(1), is(101)); } + @Test + public void testDeltaMetaDataReadFieldsWithBucketToAttemptIdOneStmt() throws Exception { + when(mockDataInput.readLong()).thenReturn(1L, 2L, 22L); + when(mockDataInput.readInt()).thenReturn(1, 0, 1, 0, 3); + when(mockDataInput.readUTF()).thenReturn("0=1", "1=0", "2=3"); + + List statementIds = new ArrayList<>(); + statementIds.add(1); + DeltaMetaData deltaMetaData = new AcidInputFormat + .DeltaMetaData(2000L, 2001L, statementIds, 0, null); + deltaMetaData.readFields(mockDataInput); + + assertThat(deltaMetaData.getMinWriteId(), is(1L)); + assertThat(deltaMetaData.getMaxWriteId(), is(2L)); + assertThat(deltaMetaData.getStmtIds().size(), is(1)); + assertThat(deltaMetaData.getStmtIds().get(0), is(0)); + 
assertThat(deltaMetaData.getAttemptId(0, 0), is("1")); + assertThat(deltaMetaData.getAttemptId(0, 1), is("0")); + assertThat(deltaMetaData.getAttemptId(0, 2), is("3")); + } + + @Test + public void testDeltaMetaDataReadFieldsWithBucketToAttemptIdMultipleStmt() throws Exception { + when(mockDataInput.readLong()).thenReturn(1L, 2L, 44L); + when(mockDataInput.readInt()).thenReturn(3, 31, 32, 33, 3, 31, 2, 32, 2, 33, 2); + when(mockDataInput.readUTF()).thenReturn("1=1", "2=2", "1=3", "2=4", "1=5", "2=6"); + + List statementIds = new ArrayList<>(); + statementIds.add(1); + DeltaMetaData deltaMetaData = new AcidInputFormat + .DeltaMetaData(2000L, 2001L, statementIds, 0, null); + deltaMetaData.readFields(mockDataInput); + + assertThat(deltaMetaData.getMinWriteId(), is(1L)); + assertThat(deltaMetaData.getMaxWriteId(), is(2L)); + assertThat(deltaMetaData.getStmtIds().size(), is(3)); + assertThat(deltaMetaData.getStmtIds().get(0), is(31)); + assertThat(deltaMetaData.getStmtIds().get(1), is(32)); + assertThat(deltaMetaData.getStmtIds().get(2), is(33)); + assertThat(deltaMetaData.getAttemptId(31, 1), is("1")); + assertThat(deltaMetaData.getAttemptId(31, 2), is("2")); + assertThat(deltaMetaData.getAttemptId(32, 1), is("3")); + assertThat(deltaMetaData.getAttemptId(32, 2), is("4")); + assertThat(deltaMetaData.getAttemptId(33, 1), is("5")); + assertThat(deltaMetaData.getAttemptId(33, 2), is("6")); + } + + @Test + public void testDeltaMetaDataReadFieldsWithNoBucketToAttemptId() throws Exception { + when(mockDataInput.readLong()).thenReturn(1L, 2L, 44L); + when(mockDataInput.readInt()).thenReturn(3, 31, 32, 33, 0); + + List statementIds = new ArrayList<>(); + statementIds.add(1); + DeltaMetaData deltaMetaData = new AcidInputFormat + .DeltaMetaData(2000L, 2001L, statementIds, 0, null); + deltaMetaData.readFields(mockDataInput); + + assertThat(deltaMetaData.getMinWriteId(), is(1L)); + assertThat(deltaMetaData.getMaxWriteId(), is(2L)); + assertThat(deltaMetaData.getStmtIds().size(), is(3)); + assertThat(deltaMetaData.getStmtIds().get(0), is(31)); + assertThat(deltaMetaData.getStmtIds().get(1), is(32)); + assertThat(deltaMetaData.getStmtIds().get(2), is(33)); + assertThat(deltaMetaData.getAttemptId(31, 1), nullValue()); + assertThat(deltaMetaData.getAttemptId(32, 1), nullValue()); + assertThat(deltaMetaData.getAttemptId(33, 1), nullValue()); + } } diff --git ql/src/test/queries/clientpositive/acid_direct_update_delete.q ql/src/test/queries/clientpositive/acid_direct_update_delete.q new file mode 100644 index 0000000000..f232389edf --- /dev/null +++ ql/src/test/queries/clientpositive/acid_direct_update_delete.q @@ -0,0 +1,19 @@ +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +set hive.acid.direct.insert.enabled=true; + +DROP TABLE IF EXISTS test_update_bucketed; + +CREATE TABLE test_update_bucketed(id string, value string) CLUSTERED BY(id) INTO 10 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true'); + +INSERT INTO test_update_bucketed values ('1','one'),('2','two'),('3','three'),('4','four'),('5','five'),('6','six'),('7','seven'),('8','eight'),('9','nine'),('10','ten'),('11','eleven'),('12','twelve'),('13','thirteen'),('14','fourteen'),('15','fifteen'),('16','sixteen'),('17','seventeen'),('18','eighteen'),('19','nineteen'),('20','twenty'); + +SELECT * FROM test_update_bucketed ORDER BY id; +DELETE FROM test_update_bucketed WHERE id IN ('2', '4', '12', '15'); +UPDATE test_update_bucketed SET value='New value' WHERE id IN ('6','11', '18', '20'); 
+SELECT * FROM test_update_bucketed ORDER BY ID; +DELETE FROM test_update_bucketed WHERE id IN ('2', '11', '10'); +UPDATE test_update_bucketed SET value='New value2' WHERE id IN ('2','18', '19'); +SELECT * FROM test_update_bucketed ORDER BY ID; + +DROP TABLE IF EXISTS test_update_bucketed; \ No newline at end of file diff --git ql/src/test/queries/clientpositive/acid_direct_update_delete_partitions.q ql/src/test/queries/clientpositive/acid_direct_update_delete_partitions.q new file mode 100644 index 0000000000..5b558e3ce7 --- /dev/null +++ ql/src/test/queries/clientpositive/acid_direct_update_delete_partitions.q @@ -0,0 +1,41 @@ +SET hive.support.concurrency=true; +SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +SET hive.acid.direct.insert.enabled=true; + +DROP TABLE IF EXISTS test_update_part_text; +DROP TABLE IF EXISTS test_update_part; +DROP TABLE IF EXISTS test_delete_part; + +CREATE EXTERNAL TABLE test_update_part_text (a int, b int, c int) STORED AS TEXTFILE; +INSERT INTO test_update_part_text VALUES (11, 1, 11), (12, 2, 11), (13, 3, 11), (14, 4, 11), (14, 5, NULL), (15, 6, NULL), (16, 7, NULL), (14, 8, 22), (17, 8, 22), (18, 9, 22), (19, 10, 33), (20, 11, 33), (21, 12, 33); + +CREATE TABLE test_update_part (a int, b int) PARTITIONED BY (c int) CLUSTERED BY(a) INTO 4 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true'); +INSERT OVERWRITE TABLE test_update_part SELECT a, b, c FROM test_update_part_text; + +SELECT * FROM test_update_part ORDER BY c, b, a; + +UPDATE test_update_part SET b=1111 WHERE b=8; +UPDATE test_update_part SET b=2222 WHERE a=8; +UPDATE test_update_part SET b=3333 WHERE a=14; +UPDATE test_update_part SET b=4444 WHERE c is null; +UPDATE test_update_part SET b=5555 WHERE c=33; +UPDATE test_update_part SET b=6666 WHERE a IN (SELECT a FROM test_update_part_text WHERE (c=11 and b=2) or c=33); +SELECT * FROM test_update_part ORDER BY c, b, a; + +CREATE TABLE test_delete_part (a int, b int) PARTITIONED BY (c int) STORED AS ORC TBLPROPERTIES('transactional'='true'); +INSERT OVERWRITE TABLE test_delete_part SELECT a, b, c FROM test_update_part_text; + +SELECT * FROM test_delete_part order by c, b, a; + +DELETE FROM test_delete_part WHERE b=8; +DELETE FROM test_delete_part WHERE a=8; +DELETE FROM test_delete_part WHERE b=8; +DELETE FROM test_delete_part WHERE a=14; +DELETE FROM test_delete_part WHERE c is null; +DELETE FROM test_delete_part WHERE c=33; +DELETE FROM test_delete_part WHERE a in (SELECT a FROM test_update_part_text WHERE (c=11 and b=2) or c=22); +SELECT * FROM test_delete_part order by c, b, a; + +DROP TABLE IF EXISTS test_update_part_text; +DROP TABLE IF EXISTS test_update_part; +DROP TABLE IF EXISTS test_delete_part; \ No newline at end of file diff --git ql/src/test/queries/clientpositive/acid_direct_update_delete_with_merge.q ql/src/test/queries/clientpositive/acid_direct_update_delete_with_merge.q new file mode 100644 index 0000000000..82b8ab83f7 --- /dev/null +++ ql/src/test/queries/clientpositive/acid_direct_update_delete_with_merge.q @@ -0,0 +1,41 @@ +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +set hive.acid.direct.insert.enabled=true; + +DROP TABLE IF EXISTS transactions; +DROP TABLE IF EXISTS merge_source; + +CREATE TABLE transactions (id int, value string, last_update_user string) PARTITIONED BY (tran_date string) CLUSTERED BY (id) into 5 buckets STORED AS ORC TBLPROPERTIES ('transactional'='true'); +CREATE TABLE merge_source (id int, value string, tran_date 
string) STORED AS ORC; + +INSERT INTO transactions PARTITION (tran_date) VALUES +(1, 'value_01', 'creation', '20170410'), +(2, 'value_02', 'creation', '20170410'), +(3, 'value_03', 'creation', '20170410'), +(4, 'value_04', 'creation', '20170410'), +(5, 'value_05', 'creation', '20170413'), +(6, 'value_06', 'creation', '20170413'), +(7, 'value_07', 'creation', '20170413'), +(8, 'value_08', 'creation', '20170413'), +(9, 'value_09', 'creation', '20170413'), +(10, 'value_10','creation', '20170413'); + +INSERT INTO merge_source VALUES +(1, 'value_01', '20170410'), +(4, NULL, '20170410'), +(7, 'value_77777', '20170413'), +(8, NULL, '20170413'), +(8, 'value_08', '20170415'), +(11, 'value_11', '20170415'); + +MERGE INTO transactions AS T +USING merge_source AS S +ON T.ID = S.ID and T.tran_date = S.tran_date +WHEN MATCHED AND (T.value != S.value AND S.value IS NOT NULL) THEN UPDATE SET value = S.value, last_update_user = 'merge_update' +WHEN MATCHED AND S.value IS NULL THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (S.ID, S.value, 'merge_insert', S.tran_date); + +SELECT * FROM transactions ORDER BY ID; + +DROP TABLE IF EXISTS transactions; +DROP TABLE IF EXISTS merge_source; \ No newline at end of file diff --git ql/src/test/queries/clientpositive/update_orig_table.q ql/src/test/queries/clientpositive/update_orig_table.q index 750b034e10..3941bb3778 100644 --- ql/src/test/queries/clientpositive/update_orig_table.q +++ ql/src/test/queries/clientpositive/update_orig_table.q @@ -22,8 +22,12 @@ create table acid_uot( cboolean1 BOOLEAN, cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc location '${system:test.tmp.dir}/update_orig_table' TBLPROPERTIES ('transactional'='true'); +select * from acid_uot where cint < -1070551679; + update acid_uot set cstring1 = 'fred' where cint < -1070551679; select * from acid_uot where cstring1 = 'fred'; +select * from acid_uot where cint < -1070551679; + dfs -rmr ${system:test.tmp.dir}/update_orig_table; diff --git ql/src/test/results/clientpositive/llap/acid_direct_update_delete.q.out ql/src/test/results/clientpositive/llap/acid_direct_update_delete.q.out new file mode 100644 index 0000000000..eaafd8a3c7 --- /dev/null +++ ql/src/test/results/clientpositive/llap/acid_direct_update_delete.q.out @@ -0,0 +1,136 @@ +PREHOOK: query: DROP TABLE IF EXISTS test_update_bucketed +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS test_update_bucketed +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE test_update_bucketed(id string, value string) CLUSTERED BY(id) INTO 10 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@test_update_bucketed +POSTHOOK: query: CREATE TABLE test_update_bucketed(id string, value string) CLUSTERED BY(id) INTO 10 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@test_update_bucketed +PREHOOK: query: INSERT INTO test_update_bucketed values ('1','one'),('2','two'),('3','three'),('4','four'),('5','five'),('6','six'),('7','seven'),('8','eight'),('9','nine'),('10','ten'),('11','eleven'),('12','twelve'),('13','thirteen'),('14','fourteen'),('15','fifteen'),('16','sixteen'),('17','seventeen'),('18','eighteen'),('19','nineteen'),('20','twenty') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@test_update_bucketed +POSTHOOK: query: INSERT INTO test_update_bucketed values 
('1','one'),('2','two'),('3','three'),('4','four'),('5','five'),('6','six'),('7','seven'),('8','eight'),('9','nine'),('10','ten'),('11','eleven'),('12','twelve'),('13','thirteen'),('14','fourteen'),('15','fifteen'),('16','sixteen'),('17','seventeen'),('18','eighteen'),('19','nineteen'),('20','twenty') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@test_update_bucketed +POSTHOOK: Lineage: test_update_bucketed.id SCRIPT [] +POSTHOOK: Lineage: test_update_bucketed.value SCRIPT [] +PREHOOK: query: SELECT * FROM test_update_bucketed ORDER BY id +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_bucketed +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: SELECT * FROM test_update_bucketed ORDER BY id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_bucketed +POSTHOOK: Output: hdfs://### HDFS PATH ### +1 one +10 ten +11 eleven +12 twelve +13 thirteen +14 fourteen +15 fifteen +16 sixteen +17 seventeen +18 eighteen +19 nineteen +2 two +20 twenty +3 three +4 four +5 five +6 six +7 seven +8 eight +9 nine +PREHOOK: query: DELETE FROM test_update_bucketed WHERE id IN ('2', '4', '12', '15') +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_bucketed +PREHOOK: Output: default@test_update_bucketed +POSTHOOK: query: DELETE FROM test_update_bucketed WHERE id IN ('2', '4', '12', '15') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_bucketed +POSTHOOK: Output: default@test_update_bucketed +PREHOOK: query: UPDATE test_update_bucketed SET value='New value' WHERE id IN ('6','11', '18', '20') +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_bucketed +PREHOOK: Output: default@test_update_bucketed +POSTHOOK: query: UPDATE test_update_bucketed SET value='New value' WHERE id IN ('6','11', '18', '20') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_bucketed +POSTHOOK: Output: default@test_update_bucketed +PREHOOK: query: SELECT * FROM test_update_bucketed ORDER BY ID +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_bucketed +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: SELECT * FROM test_update_bucketed ORDER BY ID +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_bucketed +POSTHOOK: Output: hdfs://### HDFS PATH ### +1 one +10 ten +11 New value +13 thirteen +14 fourteen +16 sixteen +17 seventeen +18 New value +19 nineteen +20 New value +3 three +5 five +6 New value +7 seven +8 eight +9 nine +PREHOOK: query: DELETE FROM test_update_bucketed WHERE id IN ('2', '11', '10') +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_bucketed +PREHOOK: Output: default@test_update_bucketed +POSTHOOK: query: DELETE FROM test_update_bucketed WHERE id IN ('2', '11', '10') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_bucketed +POSTHOOK: Output: default@test_update_bucketed +PREHOOK: query: UPDATE test_update_bucketed SET value='New value2' WHERE id IN ('2','18', '19') +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_bucketed +PREHOOK: Output: default@test_update_bucketed +POSTHOOK: query: UPDATE test_update_bucketed SET value='New value2' WHERE id IN ('2','18', '19') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_bucketed +POSTHOOK: Output: default@test_update_bucketed +PREHOOK: query: SELECT * FROM test_update_bucketed ORDER BY ID +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_bucketed +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: SELECT * FROM test_update_bucketed ORDER BY ID +POSTHOOK: type: QUERY 
+POSTHOOK: Input: default@test_update_bucketed +POSTHOOK: Output: hdfs://### HDFS PATH ### +1 one +13 thirteen +14 fourteen +16 sixteen +17 seventeen +18 New value2 +19 New value2 +20 New value +3 three +5 five +6 New value +7 seven +8 eight +9 nine +PREHOOK: query: DROP TABLE IF EXISTS test_update_bucketed +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@test_update_bucketed +PREHOOK: Output: default@test_update_bucketed +POSTHOOK: query: DROP TABLE IF EXISTS test_update_bucketed +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@test_update_bucketed +POSTHOOK: Output: default@test_update_bucketed diff --git ql/src/test/results/clientpositive/llap/acid_direct_update_delete_partitions.q.out ql/src/test/results/clientpositive/llap/acid_direct_update_delete_partitions.q.out new file mode 100644 index 0000000000..42f3168a04 --- /dev/null +++ ql/src/test/results/clientpositive/llap/acid_direct_update_delete_partitions.q.out @@ -0,0 +1,468 @@ +PREHOOK: query: DROP TABLE IF EXISTS test_update_part_text +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS test_update_part_text +POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE IF EXISTS test_update_part +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS test_update_part +POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE IF EXISTS test_delete_part +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS test_delete_part +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE EXTERNAL TABLE test_update_part_text (a int, b int, c int) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@test_update_part_text +POSTHOOK: query: CREATE EXTERNAL TABLE test_update_part_text (a int, b int, c int) STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@test_update_part_text +PREHOOK: query: INSERT INTO test_update_part_text VALUES (11, 1, 11), (12, 2, 11), (13, 3, 11), (14, 4, 11), (14, 5, NULL), (15, 6, NULL), (16, 7, NULL), (14, 8, 22), (17, 8, 22), (18, 9, 22), (19, 10, 33), (20, 11, 33), (21, 12, 33) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@test_update_part_text +POSTHOOK: query: INSERT INTO test_update_part_text VALUES (11, 1, 11), (12, 2, 11), (13, 3, 11), (14, 4, 11), (14, 5, NULL), (15, 6, NULL), (16, 7, NULL), (14, 8, 22), (17, 8, 22), (18, 9, 22), (19, 10, 33), (20, 11, 33), (21, 12, 33) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@test_update_part_text +POSTHOOK: Lineage: test_update_part_text.a SCRIPT [] +POSTHOOK: Lineage: test_update_part_text.b SCRIPT [] +POSTHOOK: Lineage: test_update_part_text.c SCRIPT [] +PREHOOK: query: CREATE TABLE test_update_part (a int, b int) PARTITIONED BY (c int) CLUSTERED BY(a) INTO 4 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@test_update_part +POSTHOOK: query: CREATE TABLE test_update_part (a int, b int) PARTITIONED BY (c int) CLUSTERED BY(a) INTO 4 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@test_update_part +PREHOOK: query: INSERT OVERWRITE TABLE test_update_part SELECT a, b, c FROM test_update_part_text +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_part_text +PREHOOK: Output: default@test_update_part +POSTHOOK: query: INSERT OVERWRITE 
TABLE test_update_part SELECT a, b, c FROM test_update_part_text +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_part_text +POSTHOOK: Output: default@test_update_part@c=11 +POSTHOOK: Output: default@test_update_part@c=22 +POSTHOOK: Output: default@test_update_part@c=33 +POSTHOOK: Output: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Lineage: test_update_part PARTITION(c=11).a SIMPLE [(test_update_part_text)test_update_part_text.FieldSchema(name:a, type:int, comment:null), ] +POSTHOOK: Lineage: test_update_part PARTITION(c=11).b SIMPLE [(test_update_part_text)test_update_part_text.FieldSchema(name:b, type:int, comment:null), ] +POSTHOOK: Lineage: test_update_part PARTITION(c=22).a SIMPLE [(test_update_part_text)test_update_part_text.FieldSchema(name:a, type:int, comment:null), ] +POSTHOOK: Lineage: test_update_part PARTITION(c=22).b SIMPLE [(test_update_part_text)test_update_part_text.FieldSchema(name:b, type:int, comment:null), ] +POSTHOOK: Lineage: test_update_part PARTITION(c=33).a SIMPLE [(test_update_part_text)test_update_part_text.FieldSchema(name:a, type:int, comment:null), ] +POSTHOOK: Lineage: test_update_part PARTITION(c=33).b SIMPLE [(test_update_part_text)test_update_part_text.FieldSchema(name:b, type:int, comment:null), ] +POSTHOOK: Lineage: test_update_part PARTITION(c=__HIVE_DEFAULT_PARTITION__).a SIMPLE [(test_update_part_text)test_update_part_text.FieldSchema(name:a, type:int, comment:null), ] +POSTHOOK: Lineage: test_update_part PARTITION(c=__HIVE_DEFAULT_PARTITION__).b SIMPLE [(test_update_part_text)test_update_part_text.FieldSchema(name:b, type:int, comment:null), ] +PREHOOK: query: SELECT * FROM test_update_part ORDER BY c, b, a +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_part +PREHOOK: Input: default@test_update_part@c=11 +PREHOOK: Input: default@test_update_part@c=22 +PREHOOK: Input: default@test_update_part@c=33 +PREHOOK: Input: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: SELECT * FROM test_update_part ORDER BY c, b, a +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_part +POSTHOOK: Input: default@test_update_part@c=11 +POSTHOOK: Input: default@test_update_part@c=22 +POSTHOOK: Input: default@test_update_part@c=33 +POSTHOOK: Input: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Output: hdfs://### HDFS PATH ### +11 1 11 +12 2 11 +13 3 11 +14 4 11 +14 8 22 +17 8 22 +18 9 22 +19 10 33 +20 11 33 +21 12 33 +14 5 NULL +15 6 NULL +16 7 NULL +PREHOOK: query: UPDATE test_update_part SET b=1111 WHERE b=8 +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_part +PREHOOK: Input: default@test_update_part@c=11 +PREHOOK: Input: default@test_update_part@c=22 +PREHOOK: Input: default@test_update_part@c=33 +PREHOOK: Input: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: Output: default@test_update_part@c=11 +PREHOOK: Output: default@test_update_part@c=22 +PREHOOK: Output: default@test_update_part@c=33 +PREHOOK: Output: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: query: UPDATE test_update_part SET b=1111 WHERE b=8 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_part +POSTHOOK: Input: default@test_update_part@c=11 +POSTHOOK: Input: default@test_update_part@c=22 +POSTHOOK: Input: default@test_update_part@c=33 +POSTHOOK: Input: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Output: default@test_update_part@c=11 +POSTHOOK: Output: default@test_update_part@c=22 +POSTHOOK: 
Output: default@test_update_part@c=33 +POSTHOOK: Output: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: query: UPDATE test_update_part SET b=2222 WHERE a=8 +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_part +PREHOOK: Input: default@test_update_part@c=11 +PREHOOK: Input: default@test_update_part@c=22 +PREHOOK: Input: default@test_update_part@c=33 +PREHOOK: Input: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: Output: default@test_update_part@c=11 +PREHOOK: Output: default@test_update_part@c=22 +PREHOOK: Output: default@test_update_part@c=33 +PREHOOK: Output: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: query: UPDATE test_update_part SET b=2222 WHERE a=8 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_part +POSTHOOK: Input: default@test_update_part@c=11 +POSTHOOK: Input: default@test_update_part@c=22 +POSTHOOK: Input: default@test_update_part@c=33 +POSTHOOK: Input: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Output: default@test_update_part@c=11 +POSTHOOK: Output: default@test_update_part@c=22 +POSTHOOK: Output: default@test_update_part@c=33 +POSTHOOK: Output: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: query: UPDATE test_update_part SET b=3333 WHERE a=14 +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_part +PREHOOK: Input: default@test_update_part@c=11 +PREHOOK: Input: default@test_update_part@c=22 +PREHOOK: Input: default@test_update_part@c=33 +PREHOOK: Input: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: Output: default@test_update_part@c=11 +PREHOOK: Output: default@test_update_part@c=22 +PREHOOK: Output: default@test_update_part@c=33 +PREHOOK: Output: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: query: UPDATE test_update_part SET b=3333 WHERE a=14 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_part +POSTHOOK: Input: default@test_update_part@c=11 +POSTHOOK: Input: default@test_update_part@c=22 +POSTHOOK: Input: default@test_update_part@c=33 +POSTHOOK: Input: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Output: default@test_update_part@c=11 +POSTHOOK: Output: default@test_update_part@c=22 +POSTHOOK: Output: default@test_update_part@c=33 +POSTHOOK: Output: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: query: UPDATE test_update_part SET b=4444 WHERE c is null +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_part +PREHOOK: Input: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: Output: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: query: UPDATE test_update_part SET b=4444 WHERE c is null +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_part +POSTHOOK: Input: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Output: default@test_update_part@c=11 +POSTHOOK: Output: default@test_update_part@c=22 +POSTHOOK: Output: default@test_update_part@c=33 +POSTHOOK: Output: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: query: UPDATE test_update_part SET b=5555 WHERE c=33 +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_part +PREHOOK: Input: default@test_update_part@c=33 +PREHOOK: Output: default@test_update_part@c=33 +POSTHOOK: query: UPDATE test_update_part SET b=5555 WHERE c=33 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_part +POSTHOOK: Input: default@test_update_part@c=33 +POSTHOOK: Output: default@test_update_part@c=11 +POSTHOOK: Output: default@test_update_part@c=22 
+POSTHOOK: Output: default@test_update_part@c=33 +POSTHOOK: Output: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: query: UPDATE test_update_part SET b=6666 WHERE a IN (SELECT a FROM test_update_part_text WHERE (c=11 and b=2) or c=33) +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_part +PREHOOK: Input: default@test_update_part@c=11 +PREHOOK: Input: default@test_update_part@c=22 +PREHOOK: Input: default@test_update_part@c=33 +PREHOOK: Input: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: Input: default@test_update_part_text +PREHOOK: Output: default@test_update_part@c=11 +PREHOOK: Output: default@test_update_part@c=22 +PREHOOK: Output: default@test_update_part@c=33 +PREHOOK: Output: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: query: UPDATE test_update_part SET b=6666 WHERE a IN (SELECT a FROM test_update_part_text WHERE (c=11 and b=2) or c=33) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_part +POSTHOOK: Input: default@test_update_part@c=11 +POSTHOOK: Input: default@test_update_part@c=22 +POSTHOOK: Input: default@test_update_part@c=33 +POSTHOOK: Input: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Input: default@test_update_part_text +POSTHOOK: Output: default@test_update_part@c=11 +POSTHOOK: Output: default@test_update_part@c=22 +POSTHOOK: Output: default@test_update_part@c=33 +POSTHOOK: Output: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: query: SELECT * FROM test_update_part ORDER BY c, b, a +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_part +PREHOOK: Input: default@test_update_part@c=11 +PREHOOK: Input: default@test_update_part@c=22 +PREHOOK: Input: default@test_update_part@c=33 +PREHOOK: Input: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: SELECT * FROM test_update_part ORDER BY c, b, a +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_part +POSTHOOK: Input: default@test_update_part@c=11 +POSTHOOK: Input: default@test_update_part@c=22 +POSTHOOK: Input: default@test_update_part@c=33 +POSTHOOK: Input: default@test_update_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Output: hdfs://### HDFS PATH ### +11 1 11 +13 3 11 +14 3333 11 +12 6666 11 +18 9 22 +17 1111 22 +14 3333 22 +19 6666 33 +20 6666 33 +21 6666 33 +14 4444 NULL +15 4444 NULL +16 4444 NULL +PREHOOK: query: CREATE TABLE test_delete_part (a int, b int) PARTITIONED BY (c int) STORED AS ORC TBLPROPERTIES('transactional'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@test_delete_part +POSTHOOK: query: CREATE TABLE test_delete_part (a int, b int) PARTITIONED BY (c int) STORED AS ORC TBLPROPERTIES('transactional'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@test_delete_part +PREHOOK: query: INSERT OVERWRITE TABLE test_delete_part SELECT a, b, c FROM test_update_part_text +PREHOOK: type: QUERY +PREHOOK: Input: default@test_update_part_text +PREHOOK: Output: default@test_delete_part +POSTHOOK: query: INSERT OVERWRITE TABLE test_delete_part SELECT a, b, c FROM test_update_part_text +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_update_part_text +POSTHOOK: Output: default@test_delete_part@c=11 +POSTHOOK: Output: default@test_delete_part@c=22 +POSTHOOK: Output: default@test_delete_part@c=33 +POSTHOOK: Output: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Lineage: test_delete_part PARTITION(c=11).a SIMPLE 
[(test_update_part_text)test_update_part_text.FieldSchema(name:a, type:int, comment:null), ] +POSTHOOK: Lineage: test_delete_part PARTITION(c=11).b SIMPLE [(test_update_part_text)test_update_part_text.FieldSchema(name:b, type:int, comment:null), ] +POSTHOOK: Lineage: test_delete_part PARTITION(c=22).a SIMPLE [(test_update_part_text)test_update_part_text.FieldSchema(name:a, type:int, comment:null), ] +POSTHOOK: Lineage: test_delete_part PARTITION(c=22).b SIMPLE [(test_update_part_text)test_update_part_text.FieldSchema(name:b, type:int, comment:null), ] +POSTHOOK: Lineage: test_delete_part PARTITION(c=33).a SIMPLE [(test_update_part_text)test_update_part_text.FieldSchema(name:a, type:int, comment:null), ] +POSTHOOK: Lineage: test_delete_part PARTITION(c=33).b SIMPLE [(test_update_part_text)test_update_part_text.FieldSchema(name:b, type:int, comment:null), ] +POSTHOOK: Lineage: test_delete_part PARTITION(c=__HIVE_DEFAULT_PARTITION__).a SIMPLE [(test_update_part_text)test_update_part_text.FieldSchema(name:a, type:int, comment:null), ] +POSTHOOK: Lineage: test_delete_part PARTITION(c=__HIVE_DEFAULT_PARTITION__).b SIMPLE [(test_update_part_text)test_update_part_text.FieldSchema(name:b, type:int, comment:null), ] +PREHOOK: query: SELECT * FROM test_delete_part order by c, b, a +PREHOOK: type: QUERY +PREHOOK: Input: default@test_delete_part +PREHOOK: Input: default@test_delete_part@c=11 +PREHOOK: Input: default@test_delete_part@c=22 +PREHOOK: Input: default@test_delete_part@c=33 +PREHOOK: Input: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: SELECT * FROM test_delete_part order by c, b, a +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_delete_part +POSTHOOK: Input: default@test_delete_part@c=11 +POSTHOOK: Input: default@test_delete_part@c=22 +POSTHOOK: Input: default@test_delete_part@c=33 +POSTHOOK: Input: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Output: hdfs://### HDFS PATH ### +11 1 11 +12 2 11 +13 3 11 +14 4 11 +14 8 22 +17 8 22 +18 9 22 +19 10 33 +20 11 33 +21 12 33 +14 5 NULL +15 6 NULL +16 7 NULL +PREHOOK: query: DELETE FROM test_delete_part WHERE b=8 +PREHOOK: type: QUERY +PREHOOK: Input: default@test_delete_part +PREHOOK: Input: default@test_delete_part@c=11 +PREHOOK: Input: default@test_delete_part@c=22 +PREHOOK: Input: default@test_delete_part@c=33 +PREHOOK: Input: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: Output: default@test_delete_part@c=11 +PREHOOK: Output: default@test_delete_part@c=22 +PREHOOK: Output: default@test_delete_part@c=33 +PREHOOK: Output: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: query: DELETE FROM test_delete_part WHERE b=8 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_delete_part +POSTHOOK: Input: default@test_delete_part@c=11 +POSTHOOK: Input: default@test_delete_part@c=22 +POSTHOOK: Input: default@test_delete_part@c=33 +POSTHOOK: Input: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Output: default@test_delete_part@c=11 +POSTHOOK: Output: default@test_delete_part@c=22 +POSTHOOK: Output: default@test_delete_part@c=33 +POSTHOOK: Output: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: query: DELETE FROM test_delete_part WHERE a=8 +PREHOOK: type: QUERY +PREHOOK: Input: default@test_delete_part +PREHOOK: Input: default@test_delete_part@c=11 +PREHOOK: Input: default@test_delete_part@c=22 +PREHOOK: Input: default@test_delete_part@c=33 +PREHOOK: Input: 
default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: Output: default@test_delete_part@c=11 +PREHOOK: Output: default@test_delete_part@c=22 +PREHOOK: Output: default@test_delete_part@c=33 +PREHOOK: Output: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: query: DELETE FROM test_delete_part WHERE a=8 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_delete_part +POSTHOOK: Input: default@test_delete_part@c=11 +POSTHOOK: Input: default@test_delete_part@c=22 +POSTHOOK: Input: default@test_delete_part@c=33 +POSTHOOK: Input: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Output: default@test_delete_part@c=11 +POSTHOOK: Output: default@test_delete_part@c=22 +POSTHOOK: Output: default@test_delete_part@c=33 +POSTHOOK: Output: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: query: DELETE FROM test_delete_part WHERE b=8 +PREHOOK: type: QUERY +PREHOOK: Input: default@test_delete_part +PREHOOK: Input: default@test_delete_part@c=11 +PREHOOK: Input: default@test_delete_part@c=22 +PREHOOK: Input: default@test_delete_part@c=33 +PREHOOK: Input: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: Output: default@test_delete_part@c=11 +PREHOOK: Output: default@test_delete_part@c=22 +PREHOOK: Output: default@test_delete_part@c=33 +PREHOOK: Output: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: query: DELETE FROM test_delete_part WHERE b=8 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_delete_part +POSTHOOK: Input: default@test_delete_part@c=11 +POSTHOOK: Input: default@test_delete_part@c=22 +POSTHOOK: Input: default@test_delete_part@c=33 +POSTHOOK: Input: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Output: default@test_delete_part@c=11 +POSTHOOK: Output: default@test_delete_part@c=22 +POSTHOOK: Output: default@test_delete_part@c=33 +POSTHOOK: Output: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: query: DELETE FROM test_delete_part WHERE a=14 +PREHOOK: type: QUERY +PREHOOK: Input: default@test_delete_part +PREHOOK: Input: default@test_delete_part@c=11 +PREHOOK: Input: default@test_delete_part@c=22 +PREHOOK: Input: default@test_delete_part@c=33 +PREHOOK: Input: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: Output: default@test_delete_part@c=11 +PREHOOK: Output: default@test_delete_part@c=22 +PREHOOK: Output: default@test_delete_part@c=33 +PREHOOK: Output: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: query: DELETE FROM test_delete_part WHERE a=14 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_delete_part +POSTHOOK: Input: default@test_delete_part@c=11 +POSTHOOK: Input: default@test_delete_part@c=22 +POSTHOOK: Input: default@test_delete_part@c=33 +POSTHOOK: Input: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Output: default@test_delete_part@c=11 +POSTHOOK: Output: default@test_delete_part@c=22 +POSTHOOK: Output: default@test_delete_part@c=33 +POSTHOOK: Output: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: query: DELETE FROM test_delete_part WHERE c is null +PREHOOK: type: QUERY +PREHOOK: Input: default@test_delete_part +PREHOOK: Input: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: Output: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: query: DELETE FROM test_delete_part WHERE c is null +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_delete_part +POSTHOOK: Input: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Output: 
default@test_delete_part@c=11 +POSTHOOK: Output: default@test_delete_part@c=22 +POSTHOOK: Output: default@test_delete_part@c=33 +POSTHOOK: Output: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: query: DELETE FROM test_delete_part WHERE c=33 +PREHOOK: type: QUERY +PREHOOK: Input: default@test_delete_part +PREHOOK: Input: default@test_delete_part@c=33 +PREHOOK: Output: default@test_delete_part@c=33 +POSTHOOK: query: DELETE FROM test_delete_part WHERE c=33 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_delete_part +POSTHOOK: Input: default@test_delete_part@c=33 +POSTHOOK: Output: default@test_delete_part@c=11 +POSTHOOK: Output: default@test_delete_part@c=22 +POSTHOOK: Output: default@test_delete_part@c=33 +POSTHOOK: Output: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: query: DELETE FROM test_delete_part WHERE a in (SELECT a FROM test_update_part_text WHERE (c=11 and b=2) or c=22) +PREHOOK: type: QUERY +PREHOOK: Input: default@test_delete_part +PREHOOK: Input: default@test_delete_part@c=11 +PREHOOK: Input: default@test_delete_part@c=22 +PREHOOK: Input: default@test_delete_part@c=33 +PREHOOK: Input: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: Input: default@test_update_part_text +PREHOOK: Output: default@test_delete_part@c=11 +PREHOOK: Output: default@test_delete_part@c=22 +PREHOOK: Output: default@test_delete_part@c=33 +PREHOOK: Output: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: query: DELETE FROM test_delete_part WHERE a in (SELECT a FROM test_update_part_text WHERE (c=11 and b=2) or c=22) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_delete_part +POSTHOOK: Input: default@test_delete_part@c=11 +POSTHOOK: Input: default@test_delete_part@c=22 +POSTHOOK: Input: default@test_delete_part@c=33 +POSTHOOK: Input: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Input: default@test_update_part_text +POSTHOOK: Output: default@test_delete_part@c=11 +POSTHOOK: Output: default@test_delete_part@c=22 +POSTHOOK: Output: default@test_delete_part@c=33 +POSTHOOK: Output: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: query: SELECT * FROM test_delete_part order by c, b, a +PREHOOK: type: QUERY +PREHOOK: Input: default@test_delete_part +PREHOOK: Input: default@test_delete_part@c=11 +PREHOOK: Input: default@test_delete_part@c=22 +PREHOOK: Input: default@test_delete_part@c=33 +PREHOOK: Input: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: SELECT * FROM test_delete_part order by c, b, a +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_delete_part +POSTHOOK: Input: default@test_delete_part@c=11 +POSTHOOK: Input: default@test_delete_part@c=22 +POSTHOOK: Input: default@test_delete_part@c=33 +POSTHOOK: Input: default@test_delete_part@c=__HIVE_DEFAULT_PARTITION__ +POSTHOOK: Output: hdfs://### HDFS PATH ### +11 1 11 +13 3 11 +PREHOOK: query: DROP TABLE IF EXISTS test_update_part_text +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@test_update_part_text +PREHOOK: Output: default@test_update_part_text +POSTHOOK: query: DROP TABLE IF EXISTS test_update_part_text +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@test_update_part_text +POSTHOOK: Output: default@test_update_part_text +PREHOOK: query: DROP TABLE IF EXISTS test_update_part +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@test_update_part +PREHOOK: Output: default@test_update_part +POSTHOOK: query: DROP TABLE IF EXISTS test_update_part +POSTHOOK: type: 
DROPTABLE +POSTHOOK: Input: default@test_update_part +POSTHOOK: Output: default@test_update_part +PREHOOK: query: DROP TABLE IF EXISTS test_delete_part +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@test_delete_part +PREHOOK: Output: default@test_delete_part +POSTHOOK: query: DROP TABLE IF EXISTS test_delete_part +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@test_delete_part +POSTHOOK: Output: default@test_delete_part diff --git ql/src/test/results/clientpositive/llap/acid_direct_update_delete_with_merge.q.out ql/src/test/results/clientpositive/llap/acid_direct_update_delete_with_merge.q.out new file mode 100644 index 0000000000..3bdaf0f092 --- /dev/null +++ ql/src/test/results/clientpositive/llap/acid_direct_update_delete_with_merge.q.out @@ -0,0 +1,157 @@ +PREHOOK: query: DROP TABLE IF EXISTS transactions +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS transactions +POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE IF EXISTS merge_source +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS merge_source +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE transactions (id int, value string, last_update_user string) PARTITIONED BY (tran_date string) CLUSTERED BY (id) into 5 buckets STORED AS ORC TBLPROPERTIES ('transactional'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@transactions +POSTHOOK: query: CREATE TABLE transactions (id int, value string, last_update_user string) PARTITIONED BY (tran_date string) CLUSTERED BY (id) into 5 buckets STORED AS ORC TBLPROPERTIES ('transactional'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@transactions +PREHOOK: query: CREATE TABLE merge_source (id int, value string, tran_date string) STORED AS ORC +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_source +POSTHOOK: query: CREATE TABLE merge_source (id int, value string, tran_date string) STORED AS ORC +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_source +PREHOOK: query: INSERT INTO transactions PARTITION (tran_date) VALUES +(1, 'value_01', 'creation', '20170410'), +(2, 'value_02', 'creation', '20170410'), +(3, 'value_03', 'creation', '20170410'), +(4, 'value_04', 'creation', '20170410'), +(5, 'value_05', 'creation', '20170413'), +(6, 'value_06', 'creation', '20170413'), +(7, 'value_07', 'creation', '20170413'), +(8, 'value_08', 'creation', '20170413'), +(9, 'value_09', 'creation', '20170413'), +(10, 'value_10','creation', '20170413') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@transactions +POSTHOOK: query: INSERT INTO transactions PARTITION (tran_date) VALUES +(1, 'value_01', 'creation', '20170410'), +(2, 'value_02', 'creation', '20170410'), +(3, 'value_03', 'creation', '20170410'), +(4, 'value_04', 'creation', '20170410'), +(5, 'value_05', 'creation', '20170413'), +(6, 'value_06', 'creation', '20170413'), +(7, 'value_07', 'creation', '20170413'), +(8, 'value_08', 'creation', '20170413'), +(9, 'value_09', 'creation', '20170413'), +(10, 'value_10','creation', '20170413') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@transactions@tran_date=20170410 +POSTHOOK: Output: default@transactions@tran_date=20170413 +POSTHOOK: Lineage: transactions PARTITION(tran_date=20170410).id SCRIPT [] +POSTHOOK: Lineage: transactions 
PARTITION(tran_date=20170410).last_update_user SCRIPT [] +POSTHOOK: Lineage: transactions PARTITION(tran_date=20170410).value SCRIPT [] +POSTHOOK: Lineage: transactions PARTITION(tran_date=20170413).id SCRIPT [] +POSTHOOK: Lineage: transactions PARTITION(tran_date=20170413).last_update_user SCRIPT [] +POSTHOOK: Lineage: transactions PARTITION(tran_date=20170413).value SCRIPT [] +PREHOOK: query: INSERT INTO merge_source VALUES +(1, 'value_01', '20170410'), +(4, NULL, '20170410'), +(7, 'value_77777', '20170413'), +(8, NULL, '20170413'), +(8, 'value_08', '20170415'), +(11, 'value_11', '20170415') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_source +POSTHOOK: query: INSERT INTO merge_source VALUES +(1, 'value_01', '20170410'), +(4, NULL, '20170410'), +(7, 'value_77777', '20170413'), +(8, NULL, '20170413'), +(8, 'value_08', '20170415'), +(11, 'value_11', '20170415') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_source +POSTHOOK: Lineage: merge_source.id SCRIPT [] +POSTHOOK: Lineage: merge_source.tran_date SCRIPT [] +POSTHOOK: Lineage: merge_source.value SCRIPT [] +PREHOOK: query: MERGE INTO transactions AS T +USING merge_source AS S +ON T.ID = S.ID and T.tran_date = S.tran_date +WHEN MATCHED AND (T.value != S.value AND S.value IS NOT NULL) THEN UPDATE SET value = S.value, last_update_user = 'merge_update' +WHEN MATCHED AND S.value IS NULL THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (S.ID, S.value, 'merge_insert', S.tran_date) +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_source +PREHOOK: Input: default@transactions +PREHOOK: Input: default@transactions@tran_date=20170410 +PREHOOK: Input: default@transactions@tran_date=20170413 +PREHOOK: Output: default@merge_tmp_table +PREHOOK: Output: default@transactions +PREHOOK: Output: default@transactions@tran_date=20170410 +PREHOOK: Output: default@transactions@tran_date=20170410 +PREHOOK: Output: default@transactions@tran_date=20170413 +PREHOOK: Output: default@transactions@tran_date=20170413 +POSTHOOK: query: MERGE INTO transactions AS T +USING merge_source AS S +ON T.ID = S.ID and T.tran_date = S.tran_date +WHEN MATCHED AND (T.value != S.value AND S.value IS NOT NULL) THEN UPDATE SET value = S.value, last_update_user = 'merge_update' +WHEN MATCHED AND S.value IS NULL THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (S.ID, S.value, 'merge_insert', S.tran_date) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@merge_source +POSTHOOK: Input: default@transactions +POSTHOOK: Input: default@transactions@tran_date=20170410 +POSTHOOK: Input: default@transactions@tran_date=20170413 +POSTHOOK: Output: default@merge_tmp_table +POSTHOOK: Output: default@transactions@tran_date=20170410 +POSTHOOK: Output: default@transactions@tran_date=20170410 +POSTHOOK: Output: default@transactions@tran_date=20170413 +POSTHOOK: Output: default@transactions@tran_date=20170413 +POSTHOOK: Output: default@transactions@tran_date=20170415 +POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(transactions)t.FieldSchema(name:ROW__ID, type:struct, comment:), (transactions)t.FieldSchema(name:tran_date, type:string, comment:null), ] +PREHOOK: query: SELECT * FROM transactions ORDER BY ID +PREHOOK: type: QUERY +PREHOOK: Input: default@transactions +PREHOOK: Input: default@transactions@tran_date=20170410 +PREHOOK: Input: default@transactions@tran_date=20170413 +PREHOOK: Input: default@transactions@tran_date=20170415 +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: 
query: SELECT * FROM transactions ORDER BY ID +POSTHOOK: type: QUERY +POSTHOOK: Input: default@transactions +POSTHOOK: Input: default@transactions@tran_date=20170410 +POSTHOOK: Input: default@transactions@tran_date=20170413 +POSTHOOK: Input: default@transactions@tran_date=20170415 +POSTHOOK: Output: hdfs://### HDFS PATH ### +1 value_01 creation 20170410 +2 value_02 creation 20170410 +3 value_03 creation 20170410 +5 value_05 creation 20170413 +6 value_06 creation 20170413 +7 value_77777 merge_update 20170413 +8 value_08 merge_insert 20170415 +9 value_09 creation 20170413 +10 value_10 creation 20170413 +11 value_11 merge_insert 20170415 +PREHOOK: query: DROP TABLE IF EXISTS transactions +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@transactions +PREHOOK: Output: default@transactions +POSTHOOK: query: DROP TABLE IF EXISTS transactions +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@transactions +POSTHOOK: Output: default@transactions +PREHOOK: query: DROP TABLE IF EXISTS merge_source +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@merge_source +PREHOOK: Output: default@merge_source +POSTHOOK: query: DROP TABLE IF EXISTS merge_source +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@merge_source +POSTHOOK: Output: default@merge_source diff --git ql/src/test/results/clientpositive/tez/update_orig_table.q.out ql/src/test/results/clientpositive/tez/update_orig_table.q.out index d72e8fbdca..b4ea6ee69b 100644 --- ql/src/test/results/clientpositive/tez/update_orig_table.q.out +++ ql/src/test/results/clientpositive/tez/update_orig_table.q.out @@ -32,6 +32,22 @@ POSTHOOK: type: CREATETABLE POSTHOOK: Input: hdfs://### HDFS PATH ### POSTHOOK: Output: database:default POSTHOOK: Output: default@acid_uot +PREHOOK: query: select * from acid_uot where cint < -1070551679 +PREHOOK: type: QUERY +PREHOOK: Input: default@acid_uot +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select * from acid_uot where cint < -1070551679 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acid_uot +POSTHOOK: Output: hdfs://### HDFS PATH ### +-51 NULL -1071480828 -1401575336 -51.0 NULL aw724t8c5558x2xneC624 4uE7l74tESBiKfu7c8wM7GA 1969-12-31 16:00:08.451 NULL true true +11 NULL -1072910839 2048385991 11.0 NULL 0iqrc5 KbaDXiN85adbHRx58v 1969-12-31 16:00:02.351 NULL false false +11 NULL -1073279343 -1595604468 11.0 NULL oj1YrV5Wa P76636jJ6qM17d7DIy 1969-12-31 16:00:02.351 NULL true true +8 NULL -1071363017 1349676361 8.0 NULL Anj0oF IwE1G7Qb0B1NEfV030g 1969-12-31 16:00:15.892 NULL true true +NULL -5470 -1072076362 1864027286 NULL -5470.0 2uLyD28144vklju213J1mr 4KWs6gw7lv2WYd66P NULL 1969-12-31 16:00:01.836 true true +NULL -7382 -1073051226 -1887561756 NULL -7382.0 A34p7oRr2WvUJNf 4hA4KQj2vD3fI6gX82220d NULL 1969-12-31 16:00:10.331 false false +NULL -741 -1070883071 -1645852809 NULL -741.0 0ruyd6Y50JpdGRf6HqD xH7445Rals48VOulSyR5F NULL 1969-12-31 15:59:51.293 false false +NULL 8373 -1072081801 1864027286 NULL 8373.0 dPkN74F7 4KWs6gw7lv2WYd66P NULL 1969-12-31 15:59:56.465 true true PREHOOK: query: update acid_uot set cstring1 = 'fred' where cint < -1070551679 PREHOOK: type: QUERY PREHOOK: Input: default@acid_uot @@ -56,3 +72,19 @@ NULL -5470 -1072076362 1864027286 NULL -5470.0 fred 4KWs6gw7lv2WYd66P NULL 1969- NULL -7382 -1073051226 -1887561756 NULL -7382.0 fred 4hA4KQj2vD3fI6gX82220d NULL 1969-12-31 16:00:10.331 false false NULL -741 -1070883071 -1645852809 NULL -741.0 fred xH7445Rals48VOulSyR5F NULL 1969-12-31 15:59:51.293 false false NULL 8373 -1072081801 1864027286 NULL 8373.0 fred 4KWs6gw7lv2WYd66P 
NULL 1969-12-31 15:59:56.465 true true +PREHOOK: query: select * from acid_uot where cint < -1070551679 +PREHOOK: type: QUERY +PREHOOK: Input: default@acid_uot +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select * from acid_uot where cint < -1070551679 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acid_uot +POSTHOOK: Output: hdfs://### HDFS PATH ### +-51 NULL -1071480828 -1401575336 -51.0 NULL fred 4uE7l74tESBiKfu7c8wM7GA 1969-12-31 16:00:08.451 NULL true true +11 NULL -1072910839 2048385991 11.0 NULL fred KbaDXiN85adbHRx58v 1969-12-31 16:00:02.351 NULL false false +11 NULL -1073279343 -1595604468 11.0 NULL fred P76636jJ6qM17d7DIy 1969-12-31 16:00:02.351 NULL true true +8 NULL -1071363017 1349676361 8.0 NULL fred IwE1G7Qb0B1NEfV030g 1969-12-31 16:00:15.892 NULL true true +NULL -5470 -1072076362 1864027286 NULL -5470.0 fred 4KWs6gw7lv2WYd66P NULL 1969-12-31 16:00:01.836 true true +NULL -7382 -1073051226 -1887561756 NULL -7382.0 fred 4hA4KQj2vD3fI6gX82220d NULL 1969-12-31 16:00:10.331 false false +NULL -741 -1070883071 -1645852809 NULL -741.0 fred xH7445Rals48VOulSyR5F NULL 1969-12-31 15:59:51.293 false false +NULL 8373 -1072081801 1864027286 NULL 8373.0 fred 4KWs6gw7lv2WYd66P NULL 1969-12-31 15:59:56.465 true true
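
Reviewer note, not part of the patch: the new DeltaMetaData test cases near the top of this section introduce a per-statement, per-bucket attempt-id lookup on AcidInputFormat.DeltaMetaData. Below is a minimal sketch of that round trip, assuming only the constructor, readFields and getAttemptId calls that the tests themselves make; the stubbed readLong/readInt/readUTF values are copied from testDeltaMetaDataReadFieldsWithBucketToAttemptIdOneStmt, the constructor arguments from testDeltaMetaConstructWithState, and the class name DeltaMetaDataAttemptIdSketch plus the main() wrapper are hypothetical scaffolding.

import java.io.DataInput;
import java.util.Arrays;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class DeltaMetaDataAttemptIdSketch {   // hypothetical harness; mirrors the JUnit test
  public static void main(String[] args) throws Exception {
    // Stubbed exactly as in testDeltaMetaDataReadFieldsWithBucketToAttemptIdOneStmt.
    DataInput in = mock(DataInput.class);
    when(in.readLong()).thenReturn(1L, 2L, 22L);
    when(in.readInt()).thenReturn(1, 0, 1, 0, 3);
    when(in.readUTF()).thenReturn("0=1", "1=0", "2=3");

    // Constructor call copied from testDeltaMetaConstructWithState (fifth argument is null there too).
    AcidInputFormat.DeltaMetaData delta =
        new AcidInputFormat.DeltaMetaData(2000L, 2001L, Arrays.asList(97, 98, 99), 0, null);
    delta.readFields(in);   // replaces the constructed state with what the mocked stream carries

    // After deserialization the delta resolves an attempt id per (statement id, bucket id),
    // matching the "bucket=attemptId" pairs the mock returned for statement 0.
    System.out.println(delta.getAttemptId(0, 0));   // "1"
    System.out.println(delta.getAttemptId(0, 1));   // "0"
    System.out.println(delta.getAttemptId(0, 2));   // "3"
  }
}

The "bucket=attemptId" strings returned by the mocked readUTF are what the stream is assumed to carry; the exact wire layout is whatever DeltaMetaData.readFields defines, so treat the field order above as implied by the test rather than documented.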