diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index bd25bc7cad..c214d4090e 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1890,7 +1890,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "1: Enable split-update feature found in the newer version of Hive ACID subsystem\n" + "4: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing.\n" + "This is intended to be used as an internal property for future versions of ACID. (See\n" + - "HIVE-14035 for details.)"), + "HIVE-14035 for details. User sets it via 'transactional_properties' in TBLPROPERTIES.)", true), HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" + "current open transactions reach this limit, future open transaction requests will be \n" + diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 186d5809c8..419e74c2d8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -376,7 +376,6 @@ DBTXNMGR_REQUIRES_CONCURRENCY(10264, "To use DbTxnManager you must set hive.support.concurrency=true"), TXNMGR_NOT_ACID(10265, "This command is not allowed on an ACID table {0}.{1} with a non-ACID transaction manager", true), - LOAD_DATA_ON_ACID_TABLE(10266, "LOAD DATA... statement is not supported on transactional table {0}.", true), LOCK_NO_SUCH_LOCK(10270, "No record of lock {0} could be found, " + "may have timed out", true), LOCK_REQUEST_UNSUPPORTED(10271, "Current transaction manager does not " + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 0a34633fa4..033fcc5cd3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4405,7 +4405,7 @@ private static void ensureDelete(FileSystem fs, Path path, String what) throws I part.getTPartition().getParameters().putAll(alterTbl.getProps()); } else { boolean isFromMmTable = AcidUtils.isInsertOnlyTable(tbl.getParameters()); - Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(alterTbl.getProps()); + Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(tbl, alterTbl.getProps()); if (isToMmTable != null) { if (!isFromMmTable && isToMmTable) { result = generateAddMmTasks(tbl); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index e2f8c1f801..22e665687e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -215,7 +215,7 @@ private void releaseLocks(LoadTableDesc ltd) throws HiveException { Context ctx = driverContext.getCtx(); if(ctx.getHiveTxnManager().supportsAcid()) { - //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes it more explicit + //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes the logic more explicit return; } HiveLockManager lockMgr = ctx.getHiveTxnManager().getLockManager(); @@ -290,7 +290,7 @@ public int execute(DriverContext driverContext) { } else { Utilities.FILE_OP_LOGGER.debug("MoveTask moving " + sourcePath + " to " + targetPath); if(lfd.getWriteType() == AcidUtils.Operation.INSERT) { - //'targetPath' is table root of un-partitioned 
table/partition + //'targetPath' is table root of un-partitioned table or partition //'sourcePath' result of 'select ...' part of CTAS statement assert lfd.getIsDfsDir(); FileSystem srcFs = sourcePath.getFileSystem(conf); @@ -367,7 +367,7 @@ public int execute(DriverContext driverContext) { checkFileFormats(db, tbd, table); boolean isFullAcidOp = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID - && !tbd.isMmTable(); + && !tbd.isMmTable();//it seems that LoadTableDesc has Operation.INSERT only for CTAS... // Create a data container DataContainer dc = null; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index bb1f4e5050..545b7a8b7e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -229,6 +229,7 @@ private String location(ImportTableDesc tblDesc, Database parentDb) LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, + //todo: what is the point of this? If this is for replication, who would have opened a txn? SessionState.get().getTxnMgr().getCurrentTxnId() ); MoveWork moveWork = diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 4c0b71f65f..df2de94541 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -22,7 +22,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -30,6 +30,8 @@ import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -39,18 +41,17 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.TransactionalValidationListener; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.CreateTableDesc; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.common.util.Ref; import org.apache.orc.impl.OrcAcidUtils; +import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -244,7 +245,7 @@ public static Path createFilename(Path directory, * @param path the base directory name * @return the maximum transaction id that is included */ - static long parseBase(Path path) { + public static long parseBase(Path path) { String filename = path.getName(); if (filename.startsWith(BASE_PREFIX)) { return 
Long.parseLong(filename.substring(BASE_PREFIX.length())); @@ -273,7 +274,7 @@ static long parseBase(Path path) { .minimumTransactionId(0) .maximumTransactionId(0) .bucket(bucket) - .writingBase(true); + .writingBase(true);//todo: this now depends on whether parent is base or delta } else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) { //todo: define groups in regex and use parseInt(Matcher.group(2)).... @@ -321,6 +322,11 @@ else if (filename.startsWith(BUCKET_PREFIX)) { } public enum Operation implements Serializable { + /** + todo: should this have LOAD_DATA, IOW etc? Does that duplicate WriteType? Yes, but there are slightly + * different semantics for all these + * What about HiveOperationType? + */ NOT_ACID, INSERT, UPDATE, DELETE; } @@ -344,11 +350,18 @@ public static DataOperationType toDataOperationType(Operation op) { throw new IllegalArgumentException("Unexpected Operation: " + op); } } - public enum AcidBaseFileType { - COMPACTED_BASE, // a regular base file generated through major compaction - ORIGINAL_BASE, // a non-acid schema file for tables that got converted to acid - INSERT_DELTA; // a delta file with only insert events that can be treated as base for split-update + /** + * File w/o Acid meta columns. This is specifically for files that were added to a table before + * it was converted to Acid but not yet major compacted. + * Hmm - we don't (since unbucketed tables) care about location - it can be in subdir + * So then why do we care about ORIGINAL_BASE vs RAW_SCHEMA diestinction? + */ + ORIGINAL_BASE, // a non-acid schema file for tables that got converted to acid + /** + * file that has Acid metadata columns embedded in it. Found in base_x/ or delta_x_y/. + */ + ACID_SCHEMA, // a delta file with only insert events that can be treated as base for split-update } /** @@ -366,16 +379,12 @@ public AcidBaseFileInfo(HdfsFileStatusWithId fileId, AcidBaseFileType acidBaseFi this.acidBaseFileType = acidBaseFileType; } - public boolean isCompactedBase() { - return this.acidBaseFileType == AcidBaseFileType.COMPACTED_BASE; - } - public boolean isOriginal() { return this.acidBaseFileType == AcidBaseFileType.ORIGINAL_BASE; } public boolean isInsertDelta() { - return this.acidBaseFileType == AcidBaseFileType.INSERT_DELTA; + return this.acidBaseFileType == AcidBaseFileType.ACID_SCHEMA; } public HdfsFileStatusWithId getHdfsFileStatusWithId() { @@ -545,6 +554,7 @@ public String toString() { * @return the base directory to read */ Path getBaseDirectory(); + boolean isBaseInRawFormat(); /** * Get the list of original files. Not {@code null}. Must be sorted. @@ -577,13 +587,16 @@ public String toString() { } public static class ParsedDelta implements Comparable { + //todo: make sure this has "format" info private final long minTransaction; private final long maxTransaction; private final FileStatus path; //-1 is for internal (getAcidState()) purposes and means the delta dir //had no statement ID private final int statementId; + //todo: maybe "type" should be made more explicit: base, delete, raw.... or insert_acid, insert_raw, delete private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...' 
+ private boolean isRawFormat = false; /** * for pre 1.3.x delta files @@ -597,6 +610,8 @@ public String toString() { this.path = path; this.statementId = statementId; this.isDeleteDelta = isDeleteDelta; + //this.isRawFormat = isRawFormat; + assert !isDeleteDelta || !isRawFormat : " deleteDelta should not be raw format"; } public long getMinTransaction() { @@ -618,6 +633,13 @@ public int getStatementId() { public boolean isDeleteDelta() { return isDeleteDelta; } + //todo: this breaks Immutability of ParsedDelta + void setIsRawFormat(boolean isRawFormat) { + this.isRawFormat = isRawFormat; + } + public boolean isRawFormat() { + return isRawFormat; + } /** * Compactions (Major/Minor) merge deltas/bases but delete of old files @@ -698,29 +720,6 @@ else if(statementId != parsedDelta.statementId) { } /** - * Convert the list of begin/end transaction id pairs to a list of delta - * directories. Note that there may be multiple delta files for the exact same txn range starting - * with 1.3.x; - * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)} - * @param root the root directory - * @param deltas list of begin/end transaction id pairs - * @return the list of delta paths - */ - public static Path[] deserializeDeltas(Path root, final List deltas) throws IOException { - List results = new ArrayList(deltas.size()); - for(AcidInputFormat.DeltaMetaData dmd : deltas) { - if(dmd.getStmtIds().isEmpty()) { - results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId()))); - continue; - } - for(Integer stmtId : dmd.getStmtIds()) { - results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId))); - } - } - return results.toArray(new Path[results.size()]); - } - - /** * Convert the list of begin/end transaction id pairs to a list of delete delta * directories. 
Note that there may be multiple delete_delta files for the exact same txn range starting * with 2.2.x; @@ -871,13 +870,13 @@ public static Directory getAcidState(Path directory, if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { getChildState(child.getFileStatus(), child, txnList, working, originalDirectories, original, - obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties); + obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs); } } else { List children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter); for (FileStatus child : children) { getChildState(child, null, txnList, working, originalDirectories, original, obsolete, - bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties); + bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs); } } @@ -976,12 +975,18 @@ else if (prev != null && next.maxTransaction == prev.maxTransaction //this does "Path.uri.compareTo(that.uri)" return o1.getFileStatus().compareTo(o2.getFileStatus()); }); - return new Directory(){ + + final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs); + return new Directory() { @Override public Path getBaseDirectory() { return base; } + @Override + public boolean isBaseInRawFormat() { + return isBaseInRawFormat; + } @Override public List getOriginalFiles() { @@ -1022,7 +1027,7 @@ private static boolean isValidBase(long baseTxnId, ValidTxnList txnList) { private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, ValidTxnList txnList, List working, List originalDirectories, List original, List obsolete, TxnBase bestBase, - boolean ignoreEmptyFiles, List aborted, Map tblproperties) throws IOException { + boolean ignoreEmptyFiles, List aborted, Map tblproperties, FileSystem fs) throws IOException { Path p = child.getPath(); String fn = p.getName(); if (fn.startsWith(BASE_PREFIX) && child.isDir()) { @@ -1051,6 +1056,9 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi String deltaPrefix = (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX; ParsedDelta delta = parseDelta(child, deltaPrefix); + if(!delta.isDeleteDelta()) { + delta.setIsRawFormat(MetaDataFile.isRawFormat(p, fs)); + } if (tblproperties != null && AcidUtils.isInsertOnlyTable(tblproperties) && ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeAborted(delta.minTransaction, delta.maxTransaction)) { aborted.add(child); @@ -1171,8 +1179,11 @@ public static void setTransactionalTableScan(Map parameters, boo parameters.put(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, Boolean.toString(isAcidTable)); } - public static void setTransactionalTableScan(Configuration conf, boolean isAcidTable) { - HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable); + /** + * Means it's a full acid table + */ + public static void setTransactionalTableScan(Configuration conf, boolean isFullAcidTable) { + HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isFullAcidTable); } /** * @param p - not null @@ -1185,6 +1196,10 @@ public static boolean isDeleteDelta(Path p) { * SessionState.get().getTxnMgr().supportsAcid() here * @param table table * @return true if table is a legit ACID table, false otherwise + * ToDo: this may be an issue: this is TRUE for both full acid and MM but a lot of places in the + * code treat isAcid to mean full acid scan. This will cause confusion. + * + * todo: should this be called transactional? 
*/ public static boolean isAcidTable(Table table) { if (table == null) { @@ -1209,7 +1224,7 @@ public static boolean isAcidTable(CreateTableDesc table) { } public static boolean isFullAcidTable(Table table) { - return isAcidTable(table) && !AcidUtils.isInsertOnlyTable(table.getParameters()); + return isAcidTable(table) && !AcidUtils.isInsertOnlyTable(table); } /** @@ -1336,6 +1351,9 @@ public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOExc public static boolean isInsertOnlyTable(Map params) { return isInsertOnlyTable(params, false); } + public static boolean isInsertOnlyTable(Table table) { + return isAcidTable(table) && getAcidOperationalProperties(table).isInsertOnly(); + } // TODO [MM gap]: CTAS may currently be broken. It used to work. See the old code, and why isCtas isn't used? public static boolean isInsertOnlyTable(Map params, boolean isCtas) { @@ -1349,13 +1367,17 @@ public static boolean isInsertOnlyTable(Properties params) { return (transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp)); } - /** The method for altering table props; may set the table to MM, non-MM, or not affect MM. */ - public static Boolean isToInsertOnlyTable(Map props) { + /** The method for altering table props; may set the table to MM, non-MM, or not affect MM. + * todo: All such validation logic should be in TransactionalValidationListener*/ + public static Boolean isToInsertOnlyTable(Table tbl, Map props) { // Note: Setting these separately is a very hairy issue in certain combinations, since we // cannot decide what type of table this becomes without taking both into account, and // in many cases the conversion might be illegal. // The only thing we allow is tx = true w/o tx-props, for backward compat. String transactional = props.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if(transactional == null) { + transactional = tbl.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + } String transactionalProp = props.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); if (transactional == null && transactionalProp == null) return null; // Not affected. boolean isSetToTxn = "true".equalsIgnoreCase(transactional); @@ -1378,4 +1400,69 @@ public static boolean isRemovedInsertOnlyTable(Set removedSet) { hasProps = removedSet.contains(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); return hasTxn || hasProps; } + public static class MetaDataFile { + private static final String METADATA_FILE = "_meta_data"; + private static final String CURRENT_VERSION = "0"; + //todo: enums? that have both field name and value list + private interface Field { + String VERSION = "thisFileVersion"; + String DATA_FORMAT = "dataFormat"; + } + private interface Value { + //plain ORC file + String RAW = "raw"; + //result of acid write, i.e. decorated with ROW__ID info + String NATIVE = "native"; + } + + /** + * @param baseOrDeltaDir delta or base dir, must exist + */ + public static void createMetaFile(Path baseOrDeltaDir, FileSystem fs, boolean isRawFormat) + throws IOException { + /** + * create _meta_data json file in baseOrDeltaDir + * write thisFileVersion, dataFormat + * + * on read if the file is not there, assume version 0 and dataFormat=acid + */ + Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE); + Map metaData = new HashMap<>(); + metaData.put(Field.VERSION, CURRENT_VERSION); + metaData.put(Field.DATA_FORMAT, isRawFormat ? 
Value.RAW : Value.NATIVE); + try (FSDataOutputStream strm = fs.create(formatFile, false)) { + new ObjectMapper().writeValue(strm, metaData); + } catch (IOException ioe) { + String msg = "Failed to create " + baseOrDeltaDir + "/" + METADATA_FILE + ": " + ioe.getMessage(); + LOG.error(msg, ioe); + throw ioe; + } + } + public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOException { + Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE); + if(!fs.exists(formatFile)) { + return false; + } + try (FSDataInputStream strm = fs.open(formatFile)) { + Map metaData = new ObjectMapper().readValue(strm, Map.class); + if(!CURRENT_VERSION.equalsIgnoreCase(metaData.get(Field.VERSION))) { + throw new IllegalStateException("Unexpected Meta Data version: " + metaData.get(Field.VERSION)); + } + String dataFormat = metaData.getOrDefault(Field.DATA_FORMAT, "null"); + switch (dataFormat) { + case Value.NATIVE: + return false; + case Value.RAW: + return true; + default: + throw new IllegalArgumentException("Unexpected value for " + Field.DATA_FORMAT + ": " + dataFormat); + } + } + catch(IOException e) { + String msg = "Failed to read " + baseOrDeltaDir + "/" + METADATA_FILE + ": " + e.getMessage(); + LOG.error(msg, e); + throw e; + } + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 6a1dc729f3..819c2a2667 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -468,6 +468,9 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job try { Utilities.copyTablePropertiesToConf(table, conf); + if(tableScan != null) { + AcidUtils.setTransactionalTableScan(conf, tableScan.getConf().isAcidTable()); + } } catch (HiveException e) { throw new IOException(e); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 1e5b841f4b..5a21a46ce7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.io.orc; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hdfs.DistributedFileSystem; import java.io.IOException; @@ -406,7 +407,7 @@ public static boolean isOriginal(Footer footer) { * @param readerSchema the types for the reader * @param conf the configuration */ - public static boolean[] genIncludedColumns(TypeDescription readerSchema, + static boolean[] genIncludedColumns(TypeDescription readerSchema, Configuration conf) { if (!ColumnProjectionUtils.isReadAllColumns(conf)) { List included = ColumnProjectionUtils.getReadColumnIDs(conf); @@ -416,7 +417,7 @@ public static boolean isOriginal(Footer footer) { } } - public static String[] getSargColumnNames(String[] originalColumnNames, + private static String[] getSargColumnNames(String[] originalColumnNames, List types, boolean[] includedColumns, boolean isOriginal) { int rootColumn = getRootColumn(isOriginal); String[] columnNames = new String[types.size() - rootColumn]; @@ -692,21 +693,21 @@ public static void clearLocalCache() { */ @VisibleForTesting static final class AcidDirInfo { - public AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo, + AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo, List baseFiles, - List parsedDeltas) { + List deleteEvents) { this.splitPath = splitPath; 
this.acidInfo = acidInfo; this.baseFiles = baseFiles; this.fs = fs; - this.parsedDeltas = parsedDeltas; + this.deleteEvents = deleteEvents; } final FileSystem fs; final Path splitPath; final AcidUtils.Directory acidInfo; final List baseFiles; - final List parsedDeltas; + final List deleteEvents; } @VisibleForTesting @@ -881,7 +882,7 @@ public String toString() { public CombineResult combineWith(FileSystem fs, Path dir, List otherFiles, boolean isOriginal) { if ((files.size() + otherFiles.size()) > ETL_COMBINE_FILE_LIMIT - || this.isOriginal != isOriginal) { + || this.isOriginal != isOriginal) {//todo: what is this checking???? return (files.size() > otherFiles.size()) ? CombineResult.NO_AND_SWAP : CombineResult.NO_AND_CONTINUE; } @@ -1080,6 +1081,12 @@ public String toString() { static final class FileGenerator implements Callable { private final Context context; private final FileSystem fs; + /** + * For plain or acid tables this is the root of the partition (or table if not partitioned). + * For MM table this is delta/ or base/ dir. In MM case applying of the ValidTxnList that + * {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} normally does has already + * been done in {@link HiveInputFormat#processPathsForMmRead(List, JobConf, ValidTxnList)}. + */ private final Path dir; private final Ref useFileIds; private final UserGroupInformation ugi; @@ -1116,25 +1123,28 @@ public AcidDirInfo run() throws Exception { } private AcidDirInfo callInternal() throws IOException { + //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine? AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf, context.transactionList, useFileIds, true, null); Path base = dirInfo.getBaseDirectory(); // find the base files (original or new style) - List baseFiles = new ArrayList(); + List baseFiles = new ArrayList<>(); if (base == null) { + //for non-acid tables, all data files are in getOriginalFiles() list for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) { baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE)); } } else { List compactedBaseFiles = findBaseFiles(base, useFileIds); for (HdfsFileStatusWithId fileId : compactedBaseFiles) { - baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.COMPACTED_BASE)); + baseFiles.add(new AcidBaseFileInfo(fileId, dirInfo.isBaseInRawFormat() ? + AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA)); } } // Find the parsed deltas- some of them containing only the insert delta events // may get treated as base if split-update is enabled for ACID. (See HIVE-14035 for details) - List parsedDeltas = new ArrayList(); + List parsedDeltas = new ArrayList<>(); if (context.acidOperationalProperties != null && context.acidOperationalProperties.isSplitUpdate()) { @@ -1151,15 +1161,26 @@ private AcidDirInfo callInternal() throws IOException { if (parsedDelta.isDeleteDelta()) { parsedDeltas.add(parsedDelta); } else { + AcidUtils.AcidBaseFileType deltaType = parsedDelta.isRawFormat() ? + AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA; + PathFilter bucketFilter = parsedDelta.isRawFormat() ? + AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter; + if(parsedDelta.isRawFormat() && parsedDelta.getMinTransaction() != + parsedDelta.getMaxTransaction()) { + //delta/ with files in raw format are a result of Load Data (as opposed to compaction + //or streaming ingest so must have interval length == 1. 
+ throw new IllegalStateException("Delta in " + AcidUtils.AcidBaseFileType.ORIGINAL_BASE + + " format but txnIds are out of range: " + parsedDelta.getPath()); + } // This is a normal insert delta, which only has insert events and hence all the files // in this delta directory can be considered as a base. Boolean val = useFileIds.value; if (val == null || val) { try { List insertDeltaFiles = - SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter); + SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), bucketFilter); for (HdfsFileStatusWithId fileId : insertDeltaFiles) { - baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA)); + baseFiles.add(new AcidBaseFileInfo(fileId, deltaType)); } if (val == null) { useFileIds.value = true; // The call succeeded, so presumably the API is there. @@ -1173,15 +1194,20 @@ private AcidDirInfo callInternal() throws IOException { } } // Fall back to regular API and create statuses without ID. - List children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter); + List children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), bucketFilter); for (FileStatus child : children) { HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child); - baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA)); + baseFiles.add(new AcidBaseFileInfo(fileId, deltaType)); } } } } else { + /* + We already handled all delete deltas above and there should not be any other deltas for + any table type. (this was acid 1.0 code path). + */ + assert dirInfo.getCurrentDirectories().isEmpty() : "Non empty curDir list?!: " + dir; // When split-update is not enabled, then all the deltas in the current directories // should be considered as usual. parsedDeltas.addAll(dirInfo.getCurrentDirectories()); @@ -1655,7 +1681,7 @@ private long computeProjectionSize(List fileTypes, pathFutures.add(ecs.submit(fileGenerator)); } - boolean isTransactionalTableScan =//this never seems to be set correctly + boolean isTransactionalTableScan = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); boolean isSchemaEvolution = HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION); TypeDescription readerSchema = @@ -1703,7 +1729,7 @@ private long computeProjectionSize(List fileTypes, // independent split strategies for them. There is a global flag 'isOriginal' that is set // on a per split strategy basis and it has to be same for all the files in that strategy. List> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs, - adi.splitPath, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi, + adi.splitPath, adi.baseFiles, adi.deleteEvents, readerTypes, ugi, allowSyntheticFileIds); for (SplitStrategy splitStrategy : splitStrategies) { @@ -1952,6 +1978,7 @@ public float getProgress() throws IOException { final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split); OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false); mergerOptions.rootPath(split.getRootDir()); + mergerOptions.bucketPath(split.getPath()); final int bucket; if (split.hasBase()) { AcidOutputFormat.Options acidIOOptions = @@ -1965,8 +1992,9 @@ public float getProgress() throws IOException { } } else { bucket = (int) split.getStart(); + assert false : "We should never have a split w/o base in acid 2.0 for full acid: " + split.getPath(); } - + //todo: createOptionsForReader() assumes it's !isOriginal.... why? 
final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf); readOptions.range(split.getStart(), split.getLength()); @@ -2038,6 +2066,7 @@ public float getProgress() throws IOException { // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription. final List schemaTypes = OrcUtils.getOrcTypes(schema); readerOptions.include(OrcInputFormat.genIncludedColumns(schema, conf)); + //todo: last param is bogus. why is this hardcoded? OrcInputFormat.setSearchArgument(readerOptions, schemaTypes, conf, true); return readerOptions; } @@ -2128,6 +2157,11 @@ private static boolean isStripeSatisfyPredicate( return sarg.evaluate(truthValues).isNeeded(); } + /** + * do we still need this? what we really need is to pass info into OrcSplit about the type of delta/base it is + * did we need this because "original" files didn't vectorize? + * For MM table we can have no base + (insert) deltas. May also have base + no deltas. + */ @VisibleForTesting static List> determineSplitStrategies(CombinedCtx combinedCtx, Context context, FileSystem fs, Path dir, @@ -2143,10 +2177,8 @@ private static boolean isStripeSatisfyPredicate( boolean isDefaultFs = (!checkDefaultFs) || ((fs instanceof DistributedFileSystem) && HdfsUtils.isDefaultFs((DistributedFileSystem) fs)); - // When no baseFiles, we will just generate a single split strategy and return. - List acidSchemaFiles = new ArrayList(); if (baseFiles.isEmpty()) { - splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, acidSchemaFiles, + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, Collections.emptyList(), false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds, isDefaultFs); if (splitStrategy != null) { splitStrategies.add(splitStrategy); @@ -2154,12 +2186,14 @@ private static boolean isStripeSatisfyPredicate( return splitStrategies; // return here } + List acidSchemaFiles = new ArrayList<>(); List originalSchemaFiles = new ArrayList(); // Separate the base files into acid schema and non-acid(original) schema files. for (AcidBaseFileInfo acidBaseFileInfo : baseFiles) { if (acidBaseFileInfo.isOriginal()) { originalSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId()); } else { + assert acidBaseFileInfo.isInsertDelta(); acidSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId()); } } @@ -2253,19 +2287,23 @@ private static boolean isStripeSatisfyPredicate( if (baseDirectory != null) {//this is NULL for minor compaction Path bucketFile = null; if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) { - bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket); + bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket);//todo: may not be the right file name for load data base_x + isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf)); } else { /**we don't know which file to start reading - * {@link OrcRawRecordMerger.OriginalReaderPairToCompact} does*/ isOriginal = true; } - if(bucketFile != null) { + if(bucketFile != null) {//todo: why the hell do we need this reader? reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf)); } } OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options() .isCompacting(true) .rootPath(baseDirectory).isMajorCompaction(baseDirectory != null); + //todo: the isOriginal here is bogus. We need the compactor to make the determination for + //each ReaderPair that it creates based on the dir + //for compaction this really just needs directory and bucketId. 
It should be able to figure out if it's original return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal, bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 95a60dc032..0ea8796a84 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -320,12 +320,21 @@ public void next(OrcStruct next) throws IOException { private final ReaderKey key; final int bucketId; final int bucketProperty; + private final long transactionId; - OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf) throws IOException { + OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf, Options mergeOptions) throws IOException { this.key = key; this.bucketId = bucketId; assert bucketId >= 0 : "don't support non-bucketed tables yet"; this.bucketProperty = encodeBucketId(conf, bucketId); + //for normal read we should have isOriginal from OrcSplit so we know ... no we don't. We just know that it's original schema but we don't know the txnid to use + //walk up until you find base/delta or partition root - in the last case it's 0 otherwise parse it + //for compaction, isOriginal is useless so we need to find delta and parse _meta_data file + //we are calling getAcidState for Original reader anyway - get txn there? + //oops - right now it always reads getOriginalFiles() but now we want it to read base/ or delta/ + //also, for unbucketed tables should we allow loading a dir with copy_N files? + //in general what checks do we perform on input data? + transactionId = mergeOptions.getTransactionId(); } @Override public final OrcStruct nextRecord() { return nextRecord; @@ -355,9 +364,9 @@ final boolean nextFromCurrentFile(OrcStruct next) throws IOException { new IntWritable(OrcRecordUpdater.INSERT_OPERATION); nextRecord().setFieldValue(OrcRecordUpdater.OPERATION, operation); nextRecord().setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION, - new LongWritable(0)); + new LongWritable(transactionId)); nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION, - new LongWritable(0)); + new LongWritable(transactionId)); nextRecord().setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(bucketProperty)); nextRecord().setFieldValue(OrcRecordUpdater.ROW_ID, @@ -369,17 +378,17 @@ final boolean nextFromCurrentFile(OrcStruct next) throws IOException { ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION)) .set(OrcRecordUpdater.INSERT_OPERATION); ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)) - .set(0); + .set(transactionId); ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET)) .set(bucketProperty); ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION)) - .set(0); + .set(transactionId); ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID)) .set(nextRowId); nextRecord().setFieldValue(OrcRecordUpdater.ROW, getRecordReader().next(OrcRecordUpdater.getRow(next))); } - key.setValues(0L, bucketProperty, nextRowId, 0L, 0); + key.setValues(transactionId, bucketProperty, nextRowId, transactionId, 0); if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) { if (LOG.isDebugEnabled()) { LOG.debug("key " + key + " > maxkey " + getMaxKey()); @@ -401,12 +410,11 @@ static int encodeBucketId(Configuration conf, int bucketId) { private final RecordReader recordReader; private 
final RecordIdentifier minKey; private final RecordIdentifier maxKey; - OriginalReaderPairToRead(ReaderKey key, Reader reader, int bucketId, final RecordIdentifier minKey, final RecordIdentifier maxKey, Reader.Options options, Options mergerOptions, Configuration conf, ValidTxnList validTxnList) throws IOException { - super(key, bucketId, conf); + super(key, bucketId, conf, mergerOptions); this.reader = reader; assert !mergerOptions.isCompacting(); assert mergerOptions.getRootPath() != null : "Since we have original files"; @@ -429,6 +437,9 @@ static int encodeBucketId(Configuration conf, int bucketId) { //the split is from something other than the 1st file of the logical bucket - compute offset AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validTxnList, false, true); + //todo: to handle copy_N files in base or delta from load data this needs to iterate over list of files in that dir + //for starters assume no copy_N files and all named properly + //otherwise this technically works for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf); @@ -533,7 +544,7 @@ public void next(OrcStruct next) throws IOException { OriginalReaderPairToCompact(ReaderKey key, int bucketId, Reader.Options options, Options mergerOptions, Configuration conf, ValidTxnList validTxnList) throws IOException { - super(key, bucketId, conf); + super(key, bucketId, conf, mergerOptions); assert mergerOptions.isCompacting() : "Should only be used for Compaction"; this.conf = conf; this.options = options; @@ -755,13 +766,20 @@ private KeyInterval discoverKeyBounds(Reader reader, * {@link OrcRawRecordMerger} Acid reader is used slightly differently in various contexts. * This makes the "context" explicit. */ - static class Options { + static class Options implements Cloneable { private int copyIndex = 0; private boolean isCompacting = false; private Path bucketPath; private Path rootPath; private boolean isMajorCompaction = false; private boolean isDeleteReader = false; + /** + * for reading "original" files - i.e. not native acid schema. Default value of 0 is + * appropriate for files that existed in a table before it was made transactional. 0 is the + * primordial transaction. For non-native files resulting from the Load Data command, they + * are located in base_x or delta_x_x and then transactionId == x. + */ + private long transactionId = 0; Options copyIndex(int copyIndex) { assert copyIndex >= 0; this.copyIndex = copyIndex; @@ -790,6 +808,10 @@ Options isDeleteReader(boolean isDeleteReader) { assert !isCompacting; return this; } + Options transactionId(long transactionId) { + this.transactionId = transactionId; + return this; + } /** * 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix */ @@ -825,6 +847,20 @@ boolean isMinorCompaction() { boolean isDeleteReader() { return isDeleteReader; } + long getTransactionId() { + return transactionId; + } + /** + * shallow clone + */ + public Options clone() { + try { + return (Options) super.clone(); + } + catch(CloneNotSupportedException ex) { + throw new AssertionError(); + } + } } /** * Create a reader that merge sorts the ACID events together. 
@@ -835,6 +871,7 @@ boolean isDeleteReader() { * @param options the options to read with * @param deltaDirectory the list of delta directories to include * @throws IOException + * TODO: split the ctor into read and compact - this is too complicated */ OrcRawRecordMerger(Configuration conf, boolean collapseEvents, @@ -844,6 +881,11 @@ boolean isDeleteReader() { ValidTxnList validTxnList, Reader.Options options, Path[] deltaDirectory, Options mergerOptions) throws IOException { + /** + * Since we have mergeOptions.getBucketPath() we have delta/base dir and we can read the _meta_data file + * it's a bit ugly - we should refactor this + */ + this.collapse = collapseEvents; this.offset = options.getOffset(); this.length = options.getLength(); @@ -913,16 +955,49 @@ boolean isDeleteReader() { } LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey()); // use the min/max instead of the byte range - ReaderPair pair; + ReaderPair pair = null; ReaderKey key = new ReaderKey(); if (isOriginal) { options = options.clone(); if(mergerOptions.isCompacting()) { + assert mergerOptions.isMajorCompaction(); + //todo: this is probably wrong pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions, conf, validTxnList); } else { - pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(), - keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList); + assert mergerOptions.getBucketPath() != null : " since this is not compaction"; + //if here it's a non-acid schema file - check if from before table was marked transactional + //or in base_x/delta_x_x from Load Data + Path parent = mergerOptions.getBucketPath().getParent(); + while(parent != null && !parent.equals(mergerOptions.getRootPath())) { + //https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Loadingfilesintotables says input dir cannot + //have subdirs (is this enforced?) 
- todo: then this doesn't have to walk + boolean isBase = parent.getName().startsWith(AcidUtils.BASE_PREFIX); + boolean isDelta = parent.getName().startsWith(AcidUtils.DELTA_PREFIX); + if(isBase || isDelta) { + Options originalReadOptions = mergerOptions.clone(); + if(isBase) { + originalReadOptions.transactionId(AcidUtils.parseBase(parent)); + } + else { + AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX); + assert pd.getMinTransaction() == pd.getMaxTransaction() : + "This a delta with raw non acid schema, must be result of single write, no compaction: " + + mergerOptions.getBucketPath(); + originalReadOptions.transactionId(pd.getMinTransaction()); + } + originalReadOptions.rootPath(parent); + pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(), + keyInterval.getMaxKey(), options, originalReadOptions, conf, validTxnList); + break; + } + parent = parent.getParent(); + } + if(pair == null) { + //we didn't find base_x above so the file must be pre conversion file + pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(), + keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList); + } } } else { pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(), @@ -938,9 +1013,9 @@ boolean isDeleteReader() { baseReader = pair.getRecordReader(); } - if (deltaDirectory != null) { - /*whatever SARG maybe applicable to base it's not applicable to delete_delta since it has no - * user columns + if (deltaDirectory != null && deltaDirectory.length > 0) { + /*For reads, whatever SARG maybe applicable to base it's not applicable to delete_delta since it has no + * user columns. For Compaction there is never a SARG. * HIVE-17320: we should compute a SARG to push down min/max key to delete_delta*/ Reader.Options deltaEventOptions = eventOptions.clone() .searchArgument(null, null).range(0, Long.MAX_VALUE); @@ -961,8 +1036,17 @@ boolean isDeleteReader() { long length = AcidUtils.getLogicalLength(fs, fs.getFileStatus(deltaFile)); assert length >= 0; Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length)); - ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey, - deltaEventOptions, deltaDir.getStatementId()); + boolean isRawFormatDelta = AcidUtils.MetaDataFile.isRawFormat(delta, delta.getFileSystem(conf)); + ReaderPair deltaPair; + if(isRawFormatDelta) { + assert mergerOptions.isCompacting() : "during regular read anything which is not a delete_delta is treated like base: " + delta; + //by setting rootPath() to the delta_x_y/ we make AcidUtils.getAcidState() return all files in this dir in Directory.getOriginalFiles() + Options rawCompactOptions = mergerOptions.clone().transactionId(deltaDir.getMinTransaction()).rootPath(delta); + deltaPair = new OriginalReaderPairToCompact(key, bucket, options, rawCompactOptions, conf, validTxnList); + } + else { + deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey, deltaEventOptions, deltaDir.getStatementId()); + } if (deltaPair.nextRecord() != null) { readers.put(key, deltaPair); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 315cc1d3d1..8af38b26ce 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -196,7 +196,9 @@ static StructObjectInspector 
createEventSchema(ObjectInspector rowInspector) { fields.add(new OrcStruct.Field("row", rowInspector, ROW)); return new OrcStruct.OrcStructInspector(fields); } - + /** + * @param path - partition root + */ OrcRecordUpdater(Path path, AcidOutputFormat.Options options) throws IOException { this.options = options; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index bcde4fc82f..4f729c808a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -316,6 +316,7 @@ private static boolean areRowIdsProjected(VectorizedRowBatchCtx rbCtx) { if (orcSplit.isOriginal()) { root = orcSplit.getRootDir(); } else { + //todo: why not just use getRootDir()? root = path.getParent().getParent(); assert root.equals(orcSplit.getRootDir()) : "root mismatch: baseDir=" + orcSplit.getRootDir() + " path.p.p=" + root; @@ -441,6 +442,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti /** * All "original" data belongs to txnid:0 and is always valid/committed for every reader * So only do findRecordsWithInvalidTransactionIds() wrt {@link validTxnList} for !isOriginal + * todo: Note: this is no longer true with HIVE-17361 and IOW */ } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java index f7388a444d..e5d2c5a922 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java @@ -27,12 +27,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; import org.apache.hadoop.io.NullWritable; @@ -98,6 +96,7 @@ this.length = fileSplit.getLength(); options.range(offset, length); options.include(OrcInputFormat.genIncludedColumns(schema, conf)); + //todo: above we assert that it's non-acid, so we really want to say non-acid schema OrcInputFormat.setSearchArgument(options, types, conf, true); this.reader = file.rowsOptions(options); diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index cceea019c2..4c2e1d1e7e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -144,6 +144,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.index.HiveIndexHandler; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; @@ -1679,18 +1680,20 @@ public void loadPartition(Path loadPath, String 
tableName, * location/inputformat/outputformat/serde details from table spec * @param isSrcLocal * If the source directory is LOCAL - * @param isAcid - * true if this is an ACID operation + * @param isAcidIUDoperation + * true if this is an ACID Insert/Update/Delete operation * @param hasFollowingStatsTask * true if there is a following task which updates the stats, so, this method need not update. * @return Partition object being loaded with data */ public Partition loadPartition(Path loadPath, Table tbl, Map partSpec, LoadFileType loadFileType, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, - boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, Long txnId, int stmtId) + boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long txnId, int stmtId) throws HiveException { Path tblDataLocationPath = tbl.getDataLocation(); boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters()); + assert tbl.getPath() != null : "???"; + boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl); try { // Get the partition object if it already exists Partition oldPart = getPartition(tbl, partSpec, false); @@ -1742,7 +1745,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)"); } - assert !isAcid; + assert !isAcidIUDoperation; if (areEventsForDmlNeeded(tbl, oldPart)) { newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId); } @@ -1766,16 +1769,22 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par filter = (loadFileType == LoadFileType.REPLACE_ALL) ? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter; } + else if(!isAcidIUDoperation && isFullAcidTable) { + destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl); + } if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("moving " + loadPath + " to " + destPath); } - if ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcid)) { + //todo: why is "&& !isAcidIUDoperation" needed here? + if (!isFullAcidTable && ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcidIUDoperation))) { + //for fullAcid tables we don't delete existing files for commands with OVERWRITE - we create a new + // base_x. (there is Insert Overwrite and Load Data Overwrite) boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal, isAutoPurge, newFiles, filter, isMmTableWrite); } else { FileSystem fs = tbl.getDataLocation().getFileSystem(conf); - copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcid, + copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles); } } @@ -1865,6 +1874,40 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par } } + /** + * Load Data commands for fullAcid tables write to base_x (if there is overwrite clause) or + * delta_x_x directory - same as any other Acid write. This method modifies the destPath to add + * this path component. 
+ * @param txnId - id of current transaction (in which this operation is running) + * @param stmtId - see {@link DbTxnManager#getWriteIdAndIncrement()} + * @return appropriately modified path + */ + private Path fixFullAcidPathForLoadData(LoadFileType loadFileType, Path destPath, long txnId, int stmtId, Table tbl) throws HiveException { + switch (loadFileType) { + case REPLACE_ALL: + //should this have stmtId now? It would be weird for 1 txn to create 2 base dirs (same txnid)? + //that would mean to OVERWRITE stmts in 1 txn... + destPath = new Path(destPath, AcidUtils.baseDir(txnId)); + break; + case KEEP_EXISTING: + destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId)); + break; + case OVERWRITE_EXISTING: + //should not happen here - this is for replication + default: + throw new IllegalArgumentException("Unexpected " + LoadFileType.class.getName() + " " + loadFileType); + } + try { + FileSystem fs = tbl.getDataLocation().getFileSystem(SessionState.getSessionConf()); + if(!FileUtils.mkdir(fs, destPath, conf)) { + LOG.error(destPath + " already exists?!?!");//should this throw? + } + AcidUtils.MetaDataFile.createMetaFile(destPath, fs, true); + } catch (IOException e) { + throw new HiveException("loadTable: error while creating " + destPath, e); + } + return destPath; + } private boolean areEventsForDmlNeeded(Table tbl, Partition oldPart) { return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null; @@ -2099,7 +2142,6 @@ private void constructOneLBLocationMap(FileStatus fSta, * @param partSpec * @param loadFileType * @param numDP number of dynamic partitions - * @param listBucketingEnabled * @param isAcid true if this is an ACID operation * @param txnId txnId, can be 0 unless isAcid == true * @return partition map details (PartitionSpec and Partition) @@ -2247,20 +2289,25 @@ public Void call() throws Exception { * if list bucketing enabled * @param hasFollowingStatsTask * if there is any following stats task - * @param isAcid true if this is an ACID based write + * @param isAcidIUDoperation true if this is an ACID based Insert/update/delete + * @param isMmTable - todo: why pass this along? + * + * What we really need to know here is why this is done? Is this LOAD DATA or INSERT or what? */ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal, - boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask, + boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long txnId, int stmtId, boolean isMmTable) throws HiveException { - List newFiles = null; Table tbl = getTable(tableName); + assert tbl.getPath() != null : "???"; + isMmTable = AcidUtils.isInsertOnlyTable(tbl); + boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl); HiveConf sessionConf = SessionState.getSessionConf(); if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) { newFiles = Collections.synchronizedList(new ArrayList()); } // Note: this assumes both paths are qualified; which they are, currently. - if (isMmTable && loadPath.equals(tbl.getPath())) { + if (isMmTable && loadPath.equals(tbl.getPath())) {//presumably this means it must be an un-partitioned table... 
Utilities.FILE_OP_LOGGER.debug("not moving " + loadPath + " to " + tbl.getPath()); if (loadFileType == LoadFileType.REPLACE_ALL) { Path tableDest = tbl.getPath(); @@ -2272,24 +2319,31 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId); } else { // Either a non-MM query, or a load into MM table from an external source. - Path tblPath = tbl.getPath(), destPath = tblPath; + Path tblPath = tbl.getPath(); + Path destPath = tblPath; PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER; if (isMmTable) { + assert !isAcidIUDoperation; // We will load into MM directory, and delete from the parent if needed. destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId)); filter = loadFileType == LoadFileType.REPLACE_ALL ? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter; } + else if(!isAcidIUDoperation && isFullAcidTable) { + destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl); + } Utilities.FILE_OP_LOGGER.debug("moving " + loadPath + " to " + tblPath + " (replace = " + loadFileType + ")"); - if (loadFileType == LoadFileType.REPLACE_ALL) { + if (loadFileType == LoadFileType.REPLACE_ALL && !isFullAcidTable) { + //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361 + //todo: should probably do the same for MM IOW boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); replaceFiles(tblPath, loadPath, destPath, tblPath, sessionConf, isSrcLocal, isAutopurge, newFiles, filter, isMmTable); } else { try { FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf); - copyFiles(sessionConf, loadPath, destPath, fs, isSrcLocal, isAcid, + copyFiles(sessionConf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles); } catch (IOException e) { throw new HiveException("addFiles: filesystem error in check phase", e); @@ -2332,7 +2386,6 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles); } - /** * Creates a partition. * diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index cd75130d7c..717c785f31 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -391,7 +391,6 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, LoadTableDesc loadTableWork = new LoadTableDesc(destPath, Utilities.getTableDesc(table), new TreeMap<>(), replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId); - loadTableWork.setTxnId(txnId); loadTableWork.setStmtId(stmtId); MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()); Task loadTableTask = TaskFactory.get(mv, x.getConf()); @@ -400,6 +399,10 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, return loadTableTask; } + /** + * todo: this is odd: transactions are opened for all statements. what is this supposed to check? 
+ */ + @Deprecated private static boolean isAcid(Long txnId) { return (txnId != null) && (txnId != 0); } @@ -473,6 +476,7 @@ private static boolean isAcid(Long txnId) { Task copyTask = null; if (replicationSpec.isInReplicationScope()) { if (isSourceMm || isAcid(txnId)) { + //isAcid(txnId) - tran // Note: this is replication gap, not MM gap... Repl V2 is not ready yet. throw new RuntimeException("Replicating MM and ACID tables is not supported"); } @@ -490,7 +494,6 @@ private static boolean isAcid(Long txnId) { partSpec.getPartSpec(), replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId); - loadTableWork.setTxnId(txnId); loadTableWork.setStmtId(stmtId); loadTableWork.setInheritTableSpecs(false); Task loadPartTask = TaskFactory.get(new MoveWork( diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index 238fbd6057..78a018dcb2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -230,9 +230,6 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { } } - if(AcidUtils.isAcidTable(ts.tableHandle) && !AcidUtils.isInsertOnlyTable(ts.tableHandle.getParameters())) { - throw new SemanticException(ErrorMsg.LOAD_DATA_ON_ACID_TABLE, ts.tableHandle.getCompleteName()); - } // make sure the arguments make sense List files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal); @@ -277,17 +274,16 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { } Long txnId = null; - int stmtId = 0; - Table tbl = ts.tableHandle; - if (AcidUtils.isInsertOnlyTable(tbl.getParameters())) { + int stmtId = -1; + if (AcidUtils.isAcidTable(ts.tableHandle)) { txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); + stmtId = SessionState.get().getTxnMgr().getWriteIdAndIncrement(); } LoadTableDesc loadTableWork; loadTableWork = new LoadTableDesc(new Path(fromURI), Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING, txnId); - loadTableWork.setTxnId(txnId); loadTableWork.setStmtId(stmtId); if (preservePartitionSpecs){ // Note : preservePartitionSpecs=true implies inheritTableSpecs=false but diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index 1fa7b40ada..4683c9c7a9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -47,9 +47,22 @@ private Map partitionSpec; // NOTE: this partitionSpec has to be ordered map public enum LoadFileType { - REPLACE_ALL, // Remove all existing data before copy/move - KEEP_EXISTING, // If any file exist while copy, then just duplicate the file - OVERWRITE_EXISTING // If any file exist while copy, then just overwrite the file + /** + * This corresponds to INSERT OVERWRITE and REPL LOAD for INSERT OVERWRITE event. + * Remove all existing data before copy/move + */ + REPLACE_ALL, + /** + * This corresponds to INSERT INTO and LOAD DATA. + * If any file exist while copy, then just duplicate the file + */ + KEEP_EXISTING, + /** + * This corresponds to REPL LOAD where if we re-apply the same event then need to overwrite + * the file instead of making a duplicate copy. 
+ * If any file exist while copy, then just overwrite the file + */ + OVERWRITE_EXISTING } public LoadTableDesc(final LoadTableDesc o) { super(o.getSourcePath(), o.getWriteType()); @@ -215,14 +228,10 @@ public long getTxnId() { return currentTransactionId == null ? 0 : currentTransactionId; } - public void setTxnId(Long txnId) { - this.currentTransactionId = txnId; - } - public int getStmtId() { return stmtId; } - + //todo: should this not be passed in the c'tor? public void setStmtId(int stmtId) { this.stmtId = stmtId; } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 149a9adfd8..f455040aa5 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -526,7 +526,8 @@ public void testMergeType2SCD01() throws Exception { String stmt = "merge into target t using (" + teeCurMatch + ") s on t.key=s.key and t.cur=1 and s.`o/p\\n`=1 " + "when matched then update set cur=0 " + "when not matched then insert values(s.key,s.data,1)"; - + //to allow cross join from 'teeCurMatch' + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); runStatementOnDriver(stmt); int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}}; List r = runStatementOnDriver("select * from target order by key,data,cur"); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index b877253210..d1f6d01cd8 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -81,7 +81,7 @@ ).getPath().replaceAll("\\\\", "/"); protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; //bucket count for test tables; set it to 1 for easier debugging - protected static int BUCKET_COUNT = 2; + static int BUCKET_COUNT = 2; @Rule public TestName testName = new TestName(); @@ -121,12 +121,11 @@ public void setUp() throws Exception { setUpWithTableProperties("'transactional'='true'"); } - protected void setUpWithTableProperties(String tableProperties) throws Exception { + void setUpWithTableProperties(String tableProperties) throws Exception { hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); - hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); hiveConf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, @@ -410,7 +409,7 @@ public void testFailureOnAlteringTransactionalProperties() throws Exception { expectedException.expect(RuntimeException.class); expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created"); runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')"); + runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'insert_only')"); } /** * Test the query correctness and directory layout for ACID table conversion diff --git 
ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java new file mode 100644 index 0000000000..0a1175a80b --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java @@ -0,0 +1,242 @@ +package org.apache.hadoop.hive.ql; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +//todo :[ERROR] TestTxnCommands2.testFailureOnAlteringTransactionalProperties Expected test to throw (an instance of java.lang.RuntimeException and exception with message a string containing "TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created") + +/** + * ToDo: + * 1. Clean up existing code and upload a patch + * 1.1. Document assumptions - what is isOriginal etc + * 1.2 rename ORIGINAL_BASE, etc + * 1,22 - clean up AcidUtils.MetaDataFile - enums + * 1.3 Fix TransactionValidationList... make sure transactiona_props are always set - Done! + * 1.4. test non-acid2acid conversion with LD - Done + * 1.5 submit patch for test + * 2. Compactor doesn't work at all + * 4. What does compactor do if we just have base_x and nothing else? It can't produce another base_x + * 5. Vectorization path + * 6. LoadSemanticAnalyzer.applyConstraintsAndGetFiles() verifies that there are no subdirs. But not filenames - add it for acid? + * 6.1 should it verify that schema matches? Say someone adds bucket_000 that is already acid? + * + * + * 50. postpone SMB stuff until stuff is stabilized etc + */ +public class TestTxnLoadData extends TxnCommandsBaseForTests { + static final private Logger LOG = LoggerFactory.getLogger(TestTxnLoadData.class); + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestTxnLoadData.class.getCanonicalName() + + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + @Rule + public TemporaryFolder folder= new TemporaryFolder(); + + @Override + String getTestDataDir() { + return TEST_DATA_DIR; + } + + /** + * Make qualified so that paths in ptests work + * @return + * @throws IOException + */ + private Path getWarehousePath() throws IOException { + FileSystem fs = FileSystem.get(hiveConf); + return fs.makeQualified(new Path(getWarehouseDir())); + } + + /** + * Note {@link org.apache.hadoop.hive.ql.metadata.Hive#isSubDir(Path, Path, FileSystem, FileSystem, boolean)} - why did I need this? + * @throws Exception + * + * try + * 1. nonacid + insert + * 2. make acid + * 3. 
insert + * 4 load + * 5 load + overwrite + * 6 read + * 7 compact + * 8 read + * + * Load Data [overwrite] in to an un-partitioned transactional table + */ + @Test + public void loadData() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')"); + runStatementOnDriver("insert into T values(0,2),(0,4)"); + //Tstage is just a simple way to generate test data + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + //this creates an ORC data file with correct schema under table root + runStatementOnDriver("insert into Tstage values(1,2),(3,4)"); + FileSystem fs = FileSystem.get(hiveConf); + //todo: see if this works in PTest + Path sourcePath = fs.makeQualified(new Path(getWarehouseDir(), "Tstage")); + //and do a Load Data into the same table, which should now land in a delta/ + runStatementOnDriver("load data inpath '" + sourcePath.toUri() + "' into table T"); + List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"); + String[][] expected = new String[][] { + //normal insert + {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000016_0000016_0000/bucket_00000"}, + {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000016_0000016_0000/bucket_00000"}, + //Load Data + {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000019_0000019_0000/000000_0"}, + {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000019_0000019_0000/000000_0"}}; + checkExpected(rs, expected, "load data inpath"); + + + //create more staging data since previous files were moved by Load Data + runStatementOnDriver("insert into Tstage values(5,6),(7,8)"); + runStatementOnDriver("load data inpath '" + new Path(getWarehousePath(), "Tstage").toUri() + "' overwrite into table T"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"); + String[][] expected2 = new String[][] { + {"{\"transactionid\":22,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000022/000000_0"}, + {"{\"transactionid\":22,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000022/000000_0"}}; + checkExpected(rs, expected2, "load data inpath overwrite"); + + //todo: COMPACTION!!!! 
+ } + /** + * Load Data [overwrite] in to an (un-)partitioned acid converted table + */ + @Test + public void loadDataNonAcid2AcidConversion() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) stored as orc"); + //per acid write + runStatementOnDriver("insert into T values(0,2),(0,4)"); + //Tstage is just a simple way to generate test data + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + //this creates an ORC data file with correct schema under table root + runStatementOnDriver("insert into Tstage values(1,2),(3,4)"); + if(false) { + //todo: add 2 more inserts - to test loading with multiple files + runStatementOnDriver("insert into Tstage values(2,2),(3,3)"); + runStatementOnDriver("insert into Tstage values(4,4),(5,5)"); + } + //now convert to acid + runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional' = 'true')"); + //and do a Load Data into the same table, which should now land in a delta/ + runStatementOnDriver("load data inpath '" + new Path(getWarehousePath(), "Tstage").toUri() + "' into table T"); + List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"); + + String[][] expected = new String[][] { + //pre-acid insert + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/000000_0"}, + //Load Data into acid converted table + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000020_0000020_0000/000000_0"}, + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000020_0000020_0000/000000_0"}, + //todo: this didn't work +/* + {"{\"transactionid\":22,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000022_0000022_0000/000000_0"}, + {"{\"transactionid\":22,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000022_0000022_0000/000000_0"}, + + {"{\"transactionid\":22,\"bucketid\":536870912,\"rowid\":2}\t2\t2", "t/delta_0000022_0000022_0000/000000_0_copy_1"}, + {"{\"transactionid\":22,\"bucketid\":536870912,\"rowid\":3}\t3\t2", "t/delta_0000022_0000022_0000/000000_0_copy_1"}, + {"{\"transactionid\":22,\"bucketid\":536870912,\"rowid\":4}\t4\t4", "t/delta_0000022_0000022_0000/000000_0_copy_2"}, + {"{\"transactionid\":22,\"bucketid\":536870912,\"rowid\":5}\t5\t5", "t/delta_0000022_0000022_0000/000000_0_copy_2"} + */ + }; + checkExpected(rs, expected, "load data inpath"); + //create more staging data since previous files were moved by Load Data + runStatementOnDriver("insert into Tstage values(5,6),(7,8)"); + runStatementOnDriver("load data inpath '" + new Path(getWarehousePath(), "Tstage").toUri() + "' overwrite into table T"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"); + String[][] expected2 = new String[][] { + {"{\"transactionid\":25,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000025/000000_0"}, + {"{\"transactionid\":25,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000025/000000_0"} + }; + } + + /** + * Load Data [overwrite] in to a partitioned transactional table + */ + @Test + public void loadDataPartitioned() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc 
tblproperties('transactional'='true')"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("insert into Tstage values(0,2),(0,4)"); + + runStatementOnDriver("load data inpath '" + new Path(getWarehousePath(), "Tstage").toUri() + "' into table T partition(p=0)"); + runStatementOnDriver("insert into Tstage values(1,2),(1,4)"); + runStatementOnDriver("load data inpath '" + new Path(getWarehousePath(), "Tstage").toUri() + "' into table T partition(p=1)"); + runStatementOnDriver("insert into Tstage values(2,2),(2,4)"); + runStatementOnDriver("load data inpath '" + new Path(getWarehousePath(), "Tstage").toUri() + "' into table T partition(p=1)"); + List rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"); + String[][] expected = new String[][] { + {"{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000018_0000018_0000/000000_0"}, + {"{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000018_0000018_0000/000000_0"}, + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t2", "t/p=1/delta_0000020_0000020_0000/000000_0"}, + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4", "t/p=1/delta_0000020_0000020_0000/000000_0"}, + {"{\"transactionid\":22,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t2", "t/p=1/delta_0000022_0000022_0000/000000_0"}, + {"{\"transactionid\":22,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4", "t/p=1/delta_0000022_0000022_0000/000000_0"}}; + checkExpected(rs, expected, "load data inpath partitioned"); + + + runStatementOnDriver("insert into Tstage values(5,2),(5,4)"); + runStatementOnDriver("load data inpath '" + new Path(getWarehousePath(), "Tstage").toUri() + "' overwrite into table T partition(p=1)"); + String[][] expected2 = new String[][] { + {"{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000018_0000018_0000/000000_0"}, + {"{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000018_0000018_0000/000000_0"}, + {"{\"transactionid\":25,\"bucketid\":536870912,\"rowid\":0}\t1\t5\t2", "t/p=1/base_0000025/000000_0"}, + {"{\"transactionid\":25,\"bucketid\":536870912,\"rowid\":1}\t1\t5\t4", "t/p=1/base_0000025/000000_0"}}; + rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"); + checkExpected(rs, expected2, "load data inpath partitioned overwrite"); + } + + /** + * By default you can't load into bucketed tables. Things will break badly in acid (data loss, etc) + * if loaded data is not bucketed properly. This test is to capture that this is still the default. + * If the default is changed, Load Data should probably do more validation to ensure data is + * properly distributed into files and files are named correctly. 
+ */ + @Test + public void testValidations() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) clustered by (a) into 2 buckets stored as orc tblproperties('transactional'='true')"); + File createdFile= folder.newFile("myfile.txt"); + FileUtils.writeStringToFile(createdFile, "hello world"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + //this creates an ORC data file with correct schema under table root + runStatementOnDriver("insert into Tstage values(1,2),(3,4)"); + CommandProcessorResponse cpr = runStatementOnDriverNegative("load data inpath '" + new Path(getWarehousePath(), "Tstage").toUri() + "' into table T"); + Assert.assertTrue(cpr.getErrorMessage().contains("Load into bucketed tables are disabled")); + } + private void checkExpected(List rs, String[][] expected, String msg) { + super.checkExpected(rs, expected, msg, LOG); + } + @Test + public void testMMOrcTable() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true', 'transactional_properties'='insert_only')"); + int[][] values = {{1,2},{3,4}}; + runStatementOnDriver("insert into T " + makeValuesClause(values)); + List rs = runStatementOnDriver("select a, b from T order by b"); + Assert.assertEquals("", stringifyValues(values), rs); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index f0d9ff2235..55a9da37da 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -259,18 +259,6 @@ public void testInsertToAcidWithUnionRemove() throws Exception { }; checkExpected(rs, expected, "Unexpected row count after ctas"); } - private void checkExpected(List rs, String[][] expected, String msg) { - LOG.warn(testName.getMethodName() + ": read data(" + msg + "): "); - for(String s : rs) { - LOG.warn(s); - } - Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size()); - //verify data and layout - for(int i = 0; i < expected.length; i++) { - Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); - Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); - } - } /** * The idea here is to create a non acid table that was written by multiple writers, i.e. * unbucketed table that has 000000_0 & 000001_0, for example. 
@@ -640,5 +628,8 @@ private void assertVectorized(boolean vectorized, String query) throws Exception } Assert.assertTrue("Din't find expected 'vectorized' in plan", !vectorized); } + private void checkExpected(List rs, String[][] expected, String msg) { + super.checkExpected(rs, expected, msg, LOG); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 8737369c39..11b649645e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -33,6 +33,7 @@ import org.junit.Before; import org.junit.Rule; import org.junit.rules.TestName; +import org.slf4j.Logger; import java.io.File; import java.util.ArrayList; @@ -74,7 +75,6 @@ void setUpInternal() throws Exception { hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir()); - hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); hiveConf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, @@ -176,4 +176,16 @@ void assertExpectedFileSet(Set expectedFiles, String rootPath) throws Ex } Assert.assertEquals("Unexpected file list", expectedFiles, actualFiles); } + void checkExpected(List rs, String[][] expected, String msg, Logger LOG) { + LOG.warn(testName.getMethodName() + ": read data(" + msg + "): "); + for(String s : rs) { + LOG.warn(s); + } + Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size()); + //verify data and layout + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); + } + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index ccd7d8ef96..c2c841ba4e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -836,6 +836,10 @@ public void testBIStrategySplitBlockBoundary() throws Exception { public void testEtlCombinedStrategy() throws Exception { conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL"); conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS.varname, "1000000"); + AcidUtils.setTransactionalTableScan(conf, true); + conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true); + conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default"); + OrcInputFormat.Context context = new OrcInputFormat.Context(conf); MockFileSystem fs = new MockFileSystem(conf, new MockFile("mock:/a/1/part-00", 1000, new byte[1]), @@ -904,7 +908,7 @@ public void testEtlCombinedStrategy() throws Exception { MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException { OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path); return OrcInputFormat.determineSplitStrategies(combineCtx, context, - adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas, + adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents, null, null, true); } @@ -918,7 +922,7 @@ public void testEtlCombinedStrategy() throws Exception { OrcInputFormat.Context 
context, OrcInputFormat.FileGenerator gen) throws IOException { OrcInputFormat.AcidDirInfo adi = gen.call(); return OrcInputFormat.determineSplitStrategies( - null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas, + null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents, null, null, true); } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java index b2ac687c75..95e34632b9 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java @@ -48,7 +48,7 @@ import org.apache.orc.TypeDescription; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; + /** * This class tests the VectorizedOrcAcidRowBatchReader by creating an actual split and a set * of delete delta files. The split is on an insert delta and there are multiple delete deltas @@ -186,7 +186,7 @@ public void setup() throws Exception { OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, root, false, null); OrcInputFormat.AcidDirInfo adi = gen.call(); List> splitStrategies = OrcInputFormat.determineSplitStrategies( - null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas, + null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents, null, null, true); assertEquals(1, splitStrategies.size()); List splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits(); diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java index 25caf2929d..c0111569c4 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java @@ -128,7 +128,12 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, transactionalValue); } if ("true".equalsIgnoreCase(transactionalValue) && !"true".equalsIgnoreCase(oldTransactionalValue)) { - //only need to check conformance if alter table enabled aicd + if(!isTransactionalPropertiesPresent) { + normazlieTransactionalPropertyDefault(newTable); + isTransactionalPropertiesPresent = true; + transactionalPropertiesValue = DEFAULT_TRANSACTIONAL_PROPERTY; + } + //only need to check conformance if alter table enabled acid if (!conformToAcid(newTable)) { // INSERT_ONLY tables don't have to conform to ACID requirement like ORC or bucketing if (transactionalPropertiesValue == null || !"insert_only".equalsIgnoreCase(transactionalPropertiesValue)) { @@ -232,6 +237,9 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr // normalize prop name parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString()); + if(transactionalProperties == null) { + normazlieTransactionalPropertyDefault(newTable); + } initializeTransactionalProperties(newTable); return; } @@ -241,6 +249,16 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr } /** + * When a table is marked transactional=true but transactional_properties is not set then + * transactional_properties should take on the default 
value. It is easier to make this explicit in + * the table definition than to keep checking everywhere whether it is set. + */ + private void normazlieTransactionalPropertyDefault(Table table) { + table.getParameters().put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, + DEFAULT_TRANSACTIONAL_PROPERTY); + + } + /** + * Check that InputFormatClass/OutputFormatClass should implement + * AcidInputFormat/AcidOutputFormat */
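A minimal sketch of the normalization this listener now performs, assuming (consistently with the tests above) that DEFAULT_TRANSACTIONAL_PROPERTY is the string "default":

  // Effect of normazlieTransactionalPropertyDefault() plus the two new call sites above:
  // a table that ends up with transactional=true and no explicit transactional_properties
  // gets the default written out instead of leaving it implicit.
  Map<String, String> params = table.getParameters();
  boolean isTransactional =
      "true".equalsIgnoreCase(params.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL));
  if (isTransactional && params.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES) == null) {
    params.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default"); // DEFAULT_TRANSACTIONAL_PROPERTY
  }

So a table created with just tblproperties('transactional'='true') is stored with transactional_properties='default', and downstream checks no longer need to treat a missing property as a special case.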