diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index b3ef916..cf90f43 100644
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -387,7 +387,6 @@
   DBTXNMGR_REQUIRES_CONCURRENCY(10264,
       "To use DbTxnManager you must set hive.support.concurrency=true"),
   TXNMGR_NOT_ACID(10265, "This command is not allowed on an ACID table {0}.{1} with a non-ACID transaction manager", true),
-  LOAD_DATA_ON_ACID_TABLE(10266, "LOAD DATA... statement is not supported on transactional table {0}.", true),
   LOCK_NO_SUCH_LOCK(10270, "No record of lock {0} could be found, " +
       "may have timed out", true),
   LOCK_REQUEST_UNSUPPORTED(10271, "Current transaction manager does not " +
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index feacdd8..1303b1e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -69,6 +69,7 @@ public boolean accept(Path path) {
   };
   public static final String DELTA_PREFIX = "delta_";
   public static final String DELETE_DELTA_PREFIX = "delete_delta_";
+  public static final String LOAD_DELTA_PREFIX = "load_delta_";
   /**
    * Acid Streaming Ingest writes multiple transactions to the same file. It also maintains a
    * {@link org.apache.orc.impl.OrcAcidUtils#getSideFile(Path)} side file which stores the length of
@@ -94,6 +95,12 @@ public boolean accept(Path path) {
       return path.getName().startsWith(DELETE_DELTA_PREFIX);
     }
   };
+  public static final PathFilter loadDeltaFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith(LOAD_DELTA_PREFIX);
+    }
+  };
   public static final String BUCKET_PREFIX = "bucket_";
   public static final PathFilter bucketFileFilter = new PathFilter() {
     @Override
@@ -196,6 +203,15 @@ public static String deleteDeltaSubdir(long min, long max, int statementId) {
     return deleteDeltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
   }
 
+  /**
+   * Each LOAD DATA statement in a transaction creates its own load delta dir.
+   * @since 3.0.0
+   */
+  public static String loadDeltaSubdir(long min, long max, int statementId) {
+    return LOAD_DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" +
+        String.format(DELTA_DIGITS, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
+  }
+
   public static String baseDir(long txnId) {
     return BASE_PREFIX + String.format(DELTA_DIGITS, txnId);
   }
@@ -342,7 +358,8 @@ public static DataOperationType toDataOperationType(Operation op) {
 
   public enum AcidBaseFileType {
     COMPACTED_BASE, // a regular base file generated through major compaction
-    ORIGINAL_BASE, // a non-acid schema file for tables that got converted to acid
+    ORIGINAL_BASE, // a non-acid schema file for tables that got converted to acid;
+                   // or a non-acid schema file copied into acid table directory via "LOAD DATA"
     INSERT_DELTA; // a delta file with only insert events that can be treated as base for split-update
   }
 
@@ -539,19 +556,22 @@ public String toString() {
     //had no statement ID
     private final int statementId;
     private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
+    private final boolean isLoadDelta; // records whether delta dir is of type 'load_delta_x_y...'
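// --- Reviewer note (illustration only, not part of the patch) ---------------
// Sketch of the directory names produced by the new loadDeltaSubdir() helper
// above, assuming the pre-existing DELTA_DIGITS ("%07d") and STATEMENT_DIGITS
// ("%04d") formats also used by deltaSubdir()/deleteDeltaSubdir().
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class LoadDeltaNameSketch {
  public static void main(String[] args) {
    // A LOAD DATA executed in transaction 23, statement 0, gets its own dir:
    String name = AcidUtils.loadDeltaSubdir(23L, 23L, 0);
    System.out.println(name); // expected: load_delta_0000023_0000023_0000
  }
}
// -----------------------------------------------------------------------------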
/** * for pre 1.3.x delta files */ - ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta) { - this(min, max, path, -1, isDeleteDelta); + ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta, boolean isLoadDelta) { + this(min, max, path, -1, isDeleteDelta, isLoadDelta); } - ParsedDelta(long min, long max, FileStatus path, int statementId, boolean isDeleteDelta) { + ParsedDelta(long min, long max, FileStatus path, int statementId, boolean isDeleteDelta, + boolean isLoadDelta) { this.minTransaction = min; this.maxTransaction = max; this.path = path; this.statementId = statementId; this.isDeleteDelta = isDeleteDelta; + this.isLoadDelta = isLoadDelta; } public long getMinTransaction() { @@ -574,6 +594,10 @@ public boolean isDeleteDelta() { return isDeleteDelta; } + public boolean isLoadDelta() { + return isLoadDelta; + } + /** * Compactions (Major/Minor) merge deltas/bases but delete of old files * happens in a different process; thus it's possible to have bases/deltas with @@ -709,13 +733,15 @@ public static ParsedDelta parsedDelta(Path deltaDir) { private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix) { ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix); boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX); + boolean isLoadDelta = deltaPrefix.equals(LOAD_DELTA_PREFIX); return new ParsedDelta(p.getMinTransaction(), - p.getMaxTransaction(), path, p.statementId, isDeleteDelta); + p.getMaxTransaction(), path, p.statementId, isDeleteDelta, isLoadDelta); } public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix) { String filename = deltaDir.getName(); boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX); + boolean isLoadDelta = deltaPrefix.equals(LOAD_DELTA_PREFIX); if (filename.startsWith(deltaPrefix)) { String rest = filename.substring(deltaPrefix.length()); int split = rest.indexOf('_'); @@ -725,10 +751,10 @@ public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix) { Long.parseLong(rest.substring(split + 1)) : Long.parseLong(rest.substring(split + 1, split2)); if(split2 == -1) { - return new ParsedDelta(min, max, null, isDeleteDelta); + return new ParsedDelta(min, max, null, isDeleteDelta, isLoadDelta); } int statementId = Integer.parseInt(rest.substring(split2 + 1)); - return new ParsedDelta(min, max, null, statementId, isDeleteDelta); + return new ParsedDelta(min, max, null, statementId, isDeleteDelta, isLoadDelta); } throw new IllegalArgumentException(deltaDir + " does not start with " + deltaPrefix); @@ -995,10 +1021,16 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi } else { obsolete.add(child); } - } else if ((fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) + } else if ((fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX) || fn.startsWith(LOAD_DELTA_PREFIX)) && child.isDir()) { - String deltaPrefix = - (fn.startsWith(DELTA_PREFIX)) ? 
DELTA_PREFIX : DELETE_DELTA_PREFIX; + String deltaPrefix; + if (fn.startsWith(DELTA_PREFIX)) { + deltaPrefix = DELTA_PREFIX; + } else if (fn.startsWith(DELETE_DELTA_PREFIX)) { + deltaPrefix = DELETE_DELTA_PREFIX; + } else { // fn.startsWith(LOAD_DELTA_PREFIX) + deltaPrefix = LOAD_DELTA_PREFIX; + } ParsedDelta delta = parseDelta(child, deltaPrefix); if (txnList.isTxnRangeValid(delta.minTransaction, delta.maxTransaction) != @@ -1125,6 +1157,11 @@ public static void setTransactionalTableScan(Configuration conf, boolean isAcidT public static boolean isDeleteDelta(Path p) { return p.getName().startsWith(DELETE_DELTA_PREFIX); } + + public static boolean isLoadDelta(Path p) { + return p.getName().startsWith(LOAD_DELTA_PREFIX); + } + /** Checks if a table is a valid ACID table. * Note, users are responsible for using the correct TxnManager. We do not look at * SessionState.get().getTxnMgr().supportsAcid() here diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 69a9f9f..b7cee30 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -18,6 +18,11 @@ package org.apache.hadoop.hive.ql.io.orc; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hdfs.DistributedFileSystem; import java.io.IOException; @@ -42,10 +47,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; @@ -1140,44 +1141,59 @@ private AcidDirInfo callInternal() throws IOException { if (parsedDelta.isDeleteDelta()) { parsedDeltas.add(parsedDelta); } else { - // This is a normal insert delta, which only has insert events and hence all the files - // in this delta directory can be considered as a base. - Boolean val = useFileIds.value; - if (val == null || val) { - try { - List insertDeltaFiles = - SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter); - for (HdfsFileStatusWithId fileId : insertDeltaFiles) { - baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA)); - } - if (val == null) { - useFileIds.value = true; // The call succeeded, so presumably the API is there. - } - continue; // move on to process to the next parsedDelta. - } catch (Throwable t) { - LOG.error("Failed to get files with ID; using regular API: " + t.getMessage()); - if (val == null && t instanceof UnsupportedOperationException) { - useFileIds.value = false; - } - } - } - // Fall back to regular API and create statuses without ID. - List children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter); - for (FileStatus child : children) { - HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child); - baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA)); - } + AcidUtils.AcidBaseFileType baseFileType = parsedDelta.isLoadDelta() ? 
+ AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.INSERT_DELTA; + PathFilter pathFilter = parsedDelta.isLoadDelta() ? AcidUtils.hiddenFileFilter : + AcidUtils.bucketFileFilter; + setBaseFiles(baseFiles, parsedDelta, pathFilter, baseFileType); } } - } else { // When split-update is not enabled, then all the deltas in the current directories // should be considered as usual. parsedDeltas.addAll(dirInfo.getCurrentDirectories()); + for (ParsedDelta parsedDelta : dirInfo.getCurrentDirectories()) { + // if loaddelta, add it to base + if (parsedDelta.isLoadDelta()) { + setBaseFiles(baseFiles, parsedDelta, AcidUtils.hiddenFileFilter, + AcidUtils.AcidBaseFileType.ORIGINAL_BASE); + } else { + parsedDeltas.add(parsedDelta); + } + } } return new AcidDirInfo(fs, dir, dirInfo, baseFiles, parsedDeltas); } + private void setBaseFiles(List baseFiles, ParsedDelta parsedDelta, + PathFilter pathFilter, AcidUtils.AcidBaseFileType baseFileType) throws IOException { + Boolean val = useFileIds.value; + if (val == null || val) { + try { + List deltaFiles = + SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), pathFilter); + for (HdfsFileStatusWithId fileId : deltaFiles) { + baseFiles.add(new AcidBaseFileInfo(fileId, baseFileType)); + } + if (val == null) { + useFileIds.value = true; // The call succeeded, so presumably the API is there. + } + return; + } catch (Throwable t) { + LOG.error("Failed to get files with ID; using regular API: " + t.getMessage()); + if (val == null && t instanceof UnsupportedOperationException) { + useFileIds.value = false; + } + } + } + // Fall back to regular API and create statuses without ID. + List children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), pathFilter); + for (FileStatus child : children) { + HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child); + baseFiles.add(new AcidBaseFileInfo(fileId, baseFileType)); + } + } + private List findBaseFiles( Path base, Ref useFileIds) throws IOException { Boolean val = useFileIds.value; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index cbbb4c4..62126ed 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -18,35 +18,39 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.BucketCodec; +import org.apache.hadoop.hive.ql.io.HdfsUtils; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.orc.OrcUtils; import org.apache.orc.StripeInformation; import org.apache.orc.TypeDescription; -import org.apache.orc.impl.OrcAcidUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.ql.io.AcidInputFormat; -import 
org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import com.google.common.annotations.VisibleForTesting; +import static org.apache.hadoop.hive.ql.io.AcidUtils.hiddenFileFilter; + /** * Merges a base and a list of delta files together into a single stream of * events. @@ -530,19 +534,29 @@ public void next(OrcStruct next) throws IOException { OriginalReaderPairToCompact(ReaderKey key, int bucketId, Reader.Options options, Options mergerOptions, Configuration conf, - ValidTxnList validTxnList) throws IOException { + ValidTxnList validTxnList, boolean isOriginal, + List loadOriginalFiles) throws IOException { super(key, bucketId, conf); assert mergerOptions.isCompacting() : "Should only be used for Compaction"; this.conf = conf; this.options = options; - assert mergerOptions.getRootPath() != null : "Since we have original files"; + this.originalFiles = new ArrayList<>(); + if (loadOriginalFiles.isEmpty()) { + assert mergerOptions.getRootPath() != null : "Since we have original files"; + } assert this.bucketId >= 0 : "don't support non-bucketed tables yet"; //when compacting each split needs to process the whole logical bucket assert options.getOffset() == 0; assert options.getMaxOffset() == Long.MAX_VALUE; - AcidUtils.Directory directoryState = AcidUtils.getAcidState( - mergerOptions.getRootPath(), conf, validTxnList, false, true); - originalFiles = directoryState.getOriginalFiles(); + if (isOriginal) { + AcidUtils.Directory directoryState = AcidUtils.getAcidState( + mergerOptions.getRootPath(), conf, validTxnList, false, true); + originalFiles.addAll(directoryState.getOriginalFiles()); + } + if (!loadOriginalFiles.isEmpty()) { + // merge loadOriginalFiles into originalFiles + originalFiles.addAll(loadOriginalFiles); + } assert originalFiles.size() > 0; this.reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket if (reader == null) { @@ -888,6 +902,32 @@ boolean isDeleteReader() { // modify the options to reflect the event instead of the base row Reader.Options eventOptions = createEventOptions(options); + + boolean hasLoadedOriginal = false; + List loadedOriginalFiles = new ArrayList<>(); + // check if there is any LOAD DELTA in delta dirs, and if so, mark isOriginal to be true + if (deltaDirectory != null && deltaDirectory.length != 0) { + for (Path delta : deltaDirectory) { + if (mergerOptions.isCompacting() && AcidUtils.isLoadDelta(delta)) { + hasLoadedOriginal = true; + if (collapseEvents) { // For LOAD DATA, we are able to proceed with MAJOR compaction + mergerOptions.isMajorCompaction(true); + } + + // add files under this LOAD DELTA as original files + FileSystem fs = delta.getFileSystem(conf); + List children = HdfsUtils.listLocatedStatus(fs, delta, hiddenFileFilter); + for (FileStatus child : children) { + loadedOriginalFiles.add(AcidUtils.createOriginalObj(null, child)); + } + + // remove this delta from deltaDirectory since we won't need to evaluate it later + deltaDirectory = (Path[]) ArrayUtils.removeElement(deltaDirectory, delta); + break; + } + } + } + if((mergerOptions.isCompacting() && mergerOptions.isMinorCompaction()) || mergerOptions.isDeleteReader()) { //for minor compaction, there is no progress report and we don't filter deltas @@ -913,11 +953,11 @@ boolean isDeleteReader() { // use the min/max instead of the byte range ReaderPair 
pair; ReaderKey key = new ReaderKey(); - if (isOriginal) { + if (isOriginal || hasLoadedOriginal) { options = options.clone(); if(mergerOptions.isCompacting()) { pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions, - conf, validTxnList); + conf, validTxnList, isOriginal, loadedOriginalFiles); } else { pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList); @@ -936,7 +976,7 @@ boolean isDeleteReader() { baseReader = pair.getRecordReader(); } - if (deltaDirectory != null) { + if (deltaDirectory != null && deltaDirectory.length != 0) { /*whatever SARG maybe applicable to base it's not applicable to delete_delta since it has no * user columns * HIVE-17320: we should compute a SARG to push down min/max key to delete_delta*/ diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index d661f10..da0261e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -62,7 +62,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.ObjectPair; @@ -129,6 +128,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.FunctionTask; @@ -151,7 +151,6 @@ import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.thrift.TException; @@ -1725,7 +1724,8 @@ public Partition loadPartition(Path loadPath, Table tbl, isSrcLocal, isAutoPurge, newFiles); } else { FileSystem fs = tbl.getDataLocation().getFileSystem(conf); - Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles); + Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, + TxnUtils.isAcidTable(tbl.getTTable()), isAcid, newFiles); } perfLogger.PerfLogEnd("MoveTask", "FileMoves"); Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath); @@ -2094,10 +2094,10 @@ public Void call() throws Exception { * if list bucketing enabled * @param hasFollowingStatsTask * if there is any following stats task - * @param isAcid true if this is an ACID based write + * @param isAcidWriteType true if this is an ACID based write, i.e. 
INSERT, UPDATE or DELETE */ public void loadTable(Path loadPath, String tableName, boolean replace, boolean isSrcLocal, - boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask) + boolean isSkewedStoreAsSubdir, boolean isAcidWriteType, boolean hasFollowingStatsTask) throws HiveException { List newFiles = null; @@ -2114,7 +2114,8 @@ public void loadTable(Path loadPath, String tableName, boolean replace, boolean FileSystem fs; try { fs = tbl.getDataLocation().getFileSystem(sessionConf); - copyFiles(sessionConf, loadPath, tbl.getPath(), fs, isSrcLocal, isAcid, newFiles); + copyFiles(sessionConf, loadPath, tbl.getPath(), fs, isSrcLocal, + TxnUtils.isAcidTable(tbl.getTTable()), isAcidWriteType, newFiles); } catch (IOException e) { throw new HiveException("addFiles: filesystem error in check phase", e); } @@ -3428,13 +3429,15 @@ static protected boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, Fil * @param destf directory to move files into * @param fs Filesystem * @param isSrcLocal true if source is on local file system - * @param isAcid true if this is an ACID based write + * @param destIsAcidTable true if the destination table is an ACID table + *@param isAcidWriteType true if this is an ACID based write * @param newFiles if this is non-null, a list of files that were created as a result of this * move will be returned. * @throws HiveException */ - static protected void copyFiles(HiveConf conf, Path srcf, Path destf, - FileSystem fs, boolean isSrcLocal, boolean isAcid, List newFiles) throws HiveException { + static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs, + boolean isSrcLocal, boolean destIsAcidTable, boolean isAcidWriteType, List newFiles) + throws HiveException { try { // create the destination if it does not exist if (!fs.exists(destf)) { @@ -3463,10 +3466,26 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, // If we're moving files around for an ACID write then the rules and paths are all different. // You can blame this on Owen. 
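// --- Reviewer note (illustration only, not part of the patch) ---------------
// The destIsAcidTable branch added just below redirects LOAD DATA files from
// the table root into a per-transaction load_delta_* subdirectory. A minimal
// sketch of the destination it computes (the path and txn id are made up; the
// real code obtains txnId from the session's transaction manager):
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class LoadDataDestinationSketch {
  public static void main(String[] args) {
    Path destf = new Path("/warehouse/acidtbl");   // table (or partition) directory
    long txnId = 7L;                               // txn in which the LOAD DATA runs
    Path newDest = new Path(destf, AcidUtils.loadDeltaSubdir(txnId, txnId, 0));
    // Files are then copied under newDest rather than directly under destf:
    System.out.println(newDest); // e.g. /warehouse/acidtbl/load_delta_0000007_0000007_0000
  }
}
// -----------------------------------------------------------------------------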
- if (isAcid) { + if (isAcidWriteType) { moveAcidFiles(srcFs, srcs, destf, newFiles); } else { - copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, newFiles); + if (destIsAcidTable) { // LOAD DATA is not an ACID write type, but we need to create a delta dir for it + long txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); + String subDir = AcidUtils.loadDeltaSubdir(txnId, txnId, 0); + Path newDest = new Path(destf, subDir); + try { + if (!fs.exists(newDest)) { + FileUtils.mkdir(fs, newDest, conf); + } + } catch (IOException e) { + throw new HiveException( + "copyFiles: error while checking/creating destination directory!!!", + e); + } + copyFiles(conf, fs, srcs, srcFs, newDest, isSrcLocal, newFiles); + } else { + copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, newFiles); + } } } @@ -3505,6 +3524,8 @@ private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, fs, dst,origBucketPath, createdDeltaDirs, newFiles); moveAcidFiles(AcidUtils.BASE_PREFIX, AcidUtils.baseFileFilter, fs, dst, origBucketPath, createdDeltaDirs, newFiles); + moveAcidFiles(AcidUtils.LOAD_DELTA_PREFIX, AcidUtils.loadDeltaFilter, + fs, dst, origBucketPath, createdDeltaDirs, newFiles); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index fa79700..30fe24b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -227,10 +227,6 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { if (error != null) throw new SemanticException("Please load into an intermediate table" + " and use 'insert... select' to allow Hive to enforce bucketing. " + error); } - - if(AcidUtils.isAcidTable(ts.tableHandle)) { - throw new SemanticException(ErrorMsg.LOAD_DATA_ON_ACID_TABLE, ts.tableHandle.getCompleteName()); - } // make sure the arguments make sense List files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 04ef7fc..1c080c0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -543,6 +543,13 @@ public String toString() { Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName()); addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap); } + } else if (dir.getName().startsWith(AcidUtils.LOAD_DELTA_PREFIX)) { + FileStatus[] files = fs.listStatus(dir, AcidUtils.hiddenFileFilter); + for(FileStatus f : files) { + // For each file, figure out which bucket it is. + Matcher matcher = AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName()); + addFileToMap(matcher, f.getPath(), true, splitToBucketMap); + } } else { // Legacy file, see if it's a bucket file Matcher matcher = AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(dir.getName()); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 21b4a2c..3c8a1c2 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -1990,6 +1990,202 @@ public void testInsertOverwrite2() throws Exception { } /** + * Test LOAD DATA statement + * 1. Load data to an empty ACID table (load_delta_1) + * 2. 
Insert data to the ACID table (delta_2) + * 3. Major compaction (base_2) + * 4. Cleanup + * @throws Exception + */ + @Test + public void testLoadData1() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. Insert rows into non-acid table + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,1), (2,2)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 bucket files in the location + Assert.assertEquals(2, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000" + i + "_0")); + } + // Verify query result + int [][] resultData = new int[][] {{1,1},{2,2}}; + List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 2. Export data from non-acid table - this will be used to load into the ACID table later + runStatementOnDriver("export table " + Table.NONACIDORCTBL + " to '" + TEST_WAREHOUSE_DIR +"'"); + + // 3. LOAD DATA into ACID table + runStatementOnDriver("load data local inpath '" + TEST_WAREHOUSE_DIR + "/data' into table " + Table.ACIDTBL); + // The exported 2 bucket files should be copied into a load_delta dir without any modification + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().startsWith("load_delta_")); + status = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(2, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000" + i + "_0")); + } + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 4. Insert new rows into ACID table + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,3), (4,4)"); + // Now there should be an additional delta dir besides the two original bucket files + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(2, status.length); + boolean sawDelta = false; + boolean sawLoadDelta = false; + for (FileStatus dir : status) { + if (dir.getPath().getName().startsWith("delta_")) { + sawDelta = true; + } else if (dir.getPath().getName().startsWith("load_delta_")) { + sawLoadDelta = true; + } + } + Assert.assertTrue(sawDelta); + Assert.assertTrue(sawLoadDelta); + // Verify query correctness + resultData = new int[][] {{1,1},{2,2},{3,3},{4,4}}; + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 5. Perform a major compaction. 
The load deltas together with regular deltas will generate a + // new base dir + runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(3, status.length); + sawDelta = false; + sawLoadDelta = false; + boolean sawBase = false; + for (FileStatus dir : status) { + if (dir.getPath().getName().startsWith("delta_")) { + sawDelta = true; + } else if (dir.getPath().getName().startsWith("load_delta_")) { + sawLoadDelta = true; + } else if (dir.getPath().getName().startsWith("base_")) { + sawBase = true; + } + } + Assert.assertTrue(sawDelta); + Assert.assertTrue(sawLoadDelta); + Assert.assertTrue(sawBase); + // Verify query correctness + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 6. Run Cleaner. It should remove both regular delta dir and load delta dir + runCleaner(hiveConf); + // There should be only 1 directory left: base_xxxxxxx. + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); + // Verify query result + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + } + + + /** + * Test LOAD DATA statement + * 1. Insert data to an empty ACID table (delta_1) + * 2. Load data to the ACID table (load_delta_2) + * 3. Major compaction (base_2) + * 4. Cleanup + * @throws Exception + */ + @Test + public void testLoadData2() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. Insert rows into non-acid table + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,1), (2,2)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 bucket files in the location + Assert.assertEquals(2, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000" + i + "_0")); + } + // Verify query result + int [][] resultData = new int[][] {{1,1},{2,2}}; + List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 2. Export data from non-acid table - this will be used to load into the ACID table later + runStatementOnDriver("export table " + Table.NONACIDORCTBL + " to '" + TEST_WAREHOUSE_DIR +"'"); + + // 3. Insert some data into acid table first + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,3), (4,4)"); + // Now there should be only one delta dir + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().startsWith("delta_")); + // Verify query correctness + resultData = new int[][] {{3,3},{4,4}}; + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 4. 
Now load data + runStatementOnDriver("load data local inpath '" + TEST_WAREHOUSE_DIR + "/data' into table " + Table.ACIDTBL); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(2, status.length); + // Verify query correctness + resultData = new int[][] {{1,1},{2,2},{3,3},{4,4}}; + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 5. Perform a major compaction. The load deltas together with regular deltas will generate a + // new base dir + runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(3, status.length); + boolean sawDelta = false; + boolean sawLoadDelta = false; + boolean sawBase = false; + for (FileStatus dir : status) { + if (dir.getPath().getName().startsWith("delta_")) { + sawDelta = true; + } else if (dir.getPath().getName().startsWith("load_delta_")) { + sawLoadDelta = true; + } else if (dir.getPath().getName().startsWith("base_")) { + sawBase = true; + } + } + Assert.assertTrue(sawDelta); + Assert.assertTrue(sawLoadDelta); + Assert.assertTrue(sawBase); + // Verify query correctness + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 6. Run Cleaner. It should remove both regular delta dir and load delta dir + runCleaner(hiveConf); + // There should be only 1 directory left: base_xxxxxxx. + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); + // Verify query result + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + } + + /** * takes raw data and turns it into a string as if from Driver.getResults() * sorts rows in dictionary order */ diff --git ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java index 0c1c230..84b186c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java +++ ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java @@ -83,7 +83,7 @@ public void testRenameNewFilesOnSameFileSystem() throws IOException { FileSystem targetFs = targetPath.getFileSystem(hiveConf); try { - Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, null); + Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, false, NO_ACID, null); } catch (HiveException e) { e.printStackTrace(); assertTrue("Hive.copyFiles() threw an unexpected exception.", false); @@ -107,7 +107,7 @@ public void testRenameExistingFilesOnSameFileSystem() throws IOException { FileSystem targetFs = targetPath.getFileSystem(hiveConf); try { - Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, null); + Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, false, NO_ACID, null); } catch (HiveException e) { e.printStackTrace(); assertTrue("Hive.copyFiles() threw an unexpected exception.", false); @@ -127,7 +127,7 @@ public 
void testRenameExistingFilesOnSameFileSystem() throws IOException { sourceFolder.newFile("000001_0.gz"); try { - Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, null); + Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, false, NO_ACID, null); } catch (HiveException e) { e.printStackTrace(); assertTrue("Hive.copyFiles() threw an unexpected exception.", false); @@ -158,7 +158,7 @@ public void testCopyNewFilesOnDifferentFileSystem() throws IOException { Mockito.when(spyTargetFs.getUri()).thenReturn(URI.create("hdfs://" + targetPath.toUri().getPath())); try { - Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, null); + Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, false, NO_ACID, null); } catch (HiveException e) { e.printStackTrace(); assertTrue("Hive.copyFiles() threw an unexpected exception.", false); @@ -185,7 +185,7 @@ public void testCopyExistingFilesOnDifferentFileSystem() throws IOException { Mockito.when(spyTargetFs.getUri()).thenReturn(URI.create("hdfs://" + targetPath.toUri().getPath())); try { - Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, null); + Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, false, NO_ACID, null); } catch (HiveException e) { e.printStackTrace(); assertTrue("Hive.copyFiles() threw an unexpected exception.", false); @@ -205,7 +205,7 @@ public void testCopyExistingFilesOnDifferentFileSystem() throws IOException { sourceFolder.newFile("000001_0.gz"); try { - Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, null); + Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, false, NO_ACID, null); } catch (HiveException e) { e.printStackTrace(); assertTrue("Hive.copyFiles() threw an unexpected exception.", false); diff --git ql/src/test/queries/clientnegative/load_data_into_acid.q ql/src/test/queries/clientnegative/load_data_into_acid.q deleted file mode 100644 index fba1496..0000000 --- ql/src/test/queries/clientnegative/load_data_into_acid.q +++ /dev/null @@ -1,22 +0,0 @@ -set hive.strict.checks.bucketing=false; -set hive.support.concurrency=true; -set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; -set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; - -create table acid_ivot( - ctinyint TINYINT, - csmallint SMALLINT, - cint INT, - cbigint BIGINT, - cfloat FLOAT, - cdouble DOUBLE, - cstring1 STRING, - cstring2 STRING, - ctimestamp1 TIMESTAMP, - ctimestamp2 TIMESTAMP, - cboolean1 BOOLEAN, - cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true'); - -LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot; - - diff --git ql/src/test/queries/clientpositive/acid_load_data1.q ql/src/test/queries/clientpositive/acid_load_data1.q new file mode 100644 index 0000000..41d9695 --- /dev/null +++ ql/src/test/queries/clientpositive/acid_load_data1.q @@ -0,0 +1,47 @@ +set hive.strict.checks.bucketing=false; +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; + +drop table nonacidtbl1; +drop table nonacidtbl2; +drop table acidtbl; + +create table nonacidtbl1 (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc; +insert into table nonacidtbl1 values (1, 1), (2, 2); +select * from nonacidtbl1; + +create table nonacidtbl2 (c1 int, c2 int) clustered by (c2) into 2 buckets 
stored as orc; +insert into table nonacidtbl2 values (3, 3), (4, 4); +select * from nonacidtbl2; + +set hive.test.mode=true; +set hive.test.mode.prefix=; +set hive.test.mode.nosamplelist=nonacidtbl1,nonacidtbl2,acidtbl; + +dfs ${system:test.dfs.mkdir} target/tmp/ql/test/data/exports/nonacidtbl1/temp; +dfs -rmr target/tmp/ql/test/data/exports/nonacidtbl1; +export table nonacidtbl1 to 'ql/test/data/exports/nonacidtbl1'; + +dfs ${system:test.dfs.mkdir} target/tmp/ql/test/data/exports/nonacidtbl2/temp; +dfs -rmr target/tmp/ql/test/data/exports/nonacidtbl2; +export table nonacidtbl2 to 'ql/test/data/exports/nonacidtbl2'; + +create table acidtbl (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc +tblproperties('transactional'='true'); +load data local inpath "target/tmp/ql/test/data/exports/nonacidtbl1/data" into table acidtbl; +select * from acidtbl a1; +load data local inpath "target/tmp/ql/test/data/exports/nonacidtbl2/data" overwrite into table acidtbl; +select * from acidtbl a1; + +set hive.test.mode=false; + +insert into table acidtbl values (5, 5); +select * from acidtbl a2; + +insert into table acidtbl values (6, 6); +select * from acidtbl a3; + +drop table nonacidtbl1; +drop table nonacidtbl2; +drop table acidtbl; + diff --git ql/src/test/queries/clientpositive/acid_load_data2.q ql/src/test/queries/clientpositive/acid_load_data2.q new file mode 100644 index 0000000..2b9adc8 --- /dev/null +++ ql/src/test/queries/clientpositive/acid_load_data2.q @@ -0,0 +1,55 @@ +set hive.strict.checks.bucketing=false; +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; + +drop table nonacidtbl1; +drop table nonacidtbl2; +drop table acidtbl; + +create table nonacidtbl1 (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc; +insert into table nonacidtbl1 values (1, 1), (2, 2); +select * from nonacidtbl1; + +create table nonacidtbl2 (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc; +insert into table nonacidtbl2 values (3, 3), (4, 4); +select * from nonacidtbl2; + +set hive.test.mode=true; +set hive.test.mode.prefix=; +set hive.test.mode.nosamplelist=nonacidtbl1,nonacidtbl2,acidtbl; + +dfs ${system:test.dfs.mkdir} target/tmp/ql/test/data/exports/nonacidtbl1/temp; +dfs -rmr target/tmp/ql/test/data/exports/nonacidtbl1; +export table nonacidtbl1 to 'ql/test/data/exports/nonacidtbl1'; + +dfs ${system:test.dfs.mkdir} target/tmp/ql/test/data/exports/nonacidtbl2/temp; +dfs -rmr target/tmp/ql/test/data/exports/nonacidtbl2; +export table nonacidtbl2 to 'ql/test/data/exports/nonacidtbl2'; + +set hive.test.mode=false; + +create table acidtbl (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc +tblproperties('transactional'='true'); + +insert into table acidtbl values (5, 5); +select * from acidtbl a1; + +load data local inpath "target/tmp/ql/test/data/exports/nonacidtbl1/data" into table acidtbl; +select * from acidtbl a2; + +insert into table acidtbl values (6, 6); +select * from acidtbl a3; + +load data local inpath "target/tmp/ql/test/data/exports/nonacidtbl2/data" overwrite into table acidtbl; +select * from acidtbl a4; + +insert into table acidtbl values (7, 7); +select * from acidtbl a5; + +insert into table acidtbl values (8, 8); +select * from acidtbl a6; + +drop table nonacidtbl1; +drop table nonacidtbl2; +drop table acidtbl; + diff --git ql/src/test/results/clientnegative/load_data_into_acid.q.out ql/src/test/results/clientnegative/load_data_into_acid.q.out deleted file mode 100644 index cd829ba..0000000 
--- ql/src/test/results/clientnegative/load_data_into_acid.q.out +++ /dev/null @@ -1,33 +0,0 @@ -PREHOOK: query: create table acid_ivot( - ctinyint TINYINT, - csmallint SMALLINT, - cint INT, - cbigint BIGINT, - cfloat FLOAT, - cdouble DOUBLE, - cstring1 STRING, - cstring2 STRING, - ctimestamp1 TIMESTAMP, - ctimestamp2 TIMESTAMP, - cboolean1 BOOLEAN, - cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true') -PREHOOK: type: CREATETABLE -PREHOOK: Output: database:default -PREHOOK: Output: default@acid_ivot -POSTHOOK: query: create table acid_ivot( - ctinyint TINYINT, - csmallint SMALLINT, - cint INT, - cbigint BIGINT, - cfloat FLOAT, - cdouble DOUBLE, - cstring1 STRING, - cstring2 STRING, - ctimestamp1 TIMESTAMP, - ctimestamp2 TIMESTAMP, - cboolean1 BOOLEAN, - cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true') -POSTHOOK: type: CREATETABLE -POSTHOOK: Output: database:default -POSTHOOK: Output: default@acid_ivot -FAILED: SemanticException [Error 10266]: LOAD DATA... statement is not supported on transactional table default@acid_ivot. diff --git ql/src/test/results/clientpositive/acid_load_data1.q.out ql/src/test/results/clientpositive/acid_load_data1.q.out new file mode 100644 index 0000000..6df91ef --- /dev/null +++ ql/src/test/results/clientpositive/acid_load_data1.q.out @@ -0,0 +1,190 @@ +PREHOOK: query: drop table nonacidtbl1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table nonacidtbl1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table nonacidtbl2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table nonacidtbl2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table acidtbl +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table acidtbl +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table nonacidtbl1 (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@nonacidtbl1 +POSTHOOK: query: create table nonacidtbl1 (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@nonacidtbl1 +PREHOOK: query: insert into table nonacidtbl1 values (1, 1), (2, 2) +PREHOOK: type: QUERY +PREHOOK: Output: default@nonacidtbl1 +POSTHOOK: query: insert into table nonacidtbl1 values (1, 1), (2, 2) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@nonacidtbl1 +POSTHOOK: Lineage: nonacidtbl1.c1 EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: nonacidtbl1.c2 EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select * from nonacidtbl1 +PREHOOK: type: QUERY +PREHOOK: Input: default@nonacidtbl1 +#### A masked pattern was here #### +POSTHOOK: query: select * from nonacidtbl1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@nonacidtbl1 +#### A masked pattern was here #### +2 2 +1 1 +PREHOOK: query: create table nonacidtbl2 (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@nonacidtbl2 +POSTHOOK: query: create table nonacidtbl2 (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@nonacidtbl2 +PREHOOK: query: insert into table nonacidtbl2 values 
(3, 3), (4, 4) +PREHOOK: type: QUERY +PREHOOK: Output: default@nonacidtbl2 +POSTHOOK: query: insert into table nonacidtbl2 values (3, 3), (4, 4) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@nonacidtbl2 +POSTHOOK: Lineage: nonacidtbl2.c1 EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: nonacidtbl2.c2 EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select * from nonacidtbl2 +PREHOOK: type: QUERY +PREHOOK: Input: default@nonacidtbl2 +#### A masked pattern was here #### +POSTHOOK: query: select * from nonacidtbl2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@nonacidtbl2 +#### A masked pattern was here #### +4 4 +3 3 +#### A masked pattern was here #### +PREHOOK: query: export table nonacidtbl1 to 'ql/test/data/exports/nonacidtbl1' +PREHOOK: type: EXPORT +PREHOOK: Input: default@nonacidtbl1 +#### A masked pattern was here #### +POSTHOOK: query: export table nonacidtbl1 to 'ql/test/data/exports/nonacidtbl1' +POSTHOOK: type: EXPORT +POSTHOOK: Input: default@nonacidtbl1 +#### A masked pattern was here #### +PREHOOK: query: export table nonacidtbl2 to 'ql/test/data/exports/nonacidtbl2' +PREHOOK: type: EXPORT +PREHOOK: Input: default@nonacidtbl2 +#### A masked pattern was here #### +POSTHOOK: query: export table nonacidtbl2 to 'ql/test/data/exports/nonacidtbl2' +POSTHOOK: type: EXPORT +POSTHOOK: Input: default@nonacidtbl2 +#### A masked pattern was here #### +PREHOOK: query: create table acidtbl (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc +tblproperties('transactional'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@acidtbl +POSTHOOK: query: create table acidtbl (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc +tblproperties('transactional'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@acidtbl +#### A masked pattern was here #### +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@acidtbl +PREHOOK: query: select * from acidtbl a1 +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: query: select * from acidtbl a1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbl +#### A masked pattern was here #### +2 2 +1 1 +#### A masked pattern was here #### +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@acidtbl +PREHOOK: query: select * from acidtbl a1 +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: query: select * from acidtbl a1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbl +#### A masked pattern was here #### +4 4 +3 3 +PREHOOK: query: insert into table acidtbl values (5, 5) +PREHOOK: type: QUERY +PREHOOK: Output: default@acidtbl +POSTHOOK: query: insert into table acidtbl values (5, 5) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@acidtbl +POSTHOOK: Lineage: acidtbl.c1 EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: acidtbl.c2 EXPRESSION 
[(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select * from acidtbl a2 +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: query: select * from acidtbl a2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbl +#### A masked pattern was here #### +4 4 +3 3 +5 5 +PREHOOK: query: insert into table acidtbl values (6, 6) +PREHOOK: type: QUERY +PREHOOK: Output: default@acidtbl +POSTHOOK: query: insert into table acidtbl values (6, 6) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@acidtbl +POSTHOOK: Lineage: acidtbl.c1 EXPRESSION [(values__tmp__table__4)values__tmp__table__4.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: acidtbl.c2 EXPRESSION [(values__tmp__table__4)values__tmp__table__4.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select * from acidtbl a3 +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: query: select * from acidtbl a3 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbl +#### A masked pattern was here #### +4 4 +3 3 +5 5 +6 6 +PREHOOK: query: drop table nonacidtbl1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@nonacidtbl1 +PREHOOK: Output: default@nonacidtbl1 +POSTHOOK: query: drop table nonacidtbl1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@nonacidtbl1 +POSTHOOK: Output: default@nonacidtbl1 +PREHOOK: query: drop table nonacidtbl2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@nonacidtbl2 +PREHOOK: Output: default@nonacidtbl2 +POSTHOOK: query: drop table nonacidtbl2 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@nonacidtbl2 +POSTHOOK: Output: default@nonacidtbl2 +PREHOOK: query: drop table acidtbl +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@acidtbl +PREHOOK: Output: default@acidtbl +POSTHOOK: query: drop table acidtbl +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@acidtbl +POSTHOOK: Output: default@acidtbl diff --git ql/src/test/results/clientpositive/acid_load_data2.q.out ql/src/test/results/clientpositive/acid_load_data2.q.out new file mode 100644 index 0000000..ccc0412 --- /dev/null +++ ql/src/test/results/clientpositive/acid_load_data2.q.out @@ -0,0 +1,228 @@ +PREHOOK: query: drop table nonacidtbl1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table nonacidtbl1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table nonacidtbl2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table nonacidtbl2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table acidtbl +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table acidtbl +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table nonacidtbl1 (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@nonacidtbl1 +POSTHOOK: query: create table nonacidtbl1 (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@nonacidtbl1 +PREHOOK: query: insert into table nonacidtbl1 values (1, 1), (2, 2) +PREHOOK: type: QUERY +PREHOOK: Output: default@nonacidtbl1 +POSTHOOK: query: insert into table nonacidtbl1 values (1, 1), (2, 2) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@nonacidtbl1 +POSTHOOK: Lineage: nonacidtbl1.c1 EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: 
nonacidtbl1.c2 EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select * from nonacidtbl1 +PREHOOK: type: QUERY +PREHOOK: Input: default@nonacidtbl1 +#### A masked pattern was here #### +POSTHOOK: query: select * from nonacidtbl1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@nonacidtbl1 +#### A masked pattern was here #### +2 2 +1 1 +PREHOOK: query: create table nonacidtbl2 (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@nonacidtbl2 +POSTHOOK: query: create table nonacidtbl2 (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@nonacidtbl2 +PREHOOK: query: insert into table nonacidtbl2 values (3, 3), (4, 4) +PREHOOK: type: QUERY +PREHOOK: Output: default@nonacidtbl2 +POSTHOOK: query: insert into table nonacidtbl2 values (3, 3), (4, 4) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@nonacidtbl2 +POSTHOOK: Lineage: nonacidtbl2.c1 EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: nonacidtbl2.c2 EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select * from nonacidtbl2 +PREHOOK: type: QUERY +PREHOOK: Input: default@nonacidtbl2 +#### A masked pattern was here #### +POSTHOOK: query: select * from nonacidtbl2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@nonacidtbl2 +#### A masked pattern was here #### +4 4 +3 3 +#### A masked pattern was here #### +PREHOOK: query: export table nonacidtbl1 to 'ql/test/data/exports/nonacidtbl1' +PREHOOK: type: EXPORT +PREHOOK: Input: default@nonacidtbl1 +#### A masked pattern was here #### +POSTHOOK: query: export table nonacidtbl1 to 'ql/test/data/exports/nonacidtbl1' +POSTHOOK: type: EXPORT +POSTHOOK: Input: default@nonacidtbl1 +#### A masked pattern was here #### +PREHOOK: query: export table nonacidtbl2 to 'ql/test/data/exports/nonacidtbl2' +PREHOOK: type: EXPORT +PREHOOK: Input: default@nonacidtbl2 +#### A masked pattern was here #### +POSTHOOK: query: export table nonacidtbl2 to 'ql/test/data/exports/nonacidtbl2' +POSTHOOK: type: EXPORT +POSTHOOK: Input: default@nonacidtbl2 +#### A masked pattern was here #### +PREHOOK: query: create table acidtbl (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc +tblproperties('transactional'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@acidtbl +POSTHOOK: query: create table acidtbl (c1 int, c2 int) clustered by (c2) into 2 buckets stored as orc +tblproperties('transactional'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@acidtbl +PREHOOK: query: insert into table acidtbl values (5, 5) +PREHOOK: type: QUERY +PREHOOK: Output: default@acidtbl +POSTHOOK: query: insert into table acidtbl values (5, 5) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@acidtbl +POSTHOOK: Lineage: acidtbl.c1 EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: acidtbl.c2 EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select * from acidtbl a1 +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbl 
+#### A masked pattern was here #### +POSTHOOK: query: select * from acidtbl a1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbl +#### A masked pattern was here #### +5 5 +#### A masked pattern was here #### +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@acidtbl +PREHOOK: query: select * from acidtbl a2 +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: query: select * from acidtbl a2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbl +#### A masked pattern was here #### +5 5 +2 2 +1 1 +PREHOOK: query: insert into table acidtbl values (6, 6) +PREHOOK: type: QUERY +PREHOOK: Output: default@acidtbl +POSTHOOK: query: insert into table acidtbl values (6, 6) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@acidtbl +POSTHOOK: Lineage: acidtbl.c1 EXPRESSION [(values__tmp__table__4)values__tmp__table__4.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: acidtbl.c2 EXPRESSION [(values__tmp__table__4)values__tmp__table__4.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select * from acidtbl a3 +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: query: select * from acidtbl a3 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbl +#### A masked pattern was here #### +5 5 +6 6 +2 2 +1 1 +#### A masked pattern was here #### +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@acidtbl +PREHOOK: query: select * from acidtbl a4 +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: query: select * from acidtbl a4 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbl +#### A masked pattern was here #### +4 4 +3 3 +PREHOOK: query: insert into table acidtbl values (7, 7) +PREHOOK: type: QUERY +PREHOOK: Output: default@acidtbl +POSTHOOK: query: insert into table acidtbl values (7, 7) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@acidtbl +POSTHOOK: Lineage: acidtbl.c1 EXPRESSION [(values__tmp__table__5)values__tmp__table__5.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: acidtbl.c2 EXPRESSION [(values__tmp__table__5)values__tmp__table__5.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select * from acidtbl a5 +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: query: select * from acidtbl a5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbl +#### A masked pattern was here #### +4 4 +3 3 +7 7 +PREHOOK: query: insert into table acidtbl values (8, 8) +PREHOOK: type: QUERY +PREHOOK: Output: default@acidtbl +POSTHOOK: query: insert into table acidtbl values (8, 8) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@acidtbl +POSTHOOK: Lineage: acidtbl.c1 EXPRESSION [(values__tmp__table__6)values__tmp__table__6.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: acidtbl.c2 EXPRESSION [(values__tmp__table__6)values__tmp__table__6.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select * from acidtbl a6 +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbl +#### A masked pattern was here #### 
+POSTHOOK: query: select * from acidtbl a6 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbl +#### A masked pattern was here #### +4 4 +3 3 +7 7 +8 8 +PREHOOK: query: drop table nonacidtbl1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@nonacidtbl1 +PREHOOK: Output: default@nonacidtbl1 +POSTHOOK: query: drop table nonacidtbl1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@nonacidtbl1 +POSTHOOK: Output: default@nonacidtbl1 +PREHOOK: query: drop table nonacidtbl2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@nonacidtbl2 +PREHOOK: Output: default@nonacidtbl2 +POSTHOOK: query: drop table nonacidtbl2 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@nonacidtbl2 +POSTHOOK: Output: default@nonacidtbl2 +PREHOOK: query: drop table acidtbl +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@acidtbl +PREHOOK: Output: default@acidtbl +POSTHOOK: query: drop table acidtbl +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@acidtbl +POSTHOOK: Output: default@acidtbl
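
Illustrative end-to-end sketch (not part of the patch): how the prefix helpers touched by this change classify the directories that testLoadData1/testLoadData2 above expect to see after a LOAD DATA, an INSERT, and a major compaction. The warehouse paths and txn ids are invented for the example.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class AcidDirLayoutSketch {
  public static void main(String[] args) {
    Path[] dirs = {
        new Path("/warehouse/acidtbl/load_delta_0000001_0000001_0000"), // from LOAD DATA (txn 1)
        new Path("/warehouse/acidtbl/delta_0000002_0000002_0000"),      // from INSERT (txn 2)
        new Path("/warehouse/acidtbl/base_0000002")                     // from major compaction
    };
    for (Path p : dirs) {
      System.out.println(p.getName()
          + "  isLoadDelta=" + AcidUtils.isLoadDelta(p)
          + "  isDeleteDelta=" + AcidUtils.isDeleteDelta(p));
    }
    // Expected: only the first directory reports isLoadDelta=true; after the
    // Cleaner runs, only base_0000002 remains, matching the tests above.
  }
}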