diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 0cc8de0e66..0776170fad 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1902,7 +1902,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "1: Enable split-update feature found in the newer version of Hive ACID subsystem\n" + "4: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing.\n" + "This is intended to be used as an internal property for future versions of ACID. (See\n" + - "HIVE-14035 for details.)"), + "HIVE-14035 for details. Users set this via transactional_properties in tblproperties.)", true), HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" + "current open transactions reach this limit, future open transaction requests will be \n" + diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java index 2f0efceaa9..d73cd6426c 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java @@ -106,7 +106,7 @@ protected void setUp() { db.createTable(src, cols, null, TextInputFormat.class, IgnoreKeyTextOutputFormat.class); db.loadTable(hadoopDataFile[i], src, - LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false); + LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0); i++; } diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 7103fb95b6..ebbfedab24 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -114,7 +115,7 @@ public void setup() throws Exception { hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR); hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); - hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); + hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");//todo: remove this //"org.apache.hadoop.hive.ql.io.HiveInputFormat" TxnDbUtil.setConfValues(hiveConf); @@ -669,7 +670,7 @@ public void minorCompactWhileStreaming() throws Exception { if (!Arrays.deepEquals(expected, names)) { Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); } - checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L); + checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1); } finally {
connection.close(); @@ -698,7 +699,7 @@ public void majorCompactWhileStreaming() throws Exception { } // Start a third batch, but don't close it. - writeBatch(connection, writer, true); + writeBatch(connection, writer, true);//this delta will be ignored by compaction since it has an open txn in it // Now, compact TxnStore txnHandler = TxnUtils.getTxnStore(conf); @@ -722,7 +723,7 @@ public void majorCompactWhileStreaming() throws Exception { } String name = stat[0].getPath().getName(); Assert.assertEquals(name, "base_0000006"); - checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L); + checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1); } finally { connection.close(); } @@ -788,7 +789,7 @@ public void minorCompactAfterAbort() throws Exception { if (!Arrays.deepEquals(expected, names)) { Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); } - checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 6L); + checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1); } finally { connection.close(); } @@ -850,7 +851,7 @@ public void majorCompactAfterAbort() throws Exception { if (!name.equals("base_0000006")) { Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006"); } - checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L); + checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1); } finally { connection.close(); } @@ -903,7 +904,7 @@ public void majorCompactWhileStreamingForSplitUpdate() throws Exception { } String name = stat[0].getPath().getName(); Assert.assertEquals(name, "base_0000006"); - checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L); + checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 2); } finally { connection.close(); } @@ -966,7 +967,7 @@ public void testMinorCompactionForSplitUpdateWithInsertsAndDeletes() throws Exce if (!Arrays.deepEquals(expectedDeltas, deltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); } - checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L); + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L, 1); // Verify that we have got correct set of delete_deltas. 
FileStatus[] deleteDeltaStat = @@ -984,7 +985,7 @@ public void testMinorCompactionForSplitUpdateWithInsertsAndDeletes() throws Exce if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); } - checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 4L, 4L); + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 4L, 4L, 1); } @Test @@ -1043,7 +1044,7 @@ public void testMinorCompactionForSplitUpdateWithOnlyInserts() throws Exception if (!Arrays.deepEquals(expectedDeltas, deltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); } - checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L); + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L, 1); // Verify that we have got correct set of delete_deltas. FileStatus[] deleteDeltaStat = @@ -1062,7 +1063,7 @@ public void testMinorCompactionForSplitUpdateWithOnlyInserts() throws Exception Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); } // There should be no rows in the delete_delta because there have been no delete events. - checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L); + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L, 1); } @Test @@ -1121,7 +1122,7 @@ public void minorCompactWhileStreamingWithSplitUpdate() throws Exception { if (!Arrays.deepEquals(expected, names)) { Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); } - checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L); + checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1); // Verify that we have got correct set of delete_deltas also FileStatus[] deleteDeltaStat = @@ -1140,7 +1141,7 @@ public void minorCompactWhileStreamingWithSplitUpdate() throws Exception { Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); } // There should be no rows in the delete_delta because there have been no delete events. 
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L); + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L, 1); } finally { connection.close(); @@ -1295,7 +1296,7 @@ private void writeBatch(StreamingConnection connection, DelimitedInputWriter wri } private void checkExpectedTxnsPresent(Path base, Path[] deltas, String columnNamesProperty, - String columnTypesProperty, int bucket, long min, long max) + String columnTypesProperty, int bucket, long min, long max, int numBuckets) throws IOException { ValidTxnList txnList = new ValidTxnList() { @Override @@ -1351,9 +1352,10 @@ public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId) { Configuration conf = new Configuration(); conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesProperty); conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, columnTypesProperty); + conf.set(hive_metastoreConstants.BUCKET_COUNT, Integer.toString(numBuckets)); HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); AcidInputFormat.RawReader reader = - aif.getRawReader(conf, false, bucket, txnList, base, deltas); + aif.getRawReader(conf, true, bucket, txnList, base, deltas); RecordIdentifier identifier = reader.createKey(); OrcStruct value = reader.createValue(); long currentTxn = min; diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index 3b1005f85c..d1b6211ce7 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -205,6 +205,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\ load_dyn_part1.q,\ load_dyn_part2.q,\ load_dyn_part3.q,\ + load_data_into_acid.q,\ lvj_mapjoin.q,\ mapjoin2.q,\ mapjoin3.q,\ diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 186d5809c8..2f7284f3f8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -376,7 +376,6 @@ DBTXNMGR_REQUIRES_CONCURRENCY(10264, "To use DbTxnManager you must set hive.support.concurrency=true"), TXNMGR_NOT_ACID(10265, "This command is not allowed on an ACID table {0}.{1} with a non-ACID transaction manager", true), - LOAD_DATA_ON_ACID_TABLE(10266, "LOAD DATA... statement is not supported on transactional table {0}.", true), LOCK_NO_SUCH_LOCK(10270, "No record of lock {0} could be found, " + "may have timed out", true), LOCK_REQUEST_UNSUPPORTED(10271, "Current transaction manager does not " + @@ -550,6 +549,8 @@ ACID_TABLES_MUST_BE_READ_WITH_ACID_READER(30021, "An ORC ACID reader required to read ACID tables"), ACID_TABLES_MUST_BE_READ_WITH_HIVEINPUTFORMAT(30022, "Must use HiveInputFormat to read ACID tables " + "(set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat)"), + ACID_LOAD_DATA_INVALID_FILE_NAME(30023, "{0} file name is not valid in Load Data into Acid " + + "table {1}. 
Examples of valid names are: 00000_0, 00000_0_copy_1", true), CONCATENATE_UNSUPPORTED_FILE_FORMAT(30030, "Concatenate/Merge only supported for RCFile and ORCFile formats"), CONCATENATE_UNSUPPORTED_TABLE_BUCKETED(30031, "Concatenate/Merge can not be performed on bucketed tables"), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index b3d7a03cc0..d3ceead5d4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4459,7 +4459,7 @@ private static void ensureDelete(FileSystem fs, Path path, String what) throws I part.getTPartition().getParameters().putAll(alterTbl.getProps()); } else { boolean isFromMmTable = AcidUtils.isInsertOnlyTable(tbl.getParameters()); - Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(alterTbl.getProps()); + Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(tbl, alterTbl.getProps()); if (isToMmTable != null) { if (!isFromMmTable && isToMmTable) { result = generateAddMmTasks(tbl); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index e2f8c1f801..12ed42313e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -215,7 +215,7 @@ private void releaseLocks(LoadTableDesc ltd) throws HiveException { Context ctx = driverContext.getCtx(); if(ctx.getHiveTxnManager().supportsAcid()) { - //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes it more explicit + //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes logic more explicit return; } HiveLockManager lockMgr = ctx.getHiveTxnManager().getLockManager(); @@ -290,7 +290,7 @@ public int execute(DriverContext driverContext) { } else { Utilities.FILE_OP_LOGGER.debug("MoveTask moving " + sourcePath + " to " + targetPath); if(lfd.getWriteType() == AcidUtils.Operation.INSERT) { - //'targetPath' is table root of un-partitioned table/partition + //'targetPath' is table root of un-partitioned table or partition //'sourcePath' result of 'select ...' part of CTAS statement assert lfd.getIsDfsDir(); FileSystem srcFs = sourcePath.getFileSystem(conf); @@ -367,7 +367,7 @@ public int execute(DriverContext driverContext) { checkFileFormats(db, tbd, table); boolean isFullAcidOp = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID - && !tbd.isMmTable(); + && !tbd.isMmTable();//it seems that LoadTableDesc has Operation.INSERT only for CTAS... 
// Create a data container DataContainer dc = null; @@ -379,7 +379,7 @@ public int execute(DriverContext driverContext) { } db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(), work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(), - tbd.getTxnId(), tbd.getStmtId(), tbd.isMmTable()); + tbd.getTxnId(), tbd.getStmtId()); if (work.getOutputs() != null) { DDLTask.addIfAbsentByName(new WriteEntity(table, getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index bb1f4e5050..545b7a8b7e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -229,6 +229,7 @@ private String location(ImportTableDesc tblDesc, Database parentDb) LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, + //todo: what is the point of this? If this is for replication, who would have opened a txn? SessionState.get().getTxnMgr().getCurrentTxnId() ); MoveWork moveWork = diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 4c0b71f65f..1fef12ba3e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -22,7 +22,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -30,6 +30,8 @@ import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -39,18 +41,17 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.TransactionalValidationListener; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.CreateTableDesc; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.common.util.Ref; import org.apache.orc.impl.OrcAcidUtils; +import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -122,13 +123,14 @@ public boolean accept(Path path) { public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$"); public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}"); /** - * This does not need to use ORIGINAL_PATTERN_COPY because it's used to read - * a "delta" dir written by a real Acid write - cannot have any copies + * A write into a 
non-acid table produces files like 0000_0 or 0000_0_copy_1 + * (Unless via Load Data statement) */ public static final PathFilter originalBucketFilter = new PathFilter() { @Override public boolean accept(Path path) { - return ORIGINAL_PATTERN.matcher(path.getName()).matches(); + return ORIGINAL_PATTERN.matcher(path.getName()).matches() || + ORIGINAL_PATTERN_COPY.matcher(path.getName()).matches(); } }; @@ -137,6 +139,7 @@ private AcidUtils() { } private static final Logger LOG = LoggerFactory.getLogger(AcidUtils.class); + public static final Pattern BUCKET_PATTERN = Pattern.compile(BUCKET_PREFIX + "_[0-9]{5}$"); public static final Pattern ORIGINAL_PATTERN = Pattern.compile("[0-9]+_[0-9]+"); /** @@ -156,14 +159,30 @@ public boolean accept(Path p){ private static final HadoopShims SHIMS = ShimLoader.getHadoopShims(); /** - * Create the bucket filename. + * Create the bucket filename in Acid format * @param subdir the subdirectory for the bucket. * @param bucket the bucket number * @return the filename */ public static Path createBucketFile(Path subdir, int bucket) { - return new Path(subdir, + return createBucketFile(subdir, bucket, true); + } + + /** + * Create acid or original bucket name + * @param subdir the subdirectory for the bucket. * @param bucket the bucket number + * @return the filename + */ + private static Path createBucketFile(Path subdir, int bucket, boolean isAcidSchema) { + if(isAcidSchema) { + return new Path(subdir, BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket)); + } + else { + return new Path(subdir, + String.format(BUCKET_DIGITS, bucket)); + } } /** @@ -244,7 +263,7 @@ public static Path createFilename(Path directory, * @param path the base directory name * @return the maximum transaction id that is included */ - static long parseBase(Path path) { + public static long parseBase(Path path) { String filename = path.getName(); if (filename.startsWith(BASE_PREFIX)) { return Long.parseLong(filename.substring(BASE_PREFIX.length())); @@ -273,7 +292,7 @@ static long parseBase(Path path) { .minimumTransactionId(0) .maximumTransactionId(0) .bucket(bucket) - .writingBase(true); + .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX)); } else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) { //todo: define groups in regex and use parseInt(Matcher.group(2)).... @@ -286,7 +305,7 @@ else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) { .maximumTransactionId(0) .bucket(bucket) .copyNumber(copyNumber) - .writingBase(true); + .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX)); } else if (filename.startsWith(BUCKET_PREFIX)) { int bucket = @@ -344,11 +363,17 @@ public static DataOperationType toDataOperationType(Operation op) { throw new IllegalArgumentException("Unexpected Operation: " + op); } } - public enum AcidBaseFileType { - COMPACTED_BASE, // a regular base file generated through major compaction - ORIGINAL_BASE, // a non-acid schema file for tables that got converted to acid - INSERT_DELTA; // a delta file with only insert events that can be treated as base for split-update + /** + * File w/o Acid meta columns. This would be the case for files that were added to the table + * before it was converted to Acid but not yet major compacted. May also be the result of + * Load Data statement on an acid table. + */ + ORIGINAL_BASE, + /** + * File that has Acid metadata columns embedded in it. Found in base_x/ or delta_x_y/.
+ */ + ACID_SCHEMA, } /** @@ -366,16 +391,12 @@ public AcidBaseFileInfo(HdfsFileStatusWithId fileId, AcidBaseFileType acidBaseFi this.acidBaseFileType = acidBaseFileType; } - public boolean isCompactedBase() { - return this.acidBaseFileType == AcidBaseFileType.COMPACTED_BASE; - } - public boolean isOriginal() { return this.acidBaseFileType == AcidBaseFileType.ORIGINAL_BASE; } - public boolean isInsertDelta() { - return this.acidBaseFileType == AcidBaseFileType.INSERT_DELTA; + public boolean isAcidSchema() { + return this.acidBaseFileType == AcidBaseFileType.ACID_SCHEMA; } public HdfsFileStatusWithId getHdfsFileStatusWithId() { @@ -545,6 +566,7 @@ public String toString() { * @return the base directory to read */ Path getBaseDirectory(); + boolean isBaseInRawFormat(); /** * Get the list of original files. Not {@code null}. Must be sorted. @@ -577,13 +599,16 @@ public String toString() { } public static class ParsedDelta implements Comparable { + //todo: make sure this has "format" info private final long minTransaction; private final long maxTransaction; private final FileStatus path; //-1 is for internal (getAcidState()) purposes and means the delta dir //had no statement ID private final int statementId; + //todo: maybe "type" should be made more explicit: base, delete, raw.... or insert_acid, insert_raw, delete private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...' + private boolean isRawFormat = false; /** * for pre 1.3.x delta files @@ -597,6 +622,8 @@ public String toString() { this.path = path; this.statementId = statementId; this.isDeleteDelta = isDeleteDelta; + //this.isRawFormat = isRawFormat; + assert !isDeleteDelta || !isRawFormat : " deleteDelta should not be raw format"; } public long getMinTransaction() { @@ -618,6 +645,13 @@ public int getStatementId() { public boolean isDeleteDelta() { return isDeleteDelta; } + //todo: this breaks Immutability of ParsedDelta + void setIsRawFormat(boolean isRawFormat) { + this.isRawFormat = isRawFormat; + } + public boolean isRawFormat() { + return isRawFormat; + } /** * Compactions (Major/Minor) merge deltas/bases but delete of old files @@ -698,29 +732,6 @@ else if(statementId != parsedDelta.statementId) { } /** - * Convert the list of begin/end transaction id pairs to a list of delta - * directories. Note that there may be multiple delta files for the exact same txn range starting - * with 1.3.x; - * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)} - * @param root the root directory - * @param deltas list of begin/end transaction id pairs - * @return the list of delta paths - */ - public static Path[] deserializeDeltas(Path root, final List deltas) throws IOException { - List results = new ArrayList(deltas.size()); - for(AcidInputFormat.DeltaMetaData dmd : deltas) { - if(dmd.getStmtIds().isEmpty()) { - results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId()))); - continue; - } - for(Integer stmtId : dmd.getStmtIds()) { - results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId))); - } - } - return results.toArray(new Path[results.size()]); - } - - /** * Convert the list of begin/end transaction id pairs to a list of delete delta * directories. 
Note that there may be multiple delete_delta files for the exact same txn range starting * with 2.2.x; @@ -871,13 +882,13 @@ public static Directory getAcidState(Path directory, if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { getChildState(child.getFileStatus(), child, txnList, working, originalDirectories, original, - obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties); + obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs); } } else { List children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter); for (FileStatus child : children) { getChildState(child, null, txnList, working, originalDirectories, original, obsolete, - bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties); + bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs); } } @@ -976,12 +987,18 @@ else if (prev != null && next.maxTransaction == prev.maxTransaction //this does "Path.uri.compareTo(that.uri)" return o1.getFileStatus().compareTo(o2.getFileStatus()); }); - return new Directory(){ + + final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs); + return new Directory() { @Override public Path getBaseDirectory() { return base; } + @Override + public boolean isBaseInRawFormat() { + return isBaseInRawFormat; + } @Override public List getOriginalFiles() { @@ -1022,7 +1039,7 @@ private static boolean isValidBase(long baseTxnId, ValidTxnList txnList) { private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, ValidTxnList txnList, List working, List originalDirectories, List original, List obsolete, TxnBase bestBase, - boolean ignoreEmptyFiles, List aborted, Map tblproperties) throws IOException { + boolean ignoreEmptyFiles, List aborted, Map tblproperties, FileSystem fs) throws IOException { Path p = child.getPath(); String fn = p.getName(); if (fn.startsWith(BASE_PREFIX) && child.isDir()) { @@ -1051,6 +1068,9 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi String deltaPrefix = (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX; ParsedDelta delta = parseDelta(child, deltaPrefix); + if(!delta.isDeleteDelta()) { + delta.setIsRawFormat(MetaDataFile.isRawFormat(p, fs)); + } if (tblproperties != null && AcidUtils.isInsertOnlyTable(tblproperties) && ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeAborted(delta.minTransaction, delta.maxTransaction)) { aborted.add(child); @@ -1171,8 +1191,11 @@ public static void setTransactionalTableScan(Map parameters, boo parameters.put(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, Boolean.toString(isAcidTable)); } - public static void setTransactionalTableScan(Configuration conf, boolean isAcidTable) { - HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable); + /** + * Means it's a full acid table + */ + public static void setTransactionalTableScan(Configuration conf, boolean isFullAcidTable) { + HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isFullAcidTable); } /** * @param p - not null @@ -1185,6 +1208,8 @@ public static boolean isDeleteDelta(Path p) { * SessionState.get().getTxnMgr().supportsAcid() here * @param table table * @return true if table is a legit ACID table, false otherwise + * ToDo: this shoudl be renamed isTransactionalTable() since that is what it's checking and covers + * both Acid and MM tables. 
HIVE-18124 */ public static boolean isAcidTable(Table table) { if (table == null) { @@ -1197,6 +1222,10 @@ public static boolean isAcidTable(Table table) { return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); } + /** + * ToDo: this should be renamed isTransactionalTable() since that is what it's checking and covers + * both Acid and MM tables. HIVE-18124 + */ public static boolean isAcidTable(CreateTableDesc table) { if (table == null || table.getTblProps() == null) { return false; } @@ -1208,8 +1237,13 @@ public static boolean isAcidTable(CreateTableDesc table) { return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); } + /** + * once isAcidTable() is renamed to isTransactionalTable(), make this isAcid(). HIVE-18124 + * @param table + * @return + */ public static boolean isFullAcidTable(Table table) { - return isAcidTable(table) && !AcidUtils.isInsertOnlyTable(table.getParameters()); + return isAcidTable(table) && !AcidUtils.isInsertOnlyTable(table); } /** @@ -1336,6 +1370,9 @@ public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOExc public static boolean isInsertOnlyTable(Map params) { return isInsertOnlyTable(params, false); } + public static boolean isInsertOnlyTable(Table table) { + return isAcidTable(table) && getAcidOperationalProperties(table).isInsertOnly(); + } // TODO [MM gap]: CTAS may currently be broken. It used to work. See the old code, and why isCtas isn't used? public static boolean isInsertOnlyTable(Map params, boolean isCtas) { @@ -1349,13 +1386,17 @@ public static boolean isInsertOnlyTable(Properties params) { return (transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp)); } - /** The method for altering table props; may set the table to MM, non-MM, or not affect MM. */ - public static Boolean isToInsertOnlyTable(Map props) { + /** The method for altering table props; may set the table to MM, non-MM, or not affect MM. + * todo: All such validation logic should be in TransactionalValidationListener*/ + public static Boolean isToInsertOnlyTable(Table tbl, Map props) { // Note: Setting these separately is a very hairy issue in certain combinations, since we // cannot decide what type of table this becomes without taking both into account, and // in many cases the conversion might be illegal. // The only thing we allow is tx = true w/o tx-props, for backward compat. String transactional = props.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if(transactional == null) { + transactional = tbl.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + } String transactionalProp = props.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + if (transactional == null && transactionalProp == null) return null; // Not affected. boolean isSetToTxn = "true".equalsIgnoreCase(transactional); @@ -1378,4 +1419,69 @@ public static boolean isRemovedInsertOnlyTable(Set removedSet) { hasProps = removedSet.contains(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); return hasTxn || hasProps; } + public static class MetaDataFile { + private static final String METADATA_FILE = "_metadata_acid";//export uses _metadata.... + private static final String CURRENT_VERSION = "0"; + //todo: enums? that have both field name and value list + private interface Field { + String VERSION = "thisFileVersion"; + String DATA_FORMAT = "dataFormat"; + } + private interface Value { + //plain ORC file + String RAW = "raw"; + //result of acid write, i.e.
decorated with ROW__ID info + String NATIVE = "native"; + } + + /** + * @param baseOrDeltaDir delta or base dir, must exist + */ + public static void createMetaFile(Path baseOrDeltaDir, FileSystem fs, boolean isRawFormat) + throws IOException { + /** + * create _metadata_acid json file in baseOrDeltaDir + * write thisFileVersion, dataFormat + * + * on read if the file is not there, assume version 0 and dataFormat=native + */ + Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE); + Map metaData = new HashMap<>(); + metaData.put(Field.VERSION, CURRENT_VERSION); + metaData.put(Field.DATA_FORMAT, isRawFormat ? Value.RAW : Value.NATIVE); + try (FSDataOutputStream strm = fs.create(formatFile, false)) { + new ObjectMapper().writeValue(strm, metaData); + } catch (IOException ioe) { + String msg = "Failed to create " + baseOrDeltaDir + "/" + METADATA_FILE + ": " + ioe.getMessage(); + LOG.error(msg, ioe); + throw ioe; + } + } + public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOException { + Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE); + if(!fs.exists(formatFile)) { + return false; + } + try (FSDataInputStream strm = fs.open(formatFile)) { + Map metaData = new ObjectMapper().readValue(strm, Map.class); + if(!CURRENT_VERSION.equalsIgnoreCase(metaData.get(Field.VERSION))) { + throw new IllegalStateException("Unexpected Meta Data version: " + metaData.get(Field.VERSION)); + } + String dataFormat = metaData.getOrDefault(Field.DATA_FORMAT, "null"); + switch (dataFormat) { + case Value.NATIVE: + return false; + case Value.RAW: + return true; + default: + throw new IllegalArgumentException("Unexpected value for " + Field.DATA_FORMAT + ": " + dataFormat); + } + } + catch(IOException e) { + String msg = "Failed to read " + baseOrDeltaDir + "/" + METADATA_FILE + ": " + e.getMessage(); + LOG.error(msg, e); + throw e; + } + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 6a1dc729f3..819c2a2667 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -468,6 +468,9 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job try { Utilities.copyTablePropertiesToConf(table, conf); + if(tableScan != null) { + AcidUtils.setTransactionalTableScan(conf, tableScan.getConf().isAcidTable()); + } } catch (HiveException e) { throw new IOException(e); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index dda9f93839..3ff74bd772 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -20,6 +20,7 @@ import org.apache.hadoop.hive.ql.plan.DynamicValue.NoDynamicValuesException; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hdfs.DistributedFileSystem; import java.io.IOException; @@ -409,7 +410,7 @@ public static boolean isOriginal(Footer footer) { * @param readerSchema the types for the reader * @param conf the configuration */ - public static boolean[] genIncludedColumns(TypeDescription readerSchema, + static boolean[] genIncludedColumns(TypeDescription readerSchema, Configuration conf) { if (!ColumnProjectionUtils.isReadAllColumns(conf)) { List included = ColumnProjectionUtils.getReadColumnIDs(conf); @@ -419,7 +420,7 @@ public static boolean isOriginal(Footer footer) { } } - public
static String[] getSargColumnNames(String[] originalColumnNames, + private static String[] getSargColumnNames(String[] originalColumnNames, List types, boolean[] includedColumns, boolean isOriginal) { int rootColumn = getRootColumn(isOriginal); String[] columnNames = new String[types.size() - rootColumn]; @@ -695,21 +696,29 @@ public static void clearLocalCache() { */ @VisibleForTesting static final class AcidDirInfo { - public AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo, + AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo, List baseFiles, - List parsedDeltas) { + List deleteEvents) { this.splitPath = splitPath; this.acidInfo = acidInfo; this.baseFiles = baseFiles; this.fs = fs; - this.parsedDeltas = parsedDeltas; + this.deleteEvents = deleteEvents; } final FileSystem fs; final Path splitPath; final AcidUtils.Directory acidInfo; final List baseFiles; - final List parsedDeltas; + final List deleteEvents; + + /** + * No (qualifying) data files found in {@link #splitPath} + * @return + */ + boolean isEmpty() { + return (baseFiles == null || baseFiles.isEmpty()); + } } @VisibleForTesting @@ -884,7 +893,7 @@ public String toString() { public CombineResult combineWith(FileSystem fs, Path dir, List otherFiles, boolean isOriginal) { if ((files.size() + otherFiles.size()) > ETL_COMBINE_FILE_LIMIT - || this.isOriginal != isOriginal) { + || this.isOriginal != isOriginal) {//todo: what is this checking???? return (files.size() > otherFiles.size()) ? CombineResult.NO_AND_SWAP : CombineResult.NO_AND_CONTINUE; } @@ -1083,6 +1092,12 @@ public String toString() { static final class FileGenerator implements Callable { private final Context context; private final FileSystem fs; + /** + * For plain or acid tables this is the root of the partition (or table if not partitioned). + * For MM table this is delta/ or base/ dir. In MM case applying of the ValidTxnList that + * {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} normally does has already + * been done in {@link HiveInputFormat#processPathsForMmRead(List, JobConf, ValidTxnList)}. + */ private final Path dir; private final Ref useFileIds; private final UserGroupInformation ugi; @@ -1119,25 +1134,27 @@ public AcidDirInfo run() throws Exception { } private AcidDirInfo callInternal() throws IOException { + //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine? AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf, context.transactionList, useFileIds, true, null); - Path base = dirInfo.getBaseDirectory(); // find the base files (original or new style) - List baseFiles = new ArrayList(); - if (base == null) { + List baseFiles = new ArrayList<>(); + if (dirInfo.getBaseDirectory() == null) { + //for non-acid tables, all data files are in getOriginalFiles() list for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) { baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE)); } } else { - List compactedBaseFiles = findBaseFiles(base, useFileIds); + List compactedBaseFiles = findBaseFiles(dirInfo.getBaseDirectory(), useFileIds); for (HdfsFileStatusWithId fileId : compactedBaseFiles) { - baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.COMPACTED_BASE)); + baseFiles.add(new AcidBaseFileInfo(fileId, dirInfo.isBaseInRawFormat() ? 
+ AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA)); } } // Find the parsed deltas- some of them containing only the insert delta events // may get treated as base if split-update is enabled for ACID. (See HIVE-14035 for details) - List parsedDeltas = new ArrayList(); + List parsedDeltas = new ArrayList<>(); if (context.acidOperationalProperties != null && context.acidOperationalProperties.isSplitUpdate()) { @@ -1154,15 +1171,26 @@ private AcidDirInfo callInternal() throws IOException { if (parsedDelta.isDeleteDelta()) { parsedDeltas.add(parsedDelta); } else { + AcidUtils.AcidBaseFileType deltaType = parsedDelta.isRawFormat() ? + AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA; + PathFilter bucketFilter = parsedDelta.isRawFormat() ? + AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter; + if(parsedDelta.isRawFormat() && parsedDelta.getMinTransaction() != + parsedDelta.getMaxTransaction()) { + //delta/ with files in raw format are a result of Load Data (as opposed to compaction + //or streaming ingest so must have interval length == 1. + throw new IllegalStateException("Delta in " + AcidUtils.AcidBaseFileType.ORIGINAL_BASE + + " format but txnIds are out of range: " + parsedDelta.getPath()); + } // This is a normal insert delta, which only has insert events and hence all the files // in this delta directory can be considered as a base. Boolean val = useFileIds.value; if (val == null || val) { try { List insertDeltaFiles = - SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter); + SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), bucketFilter); for (HdfsFileStatusWithId fileId : insertDeltaFiles) { - baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA)); + baseFiles.add(new AcidBaseFileInfo(fileId, deltaType)); } if (val == null) { useFileIds.value = true; // The call succeeded, so presumably the API is there. @@ -1176,15 +1204,20 @@ private AcidDirInfo callInternal() throws IOException { } } // Fall back to regular API and create statuses without ID. - List children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter); + List children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), bucketFilter); for (FileStatus child : children) { HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child); - baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA)); + baseFiles.add(new AcidBaseFileInfo(fileId, deltaType)); } } } } else { + /* + We already handled all delete deltas above and there should not be any other deltas for + any table type. (this was acid 1.0 code path). + */ + assert dirInfo.getCurrentDirectories().isEmpty() : "Non empty curDir list?!: " + dir; // When split-update is not enabled, then all the deltas in the current directories // should be considered as usual. 
parsedDeltas.addAll(dirInfo.getCurrentDirectories()); @@ -1658,7 +1691,7 @@ private long computeProjectionSize(List fileTypes, pathFutures.add(ecs.submit(fileGenerator)); } - boolean isTransactionalTableScan =//this never seems to be set correctly + boolean isTransactionalTableScan = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); boolean isSchemaEvolution = HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION); TypeDescription readerSchema = @@ -1700,13 +1733,16 @@ private long computeProjectionSize(List fileTypes, // We have received a new directory information, make split strategies. --resultsLeft; - + if(adi.isEmpty()) { + //no files found, for example empty table/partition + continue; + } // The reason why we can get a list of split strategies here is because for ACID split-update // case when we have a mix of original base files & insert deltas, we will produce two // independent split strategies for them. There is a global flag 'isOriginal' that is set // on a per split strategy basis and it has to be same for all the files in that strategy. List> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs, - adi.splitPath, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi, + adi.splitPath, adi.baseFiles, adi.deleteEvents, readerTypes, ugi, allowSyntheticFileIds); for (SplitStrategy splitStrategy : splitStrategies) { @@ -1790,6 +1826,7 @@ private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context conte boolean isOriginal, UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs) { if (!deltas.isEmpty() || combinedCtx == null) { + //why is this checking for deltas.isEmpty() - HIVE-18110 return new ETLSplitStrategy( context, fs, dir, files, readerTypes, isOriginal, deltas, covered, ugi, allowSyntheticFileIds, isDefaultFs); @@ -1955,6 +1992,7 @@ public float getProgress() throws IOException { final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split); OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false); mergerOptions.rootPath(split.getRootDir()); + mergerOptions.bucketPath(split.getPath()); final int bucket; if (split.hasBase()) { AcidOutputFormat.Options acidIOOptions = @@ -1968,8 +2006,9 @@ public float getProgress() throws IOException { } } else { bucket = (int) split.getStart(); + assert false : "We should never have a split w/o base in acid 2.0 for full acid: " + split.getPath(); } - + //todo: createOptionsForReader() assumes it's !isOriginal.... why? final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf); readOptions.range(split.getStart(), split.getLength()); @@ -2041,6 +2080,7 @@ public float getProgress() throws IOException { // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription. final List schemaTypes = OrcUtils.getOrcTypes(schema); readerOptions.include(OrcInputFormat.genIncludedColumns(schema, conf)); + //todo: last param is bogus. why is this hardcoded? OrcInputFormat.setSearchArgument(readerOptions, schemaTypes, conf, true); return readerOptions; } @@ -2138,6 +2178,11 @@ private static boolean isStripeSatisfyPredicate( return sarg.evaluate(truthValues).isNeeded(); } + /** + * do we still need this? what we really need is to pass info into OrcSplit about the type of delta/base it is + * did we need this because "original" files didn't vectorize? + * For MM table we can have no base + (insert) deltas. May also have base + no deltas.
+ */ @VisibleForTesting static List> determineSplitStrategies(CombinedCtx combinedCtx, Context context, FileSystem fs, Path dir, @@ -2153,23 +2198,24 @@ private static boolean isStripeSatisfyPredicate( boolean isDefaultFs = (!checkDefaultFs) || ((fs instanceof DistributedFileSystem) && HdfsUtils.isDefaultFs((DistributedFileSystem) fs)); - // When no baseFiles, we will just generate a single split strategy and return. - List acidSchemaFiles = new ArrayList(); if (baseFiles.isEmpty()) { - splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, acidSchemaFiles, + assert false : "acid 2.0 no base?!: " + dir; + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, Collections.emptyList(), false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds, isDefaultFs); if (splitStrategy != null) { splitStrategies.add(splitStrategy); } - return splitStrategies; // return here + return splitStrategies; } + List acidSchemaFiles = new ArrayList<>(); List originalSchemaFiles = new ArrayList(); // Separate the base files into acid schema and non-acid(original) schema files. for (AcidBaseFileInfo acidBaseFileInfo : baseFiles) { if (acidBaseFileInfo.isOriginal()) { originalSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId()); } else { + assert acidBaseFileInfo.isAcidSchema(); acidSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId()); } } @@ -2195,8 +2241,7 @@ private static boolean isStripeSatisfyPredicate( return splitStrategies; } - @VisibleForTesting - static SplitStrategy determineSplitStrategy(CombinedCtx combinedCtx, Context context, + private static SplitStrategy determineSplitStrategy(CombinedCtx combinedCtx, Context context, FileSystem fs, Path dir, List baseFiles, boolean isOriginal, @@ -2249,34 +2294,36 @@ private static boolean isStripeSatisfyPredicate( context.acidOperationalProperties); } } - @Override public RawReader getRawReader(Configuration conf, boolean collapseEvents, - int bucket, + int bucket,//rename splitId ValidTxnList validTxnList, Path baseDirectory, Path[] deltaDirectory ) throws IOException { - Reader reader = null; boolean isOriginal = false; + OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(true) + .isMajorCompaction(collapseEvents); if (baseDirectory != null) {//this is NULL for minor compaction - Path bucketFile = null; + //it may also be null if there is no base - only deltas + mergerOptions.baseDir(baseDirectory); if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) { - bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket); + isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf)); + mergerOptions.rootPath(baseDirectory.getParent()); } else { /**we don't know which file to start reading - * {@link OrcRawRecordMerger.OriginalReaderPairToCompact} does*/ isOriginal = true; + mergerOptions.rootPath(baseDirectory); } - if(bucketFile != null) { - reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf)); - } } - OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options() - .isCompacting(true) - .rootPath(baseDirectory).isMajorCompaction(baseDirectory != null); - return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal, + else { + //since we have no base, there must be at least 1 delta which must be a result of an acid write + //so it must be an immediate child of the partition + mergerOptions.rootPath(deltaDirectory[0].getParent()); + } + return new OrcRawRecordMerger(conf, collapseEvents, null,
isOriginal, bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 95a60dc032..9172eb946a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -47,6 +47,8 @@ import com.google.common.annotations.VisibleForTesting; +import javax.swing.text.html.Option; + /** * Merges a base and a list of delta files together into a single stream of * events. */ @@ -88,11 +90,11 @@ */ private int statementId;//sort on this descending, like currentTransactionId - public ReaderKey() { + ReaderKey() { this(-1, -1, -1, -1, 0); } - public ReaderKey(long originalTransaction, int bucket, long rowId, + ReaderKey(long originalTransaction, int bucket, long rowId, long currentTransactionId) { this(originalTransaction, bucket, rowId, currentTransactionId, 0); } @@ -320,12 +322,24 @@ public void next(OrcStruct next) throws IOException { private final ReaderKey key; final int bucketId; final int bucketProperty; + /** + * TransactionId to use when generating synthetic ROW_IDs + */ + final long transactionId; - OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf) throws IOException { + OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf, Options mergeOptions) throws IOException { this.key = key; this.bucketId = bucketId; assert bucketId >= 0 : "don't support non-bucketed tables yet"; this.bucketProperty = encodeBucketId(conf, bucketId); + //for normal read we should have isOriginal from OrcSplit so we know ... no we don't. We just know that it's original schema but we don't know the txnid to use + //walk up until you find base/delta or partition root - in the last case it's 0 otherwise parse it + //for compaction, isOriginal is useless so we need to find delta and parse _meta_data file + //we are calling getAcidState for Original reader anyway - get txn there? + //oops - right now it always reads getOriginalFiles() but now we want it to read base/ or delta/ + //also, for unbucketed tables should we allow loading a dir with copy_N files? + //in general what checks do we perform on input data?
+ transactionId = mergeOptions.getTransactionId(); } @Override public final OrcStruct nextRecord() { return nextRecord; @@ -337,6 +351,10 @@ public int getColumns() { @Override public final ReaderKey getKey() { return key; } /** + * + * + * + * * The cumulative number of row in all files of the logical bucket that precede the file * represented by {@link #getRecordReader()} */ @@ -355,9 +373,10 @@ final boolean nextFromCurrentFile(OrcStruct next) throws IOException { new IntWritable(OrcRecordUpdater.INSERT_OPERATION); nextRecord().setFieldValue(OrcRecordUpdater.OPERATION, operation); nextRecord().setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION, - new LongWritable(0)); + new LongWritable(transactionId)); nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION, - new LongWritable(0)); + new LongWritable(transactionId)); + nextRecord().setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(bucketProperty)); nextRecord().setFieldValue(OrcRecordUpdater.ROW_ID, @@ -369,17 +388,17 @@ final boolean nextFromCurrentFile(OrcStruct next) throws IOException { ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION)) .set(OrcRecordUpdater.INSERT_OPERATION); ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)) - .set(0); + .set(transactionId); ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET)) .set(bucketProperty); ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION)) - .set(0); + .set(transactionId); ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID)) .set(nextRowId); nextRecord().setFieldValue(OrcRecordUpdater.ROW, getRecordReader().next(OrcRecordUpdater.getRow(next))); } - key.setValues(0L, bucketProperty, nextRowId, 0L, 0); + key.setValues(transactionId, bucketProperty, nextRowId, transactionId, 0); if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) { if (LOG.isDebugEnabled()) { LOG.debug("key " + key + " > maxkey " + getMaxKey()); @@ -401,12 +420,11 @@ static int encodeBucketId(Configuration conf, int bucketId) { private final RecordReader recordReader; private final RecordIdentifier minKey; private final RecordIdentifier maxKey; - OriginalReaderPairToRead(ReaderKey key, Reader reader, int bucketId, final RecordIdentifier minKey, final RecordIdentifier maxKey, Reader.Options options, Options mergerOptions, Configuration conf, ValidTxnList validTxnList) throws IOException { - super(key, bucketId, conf); + super(key, bucketId, conf, mergerOptions); this.reader = reader; assert !mergerOptions.isCompacting(); assert mergerOptions.getRootPath() != null : "Since we have original files"; @@ -426,6 +444,11 @@ static int encodeBucketId(Configuration conf, int bucketId) { boolean haveSeenCurrentFile = false; long rowIdOffsetTmp = 0; { + /** + * Note that for reading base_x/ or delta_x_x/ with non-acid schema, + * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all it's + * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()} + */ //the split is from something other than the 1st file of the logical bucket - compute offset AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validTxnList, false, true); @@ -469,7 +492,7 @@ static int encodeBucketId(Configuration conf, int bucketId) { * If this is not the 1st file, set minKey 1 less than the start of current file * (Would not need to set minKey if we knew that there are no delta files) * {@link #advanceToMinKey()} needs this */ - newMinKey = new RecordIdentifier(0, 
bucketProperty,rowIdOffset - 1); + newMinKey = new RecordIdentifier(transactionId, bucketProperty,rowIdOffset - 1); } if (maxKey != null) { maxKey.setRowId(maxKey.getRowId() + rowIdOffset); } /** @@ -482,7 +505,7 @@ static int encodeBucketId(Configuration conf, int bucketId) { * of the file so we want to leave it blank to make sure any insert events in delta * files are included; Conversely, if it's not the last file, set the maxKey so that * events from deltas that don't modify anything in the current split are excluded*/ - newMaxKey = new RecordIdentifier(0, bucketProperty, + newMaxKey = new RecordIdentifier(transactionId, bucketProperty, rowIdOffset + reader.getNumberOfRows() - 1); } this.minKey = newMinKey; @@ -533,7 +556,7 @@ public void next(OrcStruct next) throws IOException { OriginalReaderPairToCompact(ReaderKey key, int bucketId, Reader.Options options, Options mergerOptions, Configuration conf, ValidTxnList validTxnList) throws IOException { - super(key, bucketId, conf); + super(key, bucketId, conf, mergerOptions); assert mergerOptions.isCompacting() : "Should only be used for Compaction"; this.conf = conf; this.options = options; @@ -544,6 +567,11 @@ public void next(OrcStruct next) throws IOException { assert options.getMaxOffset() == Long.MAX_VALUE; AcidUtils.Directory directoryState = AcidUtils.getAcidState( mergerOptions.getRootPath(), conf, validTxnList, false, true); + /** + * Note that for reading base_x/ or delta_x_x/ with non-acid schema, + * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all its + * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()} + */ originalFiles = directoryState.getOriginalFiles(); assert originalFiles.size() > 0; this.reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket @@ -755,13 +783,21 @@ private KeyInterval discoverKeyBounds(Reader reader, * {@link OrcRawRecordMerger} Acid reader is used slightly differently in various contexts. * This makes the "context" explicit. */ - static class Options { + static class Options implements Cloneable { private int copyIndex = 0; private boolean isCompacting = false; private Path bucketPath; private Path rootPath; + private Path baseDir; private boolean isMajorCompaction = false; private boolean isDeleteReader = false; + /** + * for reading "original" files - i.e. not native acid schema. Default value of 0 is + * appropriate for files that existed in a table before it was made transactional. 0 is the + * primordial transaction. For non-native files resulting from Load Data command, they + * are located in base_x or delta_x_x and then transactionId == x. + */ + private long transactionId = 0; Options copyIndex(int copyIndex) { assert copyIndex >= 0; this.copyIndex = copyIndex; @@ -790,6 +826,14 @@ Options isDeleteReader(boolean isDeleteReader) { assert !isCompacting; return this; } + Options transactionId(long transactionId) { + this.transactionId = transactionId; + return this; + } + Options baseDir(Path baseDir) { + this.baseDir = baseDir; + return this; + } /** * 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix */ @@ -825,17 +869,40 @@ boolean isMinorCompaction() { boolean isDeleteReader() { return isDeleteReader; } + long getTransactionId() { + return transactionId; + } + + /** + * In case of isMajorCompaction() this is the base dir from the Compactor, i.e.
either a base_x + * or {@link #rootPath} if it's the 1st major compaction after non-acid2acid conversion + */ + Path getBaseDir() { + return baseDir; + } + /** + * shallow clone + */ + public Options clone() { + try { + return (Options) super.clone(); + } + catch(CloneNotSupportedException ex) { + throw new AssertionError(); + } + } } - /** - * Create a reader that merge sorts the ACID events together. - * @param conf the configuration - * @param collapseEvents should the events on the same row be collapsed - * @param isOriginal is the base file a pre-acid file - * @param bucket the bucket we are reading - * @param options the options to read with - * @param deltaDirectory the list of delta directories to include - * @throws IOException - */ + + /** + * Create a reader that merge sorts the ACID events together. + * @param conf the configuration + * @param collapseEvents should the events on the same row be collapsed - todo: rename? + * @param isOriginal is the base file a pre-acid file - todo: rename? + * @param bucket the bucket we are reading + * @param options the options to read with + * @param deltaDirectory the list of delta directories to include + * @throws IOException + */ OrcRawRecordMerger(Configuration conf, boolean collapseEvents, Reader reader, @@ -844,6 +911,11 @@ boolean isDeleteReader() { ValidTxnList validTxnList, Reader.Options options, Path[] deltaDirectory, Options mergerOptions) throws IOException { + /** + * Since we have mergeOptions.getBucketPath() we have delta/base dir and we can read the _meta_data file + * it's a bit ugly - we should refactor this + */ + this.collapse = collapseEvents; this.offset = options.getOffset(); this.length = options.getLength(); @@ -887,11 +959,14 @@ boolean isDeleteReader() { objectInspector = OrcRecordUpdater.createEventSchema (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr))); + assert !(mergerOptions.isCompacting() && reader != null) : "don't need no reader for compaction"; // modify the options to reflect the event instead of the base row Reader.Options eventOptions = createEventOptions(options); + //suppose it's the first Major compaction so we only have deltas + boolean isMajorNoBase = mergerOptions.isCompacting() && mergerOptions.isMajorCompaction() && mergerOptions.getBaseDir() == null; if((mergerOptions.isCompacting() && mergerOptions.isMinorCompaction()) || - mergerOptions.isDeleteReader()) { + mergerOptions.isDeleteReader() || isMajorNoBase) { //for minor compaction, there is no progress report and we don't filter deltas baseReader = null; minKey = maxKey = null; @@ -913,18 +988,62 @@ boolean isDeleteReader() { } LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey()); // use the min/max instead of the byte range - ReaderPair pair; + ReaderPair pair = null; ReaderKey key = new ReaderKey(); if (isOriginal) { options = options.clone(); if(mergerOptions.isCompacting()) { - pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions, + assert mergerOptions.isMajorCompaction(); + Options readerPairOptions = mergerOptions; + if(mergerOptions.getBaseDir().getName().startsWith(AcidUtils.BASE_PREFIX)) { + readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions, + AcidUtils.parseBase(mergerOptions.getBaseDir()), mergerOptions.getBaseDir()); + } + pair = new OriginalReaderPairToCompact(key, bucket, options, readerPairOptions, conf, validTxnList); } else { + + //todo: use findTransactionIDForSynthetcRowIDs() + //the input 'reader' is the 'split' so it's correct 
+ assert mergerOptions.getBucketPath() != null : " since this is not compaction"; + //if here it's a non-acid schema file - check if from before table was marked transactional + //or in base_x/delta_x_x from Load Data + Path parent = mergerOptions.getBucketPath().getParent(); + Options readerPairOptions = mergerOptions; + while(parent != null && !parent.equals(mergerOptions.getRootPath())) { + //https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Loadingfilesintotables says input dir cannot + //have subdirs (is this enforced?) - todo: then this doesn't have to walk + boolean isBase = parent.getName().startsWith(AcidUtils.BASE_PREFIX); + boolean isDelta = parent.getName().startsWith(AcidUtils.DELTA_PREFIX); + long transactionIdForSyntheticRowIDs; + if(isBase || isDelta) { + if(isBase) { + transactionIdForSyntheticRowIDs = AcidUtils.parseBase(parent); + } + else { + AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX); + assert pd.getMinTransaction() == pd.getMaxTransaction() : + "This is a delta with raw non acid schema, must be result of single write, no compaction: " + + mergerOptions.getBucketPath(); + transactionIdForSyntheticRowIDs = pd.getMinTransaction(); + } + readerPairOptions = modifyForNonAcidSchemaRead( + mergerOptions, transactionIdForSyntheticRowIDs, parent); + break; + } + parent = parent.getParent(); + } pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(), - keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList); + keyInterval.getMaxKey(), options, readerPairOptions, conf, validTxnList); } } else { + if(mergerOptions.isCompacting()) { + assert mergerOptions.isMajorCompaction() : "expected major compaction: " + mergerOptions.getBaseDir() + ":" + bucket; + assert mergerOptions.getBaseDir() != null : "no baseDir?: " + mergerOptions.getRootPath(); + //we are compacting and it's acid schema so create a reader for the 1st bucket file that is not empty + reader = OrcFile.createReader(AcidUtils.createBucketFile(mergerOptions.getBaseDir(), bucket), OrcFile.readerOptions(conf)); + } + assert reader != null : "no reader? " + mergerOptions.getRootPath(); pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(), eventOptions, 0); } @@ -938,10 +1057,10 @@ boolean isDeleteReader() { baseReader = pair.getRecordReader(); } - if (deltaDirectory != null) { - /*whatever SARG maybe applicable to base it's not applicable to delete_delta since it has no - * user columns - * HIVE-17320: we should compute a SARG to push down min/max key to delete_delta*/ + if (deltaDirectory != null && deltaDirectory.length > 0) { + /*For reads, whatever SARG may be applicable to base it's not applicable to delete_delta since it has no + * user columns. For Compaction there is never a SARG. 
+ * */ Reader.Options deltaEventOptions = eventOptions.clone() .searchArgument(null, null).range(0, Long.MAX_VALUE); for(Path delta: deltaDirectory) { @@ -951,11 +1070,40 @@ boolean isDeleteReader() { } ReaderKey key = new ReaderKey(); AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta); + boolean isRawFormatDelta = AcidUtils.MetaDataFile.isRawFormat(delta, delta.getFileSystem(conf));//todo: this would ideally come from ParsedDelta object + if(isRawFormatDelta) { + assert !deltaDir.isDeleteDelta() : delta.toString(); + assert mergerOptions.isCompacting() : "during regular read anything which is not a delete_delta is treated like base: " + delta; + Options rawCompactOptions = modifyForNonAcidSchemaRead(mergerOptions, deltaDir.getMinTransaction(), delta); + //this will also handle copy_N files if any + ReaderPair deltaPair = new OriginalReaderPairToCompact(key, bucket, options, rawCompactOptions, conf, validTxnList); + if (deltaPair.nextRecord() != null) { + readers.put(key, deltaPair); + } + continue; + } for (Path deltaFile : getDeltaFiles(delta, bucket, conf, mergerOptions, isBucketed)) { FileSystem fs = deltaFile.getFileSystem(conf); if(!fs.exists(deltaFile)) { + /** + * it's possible that the file for a specific {@link bucket} doesn't exist in any given + * delta since no rows hashed to it (and not configured to create empty buckets) + */ continue; } + if(deltaDir.isDeleteDelta()) { + //maybe compaction or regular read or Delete event sorter + //in the latter 2 cases we should do: + //HIVE-17320: we should compute a SARG to push down min/max key to delete_delta + Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf)); + ReaderPair deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey, deltaEventOptions, deltaDir.getStatementId()); + if (deltaPair.nextRecord() != null) { + readers.put(key, deltaPair); + } + continue; + } + //if here then we must be compacting + assert mergerOptions.isCompacting() : "not compacting and not delete delta : " + delta; /* side files are only created by streaming ingest. If this is a compaction, we may * have an insert delta/ here with side files there because the original writer died.*/ long length = AcidUtils.getLogicalLength(fs, fs.getFileStatus(deltaFile)); @@ -986,7 +1134,61 @@ boolean isDeleteReader() { columns = primary.getColumns(); } } - + static final class TransactionFolderPair { + final long syntheticTransactionId; + /** + * folder which determines the transaction id to use in synthetic ROW_IDs + */ + final Path folder; + TransactionFolderPair(long syntheticTransactionId, Path folder) { + this.syntheticTransactionId = syntheticTransactionId; + this.folder = folder; + } + } + static TransactionFolderPair findTransactionIDForSynthetcRowIDs(Path splitPath, Path rootPath) { + Path parent = splitPath.getParent(); + if(rootPath.equals(parent)) { + return new TransactionFolderPair(0, parent);//the 'isOriginal' file is at the root of the partition (or table) thus it is + //from a pre-acid conversion write and belongs to primordial txnid:0. + }//note: "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0" is a meaningful path for nonAcid2acid converted table + while(parent != null && !rootPath.equals(parent)) { + //https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Loadingfilesintotables says input dir cannot + //have subdirs (is this enforced?) 
- todo: then this doesn't have to walk + boolean isBase = parent.getName().startsWith(AcidUtils.BASE_PREFIX); + boolean isDelta = parent.getName().startsWith(AcidUtils.DELTA_PREFIX); + if(isBase || isDelta) { + if(isBase) { + return new TransactionFolderPair(AcidUtils.parseBase(parent), parent); + } + else { + AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX); + assert pd.getMinTransaction() == pd.getMaxTransaction() : + "This is a delta with raw non acid schema, must be result of single write, no compaction: " + + splitPath; + return new TransactionFolderPair(pd.getMinTransaction(), parent); + } + } + parent = parent.getParent(); + } + if(parent == null) { + //split is marked isOriginal but it's not an immediate child of a partition nor is it in a + //base/ or delta/ - this should never happen + throw new IllegalStateException("Cannot determine transaction id for original file " + + splitPath + " in " + rootPath); + } + return new TransactionFolderPair(0, rootPath); + } + /** + * This is done to read non-acid schema files ("original") located in base_x/ or delta_x_x/ which + * happens as a result of a Load Data statement. Setting {@code rootPath} to base_x/ or delta_x_x + * causes {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} in subsequent + * {@link OriginalReaderPair} object to return the files in this dir + * in {@link AcidUtils.Directory#getOriginalFiles()} + * @return modified clone of {@code baseOptions} + */ + private Options modifyForNonAcidSchemaRead(Options baseOptions, long transactionId, Path rootPath) { + return baseOptions.clone().transactionId(transactionId).rootPath(rootPath); + } /** * This determines the set of {@link ReaderPairAcid} to create for a given delta/. * For unbucketed tables {@code bucket} can be thought of as a write tranche. 
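To make the synthetic ROW_ID logic above concrete: findTransactionIDForSynthetcRowIDs() maps a file sitting directly under the partition/table root to the primordial txnid 0, and a non-acid file sitting under base_x/ or delta_x_x/ (the result of Load Data) to transaction x. Below is a minimal standalone sketch of that directory-name-to-transaction-id mapping; the class name, regexes and main() are illustrative stand-ins only, not the actual AcidUtils/OrcRawRecordMerger helpers.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch of how a synthetic ROW_ID transaction id is derived from the
// directory that contains a non-acid ("original") file; not the real AcidUtils code.
public class SyntheticTxnIdSketch {
  // base_0000031 -> 31; delta_0000024_0000024_0000 -> 24; anything else -> 0 (pre-acid data)
  private static final Pattern BASE = Pattern.compile("base_(\\d+)");
  private static final Pattern DELTA = Pattern.compile("delta_(\\d+)_(\\d+)(_\\d+)?");

  static long syntheticTxnId(String dirName) {
    Matcher b = BASE.matcher(dirName);
    if (b.matches()) {
      return Long.parseLong(b.group(1));
    }
    Matcher d = DELTA.matcher(dirName);
    if (d.matches()) {
      // a raw-format delta is the result of a single write, so min txn == max txn
      return Long.parseLong(d.group(1));
    }
    return 0L; // file written before the table was converted to acid
  }

  public static void main(String[] args) {
    System.out.println(syntheticTxnId("base_0000031"));               // 31
    System.out.println(syntheticTxnId("delta_0000024_0000024_0000")); // 24
    System.out.println(syntheticTxnId("t"));                          // 0
  }
}

The real code walks up from the split's parent directory toward the partition root instead of matching a single directory name, but the id it assigns follows the same rule.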
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 315cc1d3d1..8af38b26ce 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -196,7 +196,9 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { fields.add(new OrcStruct.Field("row", rowInspector, ROW)); return new OrcStruct.OrcStructInspector(fields); } - + /** + * @param path - partition root + */ OrcRecordUpdater(Path path, AcidOutputFormat.Options options) throws IOException { this.options = options; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index bcde4fc82f..4d75ce56cb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.orc.impl.AcidStats; @@ -156,7 +155,7 @@ public VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, Report this.vectorizedRowBatchBase = baseReader.createValue(); } - private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit inputSplit, Reporter reporter, + private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporter reporter, VectorizedRowBatchCtx rowBatchCtx) throws IOException { this.rbCtx = rowBatchCtx; final boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); @@ -165,12 +164,10 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit inputSplit, Repor // This type of VectorizedOrcAcidRowBatchReader can only be created when split-update is // enabled for an ACID case and the file format is ORC. 
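Since OrcRecordUpdater is handed the partition root, it may help to spell out the directory and file naming that the Load Data changes and the tests later in this patch rely on. The sketch below only reproduces the zero-padding visible in the expected test output (e.g. delta_0000020_0000020_0000/bucket_00000); the authoritative formatting lives in AcidUtils (baseDir(), deltaSubdir(), createBucketFile()), so treat these format strings as an assumption.

// Minimal sketch of the acid layout names asserted in the tests in this patch;
// padding widths are taken from the expected strings, not from the AcidUtils source.
public class AcidLayoutSketch {
  static String deltaSubdir(long minTxn, long maxTxn, int stmtId) {
    return String.format("delta_%07d_%07d_%04d", minTxn, maxTxn, stmtId);
  }
  static String baseDir(long txnId) {
    return String.format("base_%07d", txnId);
  }
  static String bucketFile(int bucketId) {
    return String.format("bucket_%05d", bucketId);
  }
  public static void main(String[] args) {
    // Load Data (no overwrite) into a full acid table lands in a delta of the current txn,
    // keeping the original file name:
    System.out.println(deltaSubdir(20, 20, 0) + "/000000_0"); // delta_0000020_0000020_0000/000000_0
    // Load Data Overwrite creates a new base_x instead of deleting existing files:
    System.out.println(baseDir(31) + "/000000_0");            // base_0000031/000000_0
    // compaction rewrites data into acid-named bucket files:
    System.out.println(baseDir(31) + "/" + bucketFile(0));    // base_0000031/bucket_00000
  }
}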
- boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate() - || !(inputSplit instanceof OrcSplit); + boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate(); if (isReadNotAllowed) { OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf); } - final OrcSplit orcSplit = (OrcSplit) inputSplit; reporter.setStatus(orcSplit.toString()); readerOptions = OrcRawRecordMerger.createEventOptions(OrcInputFormat.createOptionsForReader(conf)); @@ -226,9 +223,11 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit inputSplit, Repor private static final class OffsetAndBucketProperty { private final long rowIdOffset; private final int bucketProperty; - private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty) { + private final long syntheticTxnId; + private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty, long syntheticTxnId) { this.rowIdOffset = rowIdOffset; this.bucketProperty = bucketProperty; + this.syntheticTxnId = syntheticTxnId; } } /** @@ -245,12 +244,15 @@ private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty) { private OffsetAndBucketProperty computeOffsetAndBucket( OrcSplit split, JobConf conf,ValidTxnList validTxnList) throws IOException { if(!needSyntheticRowIds(split, !deleteEventRegistry.isEmpty(), rowIdProjected)) { - return new OffsetAndBucketProperty(0,0); + //todo: should bucketProperty == -1 here? just to be safe + return new OffsetAndBucketProperty(0,0, -1); } + OrcRawRecordMerger.TransactionFolderPair syntheticTxnInfo = + OrcRawRecordMerger.findTransactionIDForSynthetcRowIDs(split.getPath(), split.getRootDir()); long rowIdOffset = 0; int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf).getBucketId(); int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).statementId(0).bucket(bucketId)); - AcidUtils.Directory directoryState = AcidUtils.getAcidState(split.getRootDir(), conf, + AcidUtils.Directory directoryState = AcidUtils.getAcidState( syntheticTxnInfo.folder, conf, validTxnList, false, true); for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { AcidOutputFormat.Options bucketOptions = @@ -266,7 +268,7 @@ private OffsetAndBucketProperty computeOffsetAndBucket( OrcFile.readerOptions(conf)); rowIdOffset += reader.getNumberOfRows(); } - return new OffsetAndBucketProperty(rowIdOffset, bucketProperty); + return new OffsetAndBucketProperty(rowIdOffset, bucketProperty, syntheticTxnInfo.syntheticTransactionId); } /** * {@link VectorizedOrcAcidRowBatchReader} is always used for vectorized reads of acid tables. @@ -316,6 +318,7 @@ private static boolean areRowIdsProjected(VectorizedRowBatchCtx rbCtx) { if (orcSplit.isOriginal()) { root = orcSplit.getRootDir(); } else { + //todo: why not just use getRootDir()? root = path.getParent().getParent(); assert root.equals(orcSplit.getRootDir()) : "root mismatch: baseDir=" + orcSplit.getRootDir() + " path.p.p=" + root; @@ -394,6 +397,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti } ColumnVector[] innerRecordIdColumnVector = vectorizedRowBatchBase.cols; if(isOriginal) { + //TODO: why is this not checking if needSyntheticRowIds()? I guess because they will not be consumed upstream??? 
/* * If there are deletes and reading original file, we must produce synthetic ROW_IDs in order * to see if any deletes apply @@ -409,8 +413,8 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti */ recordIdColumnVector.fields[0].noNulls = true; recordIdColumnVector.fields[0].isRepeating = true; - //all "original" is considered written by txnid:0 which committed - ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = 0; + //todo all "original" is considered written by txnid:0 which committed - comment is wrong + ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = syntheticProps.syntheticTxnId; /** * This is {@link RecordIdentifier#getBucketProperty()} * Also see {@link BucketCodec} @@ -433,6 +437,12 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_TRANSACTION] = recordIdColumnVector.fields[0]; innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = recordIdColumnVector.fields[1]; innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = recordIdColumnVector.fields[2]; + //these are insert events so (original txn == current) txn for all rows + innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_TRANSACTION] = recordIdColumnVector.fields[0]; + } + if(syntheticProps.syntheticTxnId > 0) { + findRecordsWithInvalidTransactionIds(innerRecordIdColumnVector, + vectorizedRowBatchBase.size, selectedBitSet); } } else { @@ -441,6 +451,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti /** * All "original" data belongs to txnid:0 and is always valid/committed for every reader * So only do findRecordsWithInvalidTransactionIds() wrt {@link validTxnList} for !isOriginal + * todo: Note: this is no longer true with HIVE-17361 and IOW */ } @@ -477,7 +488,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti // vectorized code path is not being used in cases of update/delete, when the metadata columns // would be expected to be passed up the operator pipeline. This is because // currently the update/delete specifically disable vectorized code paths. - // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode() + // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode() TODO: this is clearly wrong see TestTxnNoBuckets.testToAcidConversionMultiBucket() StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW]; // Transfer columnVector objects from base batch to outgoing batch. 
System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount()); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java index f7388a444d..e5d2c5a922 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java @@ -27,12 +27,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; import org.apache.hadoop.io.NullWritable; @@ -98,6 +96,7 @@ this.length = fileSplit.getLength(); options.range(offset, length); options.include(OrcInputFormat.genIncludedColumns(schema, conf)); + //todo: above we assert that it's non-acid, so we really want to say non-acid schema OrcInputFormat.setSearchArgument(options, types, conf, true); this.reader = file.rowsOptions(options); diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 3e9fff195f..6c997da898 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -144,6 +144,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.index.HiveIndexHandler; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; @@ -1703,18 +1704,20 @@ public void loadPartition(Path loadPath, String tableName, * location/inputformat/outputformat/serde details from table spec * @param isSrcLocal * If the source directory is LOCAL - * @param isAcid - * true if this is an ACID operation + * @param isAcidIUDoperation + * true if this is an ACID operation Insert/Update/Delete operation * @param hasFollowingStatsTask * true if there is a following task which updates the stats, so, this method need not update. 
* @return Partition object being loaded with data */ public Partition loadPartition(Path loadPath, Table tbl, Map partSpec, LoadFileType loadFileType, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, - boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, Long txnId, int stmtId) + boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long txnId, int stmtId) throws HiveException { Path tblDataLocationPath = tbl.getDataLocation(); boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters()); + assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName(); + boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl); try { // Get the partition object if it already exists Partition oldPart = getPartition(tbl, partSpec, false); @@ -1766,7 +1769,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)"); } - assert !isAcid; + assert !isAcidIUDoperation; if (areEventsForDmlNeeded(tbl, oldPart)) { newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId); } @@ -1790,16 +1793,22 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par filter = (loadFileType == LoadFileType.REPLACE_ALL) ? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter; } + else if(!isAcidIUDoperation && isFullAcidTable) { + destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl); + } if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("moving " + loadPath + " to " + destPath); } - if ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcid)) { + //todo: why is "&& !isAcidIUDoperation" needed here? + if (!isFullAcidTable && ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcidIUDoperation))) { + //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new + // base_x. (there is Insert Overwrite and Load Data Overwrite) boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal, isAutoPurge, newFiles, filter, isMmTableWrite); } else { FileSystem fs = tbl.getDataLocation().getFileSystem(conf); - copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcid, + copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles); } } @@ -1889,6 +1898,38 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par } } + /** + * Load Data commands for fullAcid tables write to base_x (if there is overwrite clause) or + * delta_x_x directory - same as any other Acid write. This method modifies the destPath to add + * this path component. 
+ * @param txnId - id of current transaction (in which this operation is running) + * @param stmtId - see {@link DbTxnManager#getWriteIdAndIncrement()} + * @return appropriately modified path + */ + private Path fixFullAcidPathForLoadData(LoadFileType loadFileType, Path destPath, long txnId, int stmtId, Table tbl) throws HiveException { + switch (loadFileType) { + case REPLACE_ALL: + destPath = new Path(destPath, AcidUtils.baseDir(txnId)); + break; + case KEEP_EXISTING: + destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId)); + break; + case OVERWRITE_EXISTING: + //should not happen here - this is for replication + default: + throw new IllegalArgumentException("Unexpected " + LoadFileType.class.getName() + " " + loadFileType); + } + try { + FileSystem fs = tbl.getDataLocation().getFileSystem(SessionState.getSessionConf()); + if(!FileUtils.mkdir(fs, destPath, conf)) { + LOG.warn(destPath + " already exists?!?!"); + } + AcidUtils.MetaDataFile.createMetaFile(destPath, fs, true); + } catch (IOException e) { + throw new HiveException("load: error while creating " + destPath + ";loadFileType=" + loadFileType, e); + } + return destPath; + } private boolean areEventsForDmlNeeded(Table tbl, Partition oldPart) { return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null; @@ -2123,7 +2164,6 @@ private void constructOneLBLocationMap(FileStatus fSta, * @param partSpec * @param loadFileType * @param numDP number of dynamic partitions - * @param listBucketingEnabled * @param isAcid true if this is an ACID operation * @param txnId txnId, can be 0 unless isAcid == true * @return partition map details (PartitionSpec and Partition) @@ -2271,14 +2311,16 @@ public Void call() throws Exception { * if list bucketing enabled * @param hasFollowingStatsTask * if there is any following stats task - * @param isAcid true if this is an ACID based write + * @param isAcidIUDoperation true if this is an ACID based Insert [overwrite]/update/delete */ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal, - boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask, - Long txnId, int stmtId, boolean isMmTable) throws HiveException { - + boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, + Long txnId, int stmtId) throws HiveException { List newFiles = null; Table tbl = getTable(tableName); + assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName(); + boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl); + boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl); HiveConf sessionConf = SessionState.getSessionConf(); if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) { newFiles = Collections.synchronizedList(new ArrayList()); @@ -2296,24 +2338,31 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId); } else { // Either a non-MM query, or a load into MM table from an external source. - Path tblPath = tbl.getPath(), destPath = tblPath; + Path tblPath = tbl.getPath(); + Path destPath = tblPath; PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER; if (isMmTable) { + assert !isAcidIUDoperation; // We will load into MM directory, and delete from the parent if needed. destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId)); filter = loadFileType == LoadFileType.REPLACE_ALL ? 
new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter; } + else if(!isAcidIUDoperation && isFullAcidTable) { + destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl); + } Utilities.FILE_OP_LOGGER.debug("moving " + loadPath + " to " + tblPath + " (replace = " + loadFileType + ")"); - if (loadFileType == LoadFileType.REPLACE_ALL) { + if (loadFileType == LoadFileType.REPLACE_ALL && !isFullAcidTable) { + //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361 + //todo: should probably do the same for MM IOW boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); replaceFiles(tblPath, loadPath, destPath, tblPath, sessionConf, isSrcLocal, isAutopurge, newFiles, filter, isMmTable); } else { try { FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf); - copyFiles(sessionConf, loadPath, destPath, fs, isSrcLocal, isAcid, + copyFiles(sessionConf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles); } catch (IOException e) { throw new HiveException("addFiles: filesystem error in check phase", e); @@ -2356,7 +2405,6 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles); } - /** * Creates a partition. * diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index cd75130d7c..a1b6cda3e8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -391,7 +391,6 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, LoadTableDesc loadTableWork = new LoadTableDesc(destPath, Utilities.getTableDesc(table), new TreeMap<>(), replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId); - loadTableWork.setTxnId(txnId); loadTableWork.setStmtId(stmtId); MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()); Task loadTableTask = TaskFactory.get(mv, x.getConf()); @@ -400,6 +399,10 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, return loadTableTask; } + /** + * todo: this is odd: transactions are opened for all statements. what is this supposed to check? + */ + @Deprecated private static boolean isAcid(Long txnId) { return (txnId != null) && (txnId != 0); } @@ -490,7 +493,6 @@ private static boolean isAcid(Long txnId) { partSpec.getPartSpec(), replicationSpec.isReplace() ? 
LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId); - loadTableWork.setTxnId(txnId); loadTableWork.setStmtId(stmtId); loadTableWork.setInheritTableSpecs(false); Task loadPartTask = TaskFactory.get(new MoveWork( diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index 238fbd6057..cc956da575 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -136,7 +136,7 @@ private URI initializeFromURI(String fromPath, boolean isLocal) throws IOExcepti } private List applyConstraintsAndGetFiles(URI fromURI, Tree ast, - boolean isLocal) throws SemanticException { + boolean isLocal, Table table) throws SemanticException { FileStatus[] srcs = null; @@ -159,6 +159,14 @@ private URI initializeFromURI(String fromPath, boolean isLocal) throws IOExcepti throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast, "source contains directory: " + oneSrc.getPath().toString())); } + if(AcidUtils.isFullAcidTable(table)) { + if(!AcidUtils.originalBucketFilter.accept(oneSrc.getPath())) { + //acid files (e.g. bucket_0000) have ROW_ID embedded in them and so can't be simply + //copied to a table so only allow non-acid files for now + throw new SemanticException(ErrorMsg.ACID_LOAD_DATA_INVALID_FILE_NAME, + oneSrc.getPath().getName(), table.getDbName() + "." + table.getTableName()); + } + } } } catch (IOException e) { // Has to use full name to make sure it does not conflict with @@ -230,11 +238,8 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { } } - if(AcidUtils.isAcidTable(ts.tableHandle) && !AcidUtils.isInsertOnlyTable(ts.tableHandle.getParameters())) { - throw new SemanticException(ErrorMsg.LOAD_DATA_ON_ACID_TABLE, ts.tableHandle.getCompleteName()); - } // make sure the arguments make sense - List files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal); + List files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal, ts.tableHandle); // for managed tables, make sure the file formats match if (TableType.MANAGED_TABLE.equals(ts.tableHandle.getTableType()) @@ -277,17 +282,16 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { } Long txnId = null; - int stmtId = 0; - Table tbl = ts.tableHandle; - if (AcidUtils.isInsertOnlyTable(tbl.getParameters())) { + int stmtId = -1; + if (AcidUtils.isAcidTable(ts.tableHandle)) { txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); + stmtId = SessionState.get().getTxnMgr().getWriteIdAndIncrement(); } LoadTableDesc loadTableWork; loadTableWork = new LoadTableDesc(new Path(fromURI), Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite ? 
LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING, txnId); - loadTableWork.setTxnId(txnId); loadTableWork.setStmtId(stmtId); if (preservePartitionSpecs){ // Note : preservePartitionSpecs=true implies inheritTableSpecs=false but diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index 1fa7b40ada..4683c9c7a9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -47,9 +47,22 @@ private Map partitionSpec; // NOTE: this partitionSpec has to be ordered map public enum LoadFileType { - REPLACE_ALL, // Remove all existing data before copy/move - KEEP_EXISTING, // If any file exist while copy, then just duplicate the file - OVERWRITE_EXISTING // If any file exist while copy, then just overwrite the file + /** + * This corresponds to INSERT OVERWRITE and REPL LOAD for INSERT OVERWRITE event. + * Remove all existing data before copy/move + */ + REPLACE_ALL, + /** + * This corresponds to INSERT INTO and LOAD DATA. + * If any file exist while copy, then just duplicate the file + */ + KEEP_EXISTING, + /** + * This corresponds to REPL LOAD where if we re-apply the same event then need to overwrite + * the file instead of making a duplicate copy. + * If any file exist while copy, then just overwrite the file + */ + OVERWRITE_EXISTING } public LoadTableDesc(final LoadTableDesc o) { super(o.getSourcePath(), o.getWriteType()); @@ -215,14 +228,10 @@ public long getTxnId() { return currentTransactionId == null ? 0 : currentTransactionId; } - public void setTxnId(Long txnId) { - this.currentTransactionId = txnId; - } - public int getStmtId() { return stmtId; } - + //todo: should this not be passed in the c'tor? public void setStmtId(int stmtId) { this.stmtId = stmtId; } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 7d4d3795d3..a804527527 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hive.common.ValidCompactorTxnList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -577,11 +576,16 @@ public String toString() { dir.getName().startsWith(AcidUtils.DELTA_PREFIX) || dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX); + boolean isRawFormat = !dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX) + && AcidUtils.MetaDataFile.isRawFormat(dir, fs);//deltes can't be raw format - FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter); + FileStatus[] files = fs.listStatus(dir, isRawFormat ? AcidUtils.originalBucketFilter + : AcidUtils.bucketFileFilter); for(FileStatus f : files) { // For each file, figure out which bucket it is. - Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName()); + Matcher matcher = isRawFormat ? 
+ AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName()) + : AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName()); addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap); } } else { @@ -612,8 +616,12 @@ public String toString() { private void addFileToMap(Matcher matcher, Path file, boolean sawBase, Map splitToBucketMap) { if (!matcher.find()) { - LOG.warn("Found a non-bucket file that we thought matched the bucket pattern! " + - file.toString() + " Matcher=" + matcher.toString()); + String msg = "Found a non-bucket file that we thought matched the bucket pattern! " + + file.toString() + " Matcher=" + matcher.toString(); + LOG.error(msg); + //following matcher.group() would fail anyway and we don't want to skip files since that + //may be a data loss scenario + throw new IllegalArgumentException(msg); } int bucketNum = Integer.parseInt(matcher.group()); BucketTracker bt = splitToBucketMap.get(bucketNum); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 149a9adfd8..f455040aa5 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -526,7 +526,8 @@ public void testMergeType2SCD01() throws Exception { String stmt = "merge into target t using (" + teeCurMatch + ") s on t.key=s.key and t.cur=1 and s.`o/p\\n`=1 " + "when matched then update set cur=0 " + "when not matched then insert values(s.key,s.data,1)"; - + //to allow cross join from 'teeCurMatch' + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); runStatementOnDriver(stmt); int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}}; List r = runStatementOnDriver("select * from target order by key,data,cur"); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index b877253210..d1f6d01cd8 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -81,7 +81,7 @@ ).getPath().replaceAll("\\\\", "/"); protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; //bucket count for test tables; set it to 1 for easier debugging - protected static int BUCKET_COUNT = 2; + static int BUCKET_COUNT = 2; @Rule public TestName testName = new TestName(); @@ -121,12 +121,11 @@ public void setUp() throws Exception { setUpWithTableProperties("'transactional'='true'"); } - protected void setUpWithTableProperties(String tableProperties) throws Exception { + void setUpWithTableProperties(String tableProperties) throws Exception { hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); - hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); hiveConf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, @@ -410,7 +409,7 @@ public void testFailureOnAlteringTransactionalProperties() throws Exception { expectedException.expect(RuntimeException.class); expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created"); runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored 
as orc TBLPROPERTIES ('transactional'='true')"); - runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')"); + runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'insert_only')"); } /** * Test the query correctness and directory layout for ACID table conversion diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java new file mode 100644 index 0000000000..1745d03a39 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java @@ -0,0 +1,375 @@ +package org.apache.hadoop.hive.ql; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.io.NullWritable; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +/** + * ToDo: + * 1.1. Document assumptions - what is isOriginal etc + * 1.2 rename ORIGINAL_BASE, etc + * 1,22 - clean up AcidUtils.MetaDataFile - enums + * 6. LoadSemanticAnalyzer.applyConstraintsAndGetFiles() verifies that there are no subdirs. But not filenames - add it for acid? + * Currently there are lots of examples (in the tests at least) where Load Data loads a file with arbitrary name + * The thing to do is probably to rename them on Import and just name 00000_0, 00001_0 etc in alphabetical - no one should be relying on file names under acid tabls - compactor will change them anyway + * 6.1 should it verify that schema matches? Say someone adds bucket_000 that is already acid? + * + * 6.2 Clean up utilities like runStatementOnDriver() etc + * 6.3 add TestTxnNoBucketsVectorized? - oddly it already is - make parametrized + * 7.0 - do updates actually vectorize? {@link org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader#next(NullWritable, VectorizedRowBatch)} comments are confusing + * yes, they do! + * + * 50. postpone SMB stuff until stuff is stabilized etc + */ +public class TestTxnLoadData extends TxnCommandsBaseForTests { + static final private Logger LOG = LoggerFactory.getLogger(TestTxnLoadData.class); + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestTxnLoadData.class.getCanonicalName() + + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + @Rule + public TemporaryFolder folder= new TemporaryFolder(); + + @Override + String getTestDataDir() { + return TEST_DATA_DIR; + } + + /** + * Note {@link org.apache.hadoop.hive.ql.metadata.Hive#isSubDir(Path, Path, FileSystem, FileSystem, boolean)} - why did I need this? 
+ */ + @Test + public void loadData() throws Exception { + loadData(false); + } + @Test + public void loadDataVectorized() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + loadData(true); + } + @Test + public void loadDataUpdate() throws Exception { + loadDataUpdate(false); + } + @Test + public void loadDataUpdateVectorized() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + loadDataUpdate(true); + } + private void loadDataUpdate(boolean isVectorized) throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')"); + //Tstage is just a simple way to generate test data + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("insert into Tstage values(1,2),(3,4)"); + //this creates an ORC data file with correct schema under table root + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); + //"load data local inpath" doesn't delete source files so clean it here + runStatementOnDriver("truncate table Tstage"); + //and do a Load Data into the same table, which should now land in a delta_x_x. + // 'data' is created by export command/ + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T"); + + String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" : + "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; + String[][] expected = new String[][]{ + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000020_0000020_0000/000000_0"}, + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000020_0000020_0000/000000_0"}}; + checkResult(expected, testQuery, isVectorized, "load data inpath"); + runStatementOnDriver("update T set b = 17 where a = 1"); + String[][] expected2 = new String[][]{ + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000020_0000020_0000/000000_0"}, + {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000023_0000023_0000/bucket_00000"} + }; + checkResult(expected2, testQuery, isVectorized, "update"); + + runStatementOnDriver("insert into T values(2,2)"); + runStatementOnDriver("delete from T where a = 3"); + //test minor compaction + runStatementOnDriver("alter table T compact 'minor'"); + TestTxnCommands2.runWorker(hiveConf); + String[][] expected3 = new String[][] { + {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000020_0000027/bucket_00000"}, + {"{\"transactionid\":26,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000020_0000027/bucket_00000"} + }; + checkResult(expected3, testQuery, isVectorized, "delete compact minor"); + + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' overwrite into table T"); + String[][] expected4 = new String[][]{ + {"{\"transactionid\":31,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000031/000000_0"}, + {"{\"transactionid\":31,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000031/000000_0"}}; + checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite"); + + //load same data again (additive) + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T"); + runStatementOnDriver("update T set b = 17 where a = 
1");//matches 2 rows + runStatementOnDriver("delete from T where a = 3");//matches 2 rows + runStatementOnDriver("insert into T values(2,2)"); + String[][] expected5 = new String[][]{ + {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000035_0000035_0000/bucket_00000"}, + {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/delta_0000035_0000035_0000/bucket_00000"}, + {"{\"transactionid\":37,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000037_0000037_0000/bucket_00000"} + }; + checkResult(expected5, testQuery, isVectorized, "load data inpath overwrite update"); + + //test major compaction + runStatementOnDriver("alter table T compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + String[][] expected6 = new String[][]{ + {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000037/bucket_00000"}, + {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000037/bucket_00000"}, + {"{\"transactionid\":37,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000037/bucket_00000"} + }; + checkResult(expected6, testQuery, isVectorized, "load data inpath compact major"); + } + private void loadData(boolean isVectorized) throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')"); + runStatementOnDriver("insert into T values(0,2),(0,4)"); + //Tstage is just a simple way to generate test data + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("insert into Tstage values(1,2),(3,4)"); + //this creates an ORC data file with correct schema under table root + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'"); + //"load data local inpath" doesn't delete source files so clean it here + runStatementOnDriver("truncate table Tstage"); + //and do a Load Data into the same table, which should now land in a delta_x_x. + // 'data' is created by export command/ + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T"); + + String testQuery = isVectorized ? 
"select ROW__ID, a, b from T order by ROW__ID" : + "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; + String[][] expected = new String[][] { + //normal insert + {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000016_0000016_0000/bucket_00000"}, + {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000016_0000016_0000/bucket_00000"}, + //Load Data + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000021_0000021_0000/000000_0"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000021_0000021_0000/000000_0"}}; + checkResult(expected, testQuery, isVectorized, "load data inpath"); + + //test minor compaction + runStatementOnDriver("alter table T compact 'minor'"); + TestTxnCommands2.runWorker(hiveConf); + String[][] expected1 = new String[][] { + {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000016_0000021/bucket_00000"}, + {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000016_0000021/bucket_00000"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000016_0000021/bucket_00000"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000016_0000021/bucket_00000"} + }; + checkResult(expected1, testQuery, isVectorized, "load data inpath (minor)"); + + //test major compaction + runStatementOnDriver("insert into T values(2,2)"); + runStatementOnDriver("alter table T compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + String[][] expected2 = new String[][] { + {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000027/bucket_00000"}, + {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000027/bucket_00000"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000027/bucket_00000"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000027/bucket_00000"}, + {"{\"transactionid\":27,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000027/bucket_00000"} + }; + checkResult(expected2, testQuery, isVectorized, "load data inpath (major)"); + + //create more staging data and test Load Data Overwrite + runStatementOnDriver("insert into Tstage values(5,6),(7,8)"); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'"); + runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/2/data' overwrite into table T"); + String[][] expected3 = new String[][] { + {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000033/000000_0"}, + {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000033/000000_0"}}; + checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite"); + + //one more major compaction + runStatementOnDriver("insert into T values(6,6)"); + runStatementOnDriver("alter table T compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + String[][] expected4 = new String[][] { + {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000036/bucket_00000"}, + {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000036/bucket_00000"}, + {"{\"transactionid\":36,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000036/bucket_00000"}}; + checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite (major)"); + } + /** + * Load Data [overwrite] in to an (un-)partitioned acid converted table + 
* OK, finally! Using export works in PTest + */ + @Test + public void loadDataNonAcid2AcidConversion() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) stored as orc"); + //per acid write to test nonAcid2acid conversion mixed with load data + runStatementOnDriver("insert into T values(0,2),(0,4)"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("insert into Tstage values(1,2),(3,4)"); + //make 2 more inserts so that we have 000000_0_copy_1, 000000_0_copy_2 files in export + //export works at file level so if you have copy_N in the table dir, you'll have those in output + runStatementOnDriver("insert into Tstage values(2,2),(3,3)"); + runStatementOnDriver("insert into Tstage values(4,4),(5,5)"); + //create a file we'll import later + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'"); + runStatementOnDriver("truncate table Tstage");//clean the staging table + + //now convert T to acid + runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional' = 'true')"); + //and do a Load Data into the same table, which should now land in a delta/ + // (with 000000_0, 000000_0_copy_1, 000000_0_copy_2) + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T"); + List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"); + + String[][] expected = new String[][] { + //from pre-acid insert + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/000000_0"}, + //from Load Data into acid converted table + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000024_0000024_0000/000000_0"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000024_0000024_0000/000000_0"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":2}\t2\t2", "t/delta_0000024_0000024_0000/000000_0_copy_1"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":3}\t3\t3", "t/delta_0000024_0000024_0000/000000_0_copy_1"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":4}\t4\t4", "t/delta_0000024_0000024_0000/000000_0_copy_2"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":5}\t5\t5", "t/delta_0000024_0000024_0000/000000_0_copy_2"}, + }; + checkExpected(rs, expected, "load data inpath"); + + //create more staging data with copy_N files and do LD+Overwrite + runStatementOnDriver("insert into Tstage values(5,6),(7,8)"); + runStatementOnDriver("insert into Tstage values(8,8)"); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'"); + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/2/data' overwrite into table T"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"); + String[][] expected2 = new String[][] { + {"{\"transactionid\":29,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000029/000000_0"}, + {"{\"transactionid\":29,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000029/000000_0"}, + {"{\"transactionid\":29,\"bucketid\":536870912,\"rowid\":2}\t8\t8", "t/base_0000029/000000_0_copy_1"} + }; + checkExpected(rs, expected2, "load data inpath overwrite"); + //create 1 more delta_x_x so that compactor has > dir file to compact + runStatementOnDriver("insert into T 
values(9,9)"); + runStatementOnDriver("alter table T compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"); + String[][] expected3 = new String[][] { + {"{\"transactionid\":29,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000031/bucket_00000"}, + {"{\"transactionid\":29,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000031/bucket_00000"}, + {"{\"transactionid\":29,\"bucketid\":536870912,\"rowid\":2}\t8\t8", "t/base_0000031/bucket_00000"}, + {"{\"transactionid\":31,\"bucketid\":536870912,\"rowid\":0}\t9\t9", "t/base_0000031/bucket_00000"} + + }; + checkExpected(rs, expected3, "load data inpath overwrite (major)"); + } + + /** + * Load Data [overwrite] in to a partitioned transactional table + */ + @Test + public void loadDataPartitioned() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc tblproperties('transactional'='true')"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + + runStatementOnDriver("insert into Tstage values(0,2),(0,4)"); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'"); + runStatementOnDriver("truncate table Tstage");//because 'local' inpath doesn't delete source files + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T partition(p=0)"); + + runStatementOnDriver("insert into Tstage values(1,2),(1,4)"); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'"); + runStatementOnDriver("truncate table Tstage"); + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/2/data' into table T partition(p=1)"); + + runStatementOnDriver("insert into Tstage values(2,2),(2,4)"); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/3'"); + runStatementOnDriver("truncate table Tstage"); + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/3/data' into table T partition(p=1)"); + + List rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"); + String[][] expected = new String[][] { + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000020_0000020_0000/000000_0"}, + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000020_0000020_0000/000000_0"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t2", "t/p=1/delta_0000024_0000024_0000/000000_0"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4", "t/p=1/delta_0000024_0000024_0000/000000_0"}, + {"{\"transactionid\":28,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t2", "t/p=1/delta_0000028_0000028_0000/000000_0"}, + {"{\"transactionid\":28,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4", "t/p=1/delta_0000028_0000028_0000/000000_0"}}; + checkExpected(rs, expected, "load data inpath partitioned"); + + + runStatementOnDriver("insert into Tstage values(5,2),(5,4)"); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/4'"); + runStatementOnDriver("truncate table Tstage"); + runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/4/data' overwrite into table T partition(p=1)"); + String[][] expected2 = new String[][] { + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", 
"t/p=0/delta_0000020_0000020_0000/000000_0"}, + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000020_0000020_0000/000000_0"}, + {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t1\t5\t2", "t/p=1/base_0000033/000000_0"}, + {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t1\t5\t4", "t/p=1/base_0000033/000000_0"}}; + rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"); + checkExpected(rs, expected2, "load data inpath partitioned overwrite"); + } + + /** + * By default you can't load into bucketed tables. Things will break badly in acid (data loss, etc) + * if loaded data is not bucketed properly. This test is to capture that this is still the default. + * If the default is changed, Load Data should probably do more validation to ensure data is + * properly distributed into files and files are named correctly. + */ + @Test + public void testValidations() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) clustered by (a) into 2 buckets stored as orc tblproperties('transactional'='true')"); + File createdFile= folder.newFile("myfile.txt"); + FileUtils.writeStringToFile(createdFile, "hello world"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + //this creates an ORC data file with correct schema under table root + runStatementOnDriver("insert into Tstage values(1,2),(3,4)"); + CommandProcessorResponse cpr = runStatementOnDriverNegative("load data local inpath '" + getWarehouseDir() + "' into table T"); + Assert.assertTrue(cpr.getErrorMessage().contains("Load into bucketed tables are disabled")); + } + private void checkExpected(List rs, String[][] expected, String msg) { + super.checkExpected(rs, expected, msg, LOG, true); + } + @Test + public void testMMOrcTable() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true', 'transactional_properties'='insert_only')"); + int[][] values = {{1,2},{3,4}}; + runStatementOnDriver("insert into T " + makeValuesClause(values)); + List rs = runStatementOnDriver("select a, b from T order by b"); + Assert.assertEquals("", stringifyValues(values), rs); + } + + /** + * We have to use a different query to check results for Vectorized tests because to get the + * file name info we need to use {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#FILENAME} + * which will currently make the query non-vectorizable. This means we can't check the file name + * for vectorized version of the test. + */ + private void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg) throws Exception{ + List rs = runStatementOnDriver(query); + checkExpected(rs, expectedResult, msg + (isVectorized ? 
" vect" : ""), LOG, !isVectorized); + assertVectorized(isVectorized, query); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index f0d9ff2235..6fe05d6a23 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -54,7 +54,7 @@ String getTestDataDir() { @Before public void setUp() throws Exception { setUpInternal(); - hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);//todo: this should be perhaps be parametrized } /** @@ -259,18 +259,6 @@ public void testInsertToAcidWithUnionRemove() throws Exception { }; checkExpected(rs, expected, "Unexpected row count after ctas"); } - private void checkExpected(List rs, String[][] expected, String msg) { - LOG.warn(testName.getMethodName() + ": read data(" + msg + "): "); - for(String s : rs) { - LOG.warn(s); - } - Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size()); - //verify data and layout - for(int i = 0; i < expected.length; i++) { - Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); - Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); - } - } /** * The idea here is to create a non acid table that was written by multiple writers, i.e. * unbucketed table that has 000000_0 & 000001_0, for example. @@ -358,7 +346,9 @@ logical bucket (tranche) Assert.assertEquals(2, BucketCodec.determineVersion(537001984).decodeWriterId(537001984)); Assert.assertEquals(1, BucketCodec.determineVersion(536936448).decodeWriterId(536936448)); + assertVectorized(true, "update T set b = 88 where b = 80"); runStatementOnDriver("update T set b = 88 where b = 80"); + assertVectorized(true, "delete from T where b = 8"); runStatementOnDriver("delete from T where b = 8"); String expected3[][] = { {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/000002_0"}, @@ -369,7 +359,7 @@ logical bucket (tranche) {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t12\t12", "warehouse/t/000000_0_copy_1"}, {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"}, {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"}, - {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/delta_0000023_0000023_0000/bucket_00000"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/delta_0000024_0000024_0000/bucket_00000"}, }; rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); checkExpected(rs, expected3,"after converting to acid (no compaction with updates)"); @@ -381,15 +371,15 @@ logical bucket (tranche) /*Compaction preserves location of rows wrt buckets/tranches (for now)*/ String expected4[][] = { - {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/base_0000024/bucket_00002"}, - {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/base_0000024/bucket_00002"}, - {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/base_0000024/bucket_00001"}, - {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t9\t10", "warehouse/t/base_0000024/bucket_00001"}, - 
{"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t10\t20", "warehouse/t/base_0000024/bucket_00000"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t12\t12", "warehouse/t/base_0000024/bucket_00000"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t20\t40", "warehouse/t/base_0000024/bucket_00000"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t50\t60", "warehouse/t/base_0000024/bucket_00000"}, - {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/base_0000024/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/base_0000026/bucket_00002"}, + {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/base_0000026/bucket_00002"}, + {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/base_0000026/bucket_00001"}, + {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t9\t10", "warehouse/t/base_0000026/bucket_00001"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t10\t20", "warehouse/t/base_0000026/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t12\t12", "warehouse/t/base_0000026/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t20\t40", "warehouse/t/base_0000026/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t50\t60", "warehouse/t/base_0000026/bucket_00000"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/base_0000026/bucket_00000"}, }; checkExpected(rs, expected4,"after major compact"); } @@ -630,15 +620,8 @@ public void testNonAcidToAcidVectorzied() throws Exception { //vectorized because there is INPUT__FILE__NAME assertVectorized(false, query); } - private void assertVectorized(boolean vectorized, String query) throws Exception { - List rs = runStatementOnDriver("EXPLAIN VECTORIZATION DETAIL " + query); - for(String line : rs) { - if(line != null && line.contains("Execution mode: vectorized")) { - Assert.assertTrue("Was vectorized when it wasn't expected", vectorized); - return; - } - } - Assert.assertTrue("Din't find expected 'vectorized' in plan", !vectorized); + private void checkExpected(List rs, String[][] expected, String msg) { + super.checkExpected(rs, expected, msg, LOG, true); } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 8737369c39..38a2d3ca84 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -33,6 +33,7 @@ import org.junit.Before; import org.junit.Rule; import org.junit.rules.TestName; +import org.slf4j.Logger; import java.io.File; import java.util.ArrayList; @@ -74,7 +75,6 @@ void setUpInternal() throws Exception { hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir()); - hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); hiveConf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, @@ -151,6 +151,16 @@ CommandProcessorResponse runStatementOnDriverNegative(String stmt) throws Except } throw new RuntimeException("Didn't get expected failure!"); } + void assertVectorized(boolean vectorized, String query) throws 
Exception { + List rs = runStatementOnDriver("EXPLAIN VECTORIZATION DETAIL " + query); + for(String line : rs) { + if(line != null && line.contains("Execution mode: vectorized")) { + Assert.assertTrue("Was vectorized when it wasn't expected", vectorized); + return; + } + } + Assert.assertTrue("Didn't find expected 'vectorized' in plan", !vectorized); + } /** * Will assert that actual files match expected. * @param expectedFiles - suffixes of expected Paths. Must be the same length @@ -176,4 +186,18 @@ void assertExpectedFileSet(Set expectedFiles, String rootPath) throws Ex } Assert.assertEquals("Unexpected file list", expectedFiles, actualFiles); } + void checkExpected(List rs, String[][] expected, String msg, Logger LOG, boolean checkFileName) { + LOG.warn(testName.getMethodName() + ": read data(" + msg + "): "); + for(String s : rs) { + LOG.warn(s); + } + Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size()); + //verify data and layout + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line (data) " + i + " data: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + if(checkFileName) { + Assert.assertTrue("Actual line(file) " + i + " file: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); + } + } + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java index d5ab07939c..afccf644f5 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java @@ -142,7 +142,7 @@ db.createTable(src, cols, null, TextInputFormat.class, HiveIgnoreKeyTextOutputFormat.class); db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING, - true, false, false, false, null, 0, false); + true, false, false, false, null, 0); i++; } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index ccd7d8ef96..2cc40a4ee4 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -836,16 +836,22 @@ public void testBIStrategySplitBlockBoundary() throws Exception { public void testEtlCombinedStrategy() throws Exception { conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL"); conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS.varname, "1000000"); + AcidUtils.setTransactionalTableScan(conf, true); + conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true); + conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default"); + OrcInputFormat.Context context = new OrcInputFormat.Context(conf); MockFileSystem fs = new MockFileSystem(conf, new MockFile("mock:/a/1/part-00", 1000, new byte[1]), new MockFile("mock:/a/1/part-01", 1000, new byte[1]), new MockFile("mock:/a/2/part-00", 1000, new byte[1]), new MockFile("mock:/a/2/part-01", 1000, new byte[1]), - new MockFile("mock:/a/3/base_0/1", 1000, new byte[1]), - new MockFile("mock:/a/4/base_0/1", 1000, new byte[1]), - new MockFile("mock:/a/5/base_0/1", 1000, new byte[1]), - new MockFile("mock:/a/5/delta_0_25/1", 1000, new byte[1]) + new MockFile("mock:/a/3/base_0/bucket_00001", 1000, new byte[1]), + new MockFile("mock:/a/4/base_0/bucket_00001", 1000, new byte[1]), + new MockFile("mock:/a/5/base_0/bucket_00001", 1000, new byte[1]), + new MockFile("mock:/a/5/delta_0_25/bucket_00001", 1000, new byte[1]), + new 
MockFile("mock:/a/6/delta_27_29/bucket_00001", 1000, new byte[1]), + new MockFile("mock:/a/6/delete_delta_27_29/bucket_00001", 1000, new byte[1]) ); OrcInputFormat.CombinedCtx combineCtx = new OrcInputFormat.CombinedCtx(); @@ -891,20 +897,28 @@ public void testEtlCombinedStrategy() throws Exception { assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy); assertEquals(2, etlSs.files.size()); assertEquals(2, etlSs.dirs.size()); - // The fifth will not be combined because of delta files. + // The fifth could be combined again. ss = createOrCombineStrategies(context, fs, "mock:/a/5", combineCtx); + assertTrue(ss.isEmpty()); + assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy); +// assertNotSame(etlSs, ss); + assertEquals(4, etlSs.files.size()); + assertEquals(3, etlSs.dirs.size()); + + // The sixth will not be combined because of delete delta files. Is that desired? HIVE-18110 + ss = createOrCombineStrategies(context, fs, "mock:/a/6", combineCtx); assertEquals(1, ss.size()); assertTrue(ss.get(0) instanceof OrcInputFormat.ETLSplitStrategy); assertNotSame(etlSs, ss); - assertEquals(2, etlSs.files.size()); - assertEquals(2, etlSs.dirs.size()); + assertEquals(4, etlSs.files.size()); + assertEquals(3, etlSs.dirs.size()); } public List> createOrCombineStrategies(OrcInputFormat.Context context, MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException { OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path); return OrcInputFormat.determineSplitStrategies(combineCtx, context, - adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas, + adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents, null, null, true); } @@ -918,7 +932,7 @@ public void testEtlCombinedStrategy() throws Exception { OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException { OrcInputFormat.AcidDirInfo adi = gen.call(); return OrcInputFormat.determineSplitStrategies( - null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas, + null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents, null, null, true); } @@ -3586,10 +3600,13 @@ public void testACIDReaderNoFooterSerializeWithDeltas() throws Exception { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: open to read data - split 1 => mock:/mocktable8/0_0 - // call-2: split 2 - find hive.acid.key.index in footer of delta_x_y/bucket_00001 - // call-3: split 2 - read delta_x_y/bucket_00001 - assertEquals(5, readOpsDelta); + // call-1: open(mock:/mocktable7/0_0) + // call-2: open(mock:/mocktable7/0_0) + // call-3: listLocatedFileStatuses(mock:/mocktable7) + // call-4: getFileStatus(mock:/mocktable7/delta_0000001_0000001_0000/_metadata_acid) + // call-5: open(mock:/mocktable7/delta_0000001_0000001_0000/bucket_00001) + // call-6: open(mock:/mocktable7/delta_0000001_0000001_0000/bucket_00001) + assertEquals(6, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); @@ -3663,8 +3680,9 @@ public void testACIDReaderFooterSerializeWithDeltas() throws Exception { } // call-1: open to read data - split 1 => mock:/mocktable8/0_0 // call-2: split 2 - find hive.acid.key.index in footer of delta_x_y/bucket_00001 - // call-3: split 2 - read delta_x_y/bucket_00001 - assertEquals(3, readOpsDelta); + // call-3: check _metadata_acid file (doesn't exist) + // call-4: split 2 - read delta_x_y/bucket_00001 + assertEquals(4, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); diff --git 
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index 9628a40626..7813518ace 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -835,7 +835,7 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { assertEquals(null, merger.getMaxKey()); assertEquals(true, merger.next(id, event)); - assertEquals(OrcRecordUpdater.DELETE_OPERATION, + assertEquals(OrcRecordUpdater.DELETE_OPERATION,//minor comp, so we ignore 'base_0000100' files so all Deletes end up first since they all modify primordial rows OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id); assertNull(OrcRecordUpdater.getRow(event)); @@ -891,10 +891,10 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { baseReader = OrcFile.createReader(basePath, OrcFile.readerOptions(conf)); merger = - new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET, + new OrcRawRecordMerger(conf, true, null, false, BUCKET, createMaximalTxnList(), new Reader.Options(), AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options() - .isCompacting(true).isMajorCompaction(true)); + .isCompacting(true).isMajorCompaction(true).baseDir(new Path(root, "base_0000100"))); assertEquals(null, merger.getMinKey()); assertEquals(null, merger.getMaxKey()); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java index b2ac687c75..95e34632b9 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java @@ -48,7 +48,7 @@ import org.apache.orc.TypeDescription; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; + /** * This class tests the VectorizedOrcAcidRowBatchReader by creating an actual split and a set * of delete delta files. 
The split is on an insert delta and there are multiple delete deltas @@ -186,7 +186,7 @@ public void setup() throws Exception { OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, root, false, null); OrcInputFormat.AcidDirInfo adi = gen.call(); List> splitStrategies = OrcInputFormat.determineSplitStrategies( - null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas, + null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents, null, null, true); assertEquals(1, splitStrategies.size()); List splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits(); diff --git ql/src/test/queries/clientnegative/load_data_into_acid.q ql/src/test/queries/clientnegative/load_data_into_acid.q index fba1496fc6..2ac5b561ae 100644 --- ql/src/test/queries/clientnegative/load_data_into_acid.q +++ ql/src/test/queries/clientnegative/load_data_into_acid.q @@ -1,7 +1,5 @@ -set hive.strict.checks.bucketing=false; set hive.support.concurrency=true; set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; -set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; create table acid_ivot( ctinyint TINYINT, @@ -15,7 +13,7 @@ create table acid_ivot( ctimestamp1 TIMESTAMP, ctimestamp2 TIMESTAMP, cboolean1 BOOLEAN, - cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true'); + cboolean2 BOOLEAN) stored as orc TBLPROPERTIES ('transactional'='true'); LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot; diff --git ql/src/test/results/clientnegative/load_data_into_acid.q.out ql/src/test/results/clientnegative/load_data_into_acid.q.out index cd829ba0a6..46b5cdd2c8 100644 --- ql/src/test/results/clientnegative/load_data_into_acid.q.out +++ ql/src/test/results/clientnegative/load_data_into_acid.q.out @@ -10,7 +10,7 @@ PREHOOK: query: create table acid_ivot( ctimestamp1 TIMESTAMP, ctimestamp2 TIMESTAMP, cboolean1 BOOLEAN, - cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true') + cboolean2 BOOLEAN) stored as orc TBLPROPERTIES ('transactional'='true') PREHOOK: type: CREATETABLE PREHOOK: Output: database:default PREHOOK: Output: default@acid_ivot @@ -26,8 +26,8 @@ POSTHOOK: query: create table acid_ivot( ctimestamp1 TIMESTAMP, ctimestamp2 TIMESTAMP, cboolean1 BOOLEAN, - cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true') + cboolean2 BOOLEAN) stored as orc TBLPROPERTIES ('transactional'='true') POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@acid_ivot -FAILED: SemanticException [Error 10266]: LOAD DATA... statement is not supported on transactional table default@acid_ivot. +FAILED: SemanticException [Error 30023]: alltypesorc file name is not valid in Load Data into Acid table default.acid_ivot. 
Examples of valid names are: 00000_0, 00000_0_copy_1 diff --git ql/src/test/results/clientpositive/acid_table_stats.q.out ql/src/test/results/clientpositive/acid_table_stats.q.out index d0fbcac5b0..86e2bb4b52 100644 --- ql/src/test/results/clientpositive/acid_table_stats.q.out +++ ql/src/test/results/clientpositive/acid_table_stats.q.out @@ -38,6 +38,7 @@ Table Parameters: rawDataSize 0 totalSize 0 transactional true + transactional_properties default #### A masked pattern was here #### # Storage Information diff --git ql/src/test/results/clientpositive/autoColumnStats_4.q.out ql/src/test/results/clientpositive/autoColumnStats_4.q.out index 2bc1789949..09c77be7e1 100644 --- ql/src/test/results/clientpositive/autoColumnStats_4.q.out +++ ql/src/test/results/clientpositive/autoColumnStats_4.q.out @@ -29,6 +29,7 @@ Table Parameters: rawDataSize 0 totalSize 0 transactional true + transactional_properties default #### A masked pattern was here #### # Storage Information @@ -198,6 +199,7 @@ Table Parameters: rawDataSize 0 totalSize 1798 transactional true + transactional_properties default #### A masked pattern was here #### # Storage Information @@ -241,6 +243,7 @@ Table Parameters: rawDataSize 0 totalSize 2909 transactional true + transactional_properties default #### A masked pattern was here #### # Storage Information diff --git ql/src/test/results/clientpositive/mm_default.q.out ql/src/test/results/clientpositive/mm_default.q.out index ebbcb9da99..1345efdfb6 100644 --- ql/src/test/results/clientpositive/mm_default.q.out +++ ql/src/test/results/clientpositive/mm_default.q.out @@ -324,6 +324,7 @@ Table Parameters: rawDataSize 0 totalSize 0 transactional true + transactional_properties default #### A masked pattern was here #### # Storage Information diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java index 25caf2929d..c0111569c4 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java @@ -128,7 +128,12 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, transactionalValue); } if ("true".equalsIgnoreCase(transactionalValue) && !"true".equalsIgnoreCase(oldTransactionalValue)) { - //only need to check conformance if alter table enabled aicd + if(!isTransactionalPropertiesPresent) { + normalizeTransactionalPropertyDefault(newTable); + isTransactionalPropertiesPresent = true; + transactionalPropertiesValue = DEFAULT_TRANSACTIONAL_PROPERTY; + } + //only need to check conformance if alter table enabled acid if (!conformToAcid(newTable)) { // INSERT_ONLY tables don't have to conform to ACID requirement like ORC or bucketing if (transactionalPropertiesValue == null || !"insert_only".equalsIgnoreCase(transactionalPropertiesValue)) { @@ -232,6 +237,9 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr // normalize prop name parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString()); + if(transactionalProperties == null) { + normalizeTransactionalPropertyDefault(newTable); + } initializeTransactionalProperties(newTable); return; } @@ -241,6 +249,16 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent 
context) thr } /** + * When a table is marked transactional=true but transactional_properties is not set then + * transactional_properties should take on the default value. Easier to make this explicit in + * the table definition than to keep checking everywhere whether it's set or not. + */ + private void normalizeTransactionalPropertyDefault(Table table) { + table.getParameters().put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, + DEFAULT_TRANSACTIONAL_PROPERTY); + + } + /** + * Check that InputFormatClass/OutputFormatClass should implement * AcidInputFormat/AcidOutputFormat */
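
Editor's note: the normalization added to TransactionalValidationListener above boils down to pinning transactional_properties to its default whenever a table is, or becomes, transactional without that property being set explicitly. Below is a minimal standalone sketch of the same idea, using a plain java.util.Map in place of the metastore Table object; the three string constants are hard-coded assumptions here (the patch reads them from hive_metastoreConstants and DEFAULT_TRANSACTIONAL_PROPERTY), so treat this as an illustration rather than the listener's actual code.

import java.util.HashMap;
import java.util.Map;

public class TransactionalPropsDefaulting {
  // Assumed literal values; the real listener uses hive_metastoreConstants and DEFAULT_TRANSACTIONAL_PROPERTY.
  private static final String TABLE_IS_TRANSACTIONAL = "transactional";
  private static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties";
  private static final String DEFAULT_TRANSACTIONAL_PROPERTY = "default";

  /** If the table is transactional but transactional_properties is unset, make the default explicit. */
  static void normalizeTransactionalPropertyDefault(Map<String, String> params) {
    if ("true".equalsIgnoreCase(params.get(TABLE_IS_TRANSACTIONAL))
        && params.get(TABLE_TRANSACTIONAL_PROPERTIES) == null) {
      params.put(TABLE_TRANSACTIONAL_PROPERTIES, DEFAULT_TRANSACTIONAL_PROPERTY);
    }
  }

  public static void main(String[] args) {
    Map<String, String> params = new HashMap<>();
    params.put(TABLE_IS_TRANSACTIONAL, "true");
    normalizeTransactionalPropertyDefault(params);
    // params now carries transactional_properties=default alongside transactional=true
    System.out.println(params);
  }
}

Making the default explicit in the table parameters is also why the q.out files touched above now show a "transactional_properties default" row in the DESCRIBE output.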
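
Editor's note: the expected INPUT__FILE__NAME suffixes throughout the tests above follow ACID directory naming: original bucket files (000000_0, 000000_0_copy_N) sit directly under the table or partition directory, delta_<min>_<max> (optionally with a statement id) and delete_delta_<min>_<max> hold events for transactions min through max, and base_<n> is what a major compaction leaves behind. The sketch below is a rough, hypothetical decoder of that convention, only to make the expected paths easier to read; it is not Hive's AcidUtils parser.

// Hypothetical helper: decodes the directory-name convention used in the expected paths above.
final class AcidDirName {
  static String describe(String dir) {
    if (dir.startsWith("base_")) {
      return "base covering txns <= " + Long.parseLong(dir.substring("base_".length()));
    }
    if (dir.startsWith("delete_delta_") || dir.startsWith("delta_")) {
      String[] parts = dir.split("_");
      // delta_<min>_<max> or delta_<min>_<max>_<statementId>; delete_delta_ has one extra prefix token
      int off = dir.startsWith("delete_delta_") ? 2 : 1;
      return (off == 2 ? "delete events" : "inserts/updates")
          + " for txns " + Long.parseLong(parts[off]) + ".." + Long.parseLong(parts[off + 1]);
    }
    return "pre-acid original file";
  }

  public static void main(String[] args) {
    System.out.println(describe("delta_0000016_0000021"));   // inserts/updates for txns 16..21
    System.out.println(describe("base_0000027"));            // base covering txns <= 27
    System.out.println(describe("delete_delta_27_29"));      // delete events for txns 27..29
    System.out.println(describe("000000_0"));                // pre-acid original file
  }
}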