diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8094d28f21..239a64d602 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2781,7 +2781,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Whether Hive supports transactional stats (accurate stats for transactional tables)"),
     HIVE_TXN_ACID_DIR_CACHE_DURATION("hive.txn.acid.dir.cache.duration", 120,
         "Enable dir cache for ACID tables specified in minutes."
-            + "0 indicates cache is disabled. "),
+            + " 0 indicates that the cache is used in read-only mode and no additional info"
+            + " would be populated. -1 means the cache is disabled."),
     HIVE_TXN_READONLY_ENABLED("hive.txn.readonly.enabled", false,
         "Enables read-only transaction classification and related optimizations"),
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index bf332bc0b8..deac8f628a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -2954,6 +2954,14 @@ private static boolean isLockableTable(Table t) {
     }
     LockComponentBuilder compBuilder = new LockComponentBuilder();
     Table t = null;
+    /**
+     * For any insert/update, set the dir cache to read-only mode, so that it does not add
+     * any new entries to the cache. When updates are executed, delta folders are created
+     * only at the end of the statement, so at the time of acquiring locks there are no
+     * delta folders yet. This can cause wrong data to be reported when an "insert" is
+     * followed by an "update"; in such cases, use the cache in read-only mode.
+     */
+    HiveConf.setIntVar(conf, ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION, 0);
     switch (output.getType()) {
     case DATABASE:
       compBuilder.setDbName(output.getDatabase().getName());
@@ -3184,6 +3192,12 @@ private static void initDirCache(int durationInMts) {
     dirCacheInited.set(true);
   }
 
+  private static void printDirCacheEntries() {
+    if (dirCache != null) {
+      LOG.debug("Cache entries: {}", Arrays.toString(dirCache.asMap().keySet().toArray()));
+    }
+  }
+
   /**
    * Tries to get directory details from cache. For now, cache is valid only
    * when base directory is available and no deltas are present. This should
@@ -3208,7 +3222,7 @@ public static Directory getAcidStateFromCache(Supplier<FileSystem> fileSystem,
     int dirCacheDuration = HiveConf.getIntVar(conf,
         ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION);
 
-    if (dirCacheDuration <= 0) {
+    if (dirCacheDuration < 0) {
       LOG.debug("dirCache is not enabled");
       return getAcidState(fileSystem.get(), candidateDirectory, conf,
           writeIdList, useFileIds, ignoreEmptyFiles, tblproperties,
           generateDirSnapshots);
@@ -3239,9 +3253,8 @@ public static Directory getAcidStateFromCache(Supplier<FileSystem> fileSystem,
       // double check writeIds
       if (!value.getTxnString().equalsIgnoreCase(writeIdList.writeToString())) {
         if (LOG.isDebugEnabled()) {
-          LOG.info("writeIdList: {} from cache: {} is not matching "
-              + "for key: {}", writeIdList.writeToString(),
-              value.getTxnString(), key);
+          LOG.debug("writeIdList: {} from cache: {} is not matching for key: {}",
+              writeIdList.writeToString(), value.getTxnString(), key);
         }
         recompute = true;
       }
@@ -3256,12 +3269,19 @@ public static Directory getAcidStateFromCache(Supplier<FileSystem> fileSystem,
       if (value.dirInfo != null && value.dirInfo.getBaseDirectory() != null
           && value.dirInfo.getCurrentDirectories().isEmpty()) {
-        populateBaseFiles(dirInfo, useFileIds, fileSystem);
-        dirCache.put(key, value);
+        if (dirCacheDuration > 0) {
+          populateBaseFiles(dirInfo, useFileIds, fileSystem);
+          dirCache.put(key, value);
+        } else {
+          LOG.info("Not populating cache for {}, as duration is set to 0", key);
+        }
       }
     } else {
       LOG.info("Got {} from cache, cache size: {}", key, dirCache.size());
     }
+    if (LOG.isDebugEnabled()) {
+      printDirCacheEntries();
+    }
     return value.getDirInfo();
   }
diff --git ql/src/test/queries/clientpositive/acid_insert_overwrite_update.q ql/src/test/queries/clientpositive/acid_insert_overwrite_update.q
new file mode 100644
index 0000000000..1ad09c1276
--- /dev/null
+++ ql/src/test/queries/clientpositive/acid_insert_overwrite_update.q
@@ -0,0 +1,30 @@
+set hive.mapred.mode=nonstrict;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+-- Check with update
+drop table if exists sequential_update;
+create transactional table sequential_update(ctime timestamp, seq bigint, mtime timestamp, prev_writeid bigint, prev_dirname string) stored as orc;
+insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, '');
+select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update;
+
+update sequential_update set mtime = current_timestamp, seq=1, prev_writeid = ROW__ID.writeId, prev_dirname = regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1);
+select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update;
+
+
+-- Check with compaction
+insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, '');
+update sequential_update set mtime = current_timestamp, seq=1, prev_writeid = ROW__ID.writeId, prev_dirname = regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1);
+select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update;
+
+alter table sequential_update compact 'major';
+select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update;
+
+
+-- Check with deletes
+insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, ''), (current_timestamp, 2, current_timestamp, 2, '');
+delete from sequential_update where seq=2;
+select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update;
+
+alter table sequential_update compact 'major';
+select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update;
diff --git ql/src/test/results/clientpositive/tez/acid_insert_overwrite_update.q.out ql/src/test/results/clientpositive/tez/acid_insert_overwrite_update.q.out
new file mode 100644
index 0000000000..4606b1d581
--- /dev/null
+++ ql/src/test/results/clientpositive/tez/acid_insert_overwrite_update.q.out
@@ -0,0 +1,137 @@
+PREHOOK: query: drop table if exists sequential_update
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists sequential_update
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create transactional table sequential_update(ctime timestamp, seq bigint, mtime timestamp, prev_writeid bigint, prev_dirname string) stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@sequential_update
+POSTHOOK: query: create transactional table sequential_update(ctime timestamp, seq bigint, mtime timestamp, prev_writeid bigint, prev_dirname string) stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@sequential_update
+PREHOOK: query: insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, '')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@sequential_update
+POSTHOOK: query: insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, '')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@sequential_update
+POSTHOOK: Lineage: sequential_update.ctime SCRIPT []
+POSTHOOK: Lineage: sequential_update.mtime SCRIPT []
+POSTHOOK: Lineage: sequential_update.prev_dirname SCRIPT []
+POSTHOOK: Lineage: sequential_update.prev_writeid SCRIPT []
+POSTHOOK: Lineage: sequential_update.seq SCRIPT []
+PREHOOK: query: select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+LOOKS OKAY	base_0000001
+PREHOOK: query: update sequential_update set mtime = current_timestamp, seq=1, prev_writeid = ROW__ID.writeId, prev_dirname = regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: default@sequential_update
+POSTHOOK: query: update sequential_update set mtime = current_timestamp, seq=1, prev_writeid = ROW__ID.writeId, prev_dirname = regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: default@sequential_update
+PREHOOK: query: select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+LOOKS OKAY	delta_0000002_0000002_0000
+PREHOOK: query: insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, '')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@sequential_update
+POSTHOOK: query: insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, '')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@sequential_update
+POSTHOOK: Lineage: sequential_update.ctime SCRIPT []
+POSTHOOK: Lineage: sequential_update.mtime SCRIPT []
+POSTHOOK: Lineage: sequential_update.prev_dirname SCRIPT []
+POSTHOOK: Lineage: sequential_update.prev_writeid SCRIPT []
+POSTHOOK: Lineage: sequential_update.seq SCRIPT []
+PREHOOK: query: update sequential_update set mtime = current_timestamp, seq=1, prev_writeid = ROW__ID.writeId, prev_dirname = regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: default@sequential_update
+POSTHOOK: query: update sequential_update set mtime = current_timestamp, seq=1, prev_writeid = ROW__ID.writeId, prev_dirname = regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: default@sequential_update
+PREHOOK: query: select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+LOOKS OKAY	delta_0000004_0000004_0000
+PREHOOK: query: alter table sequential_update compact 'major'
+PREHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: query: alter table sequential_update compact 'major'
+POSTHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: query: select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+LOOKS OKAY	delta_0000004_0000004_0000
+PREHOOK: query: insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, ''), (current_timestamp, 2, current_timestamp, 2, '')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@sequential_update
+POSTHOOK: query: insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, ''), (current_timestamp, 2, current_timestamp, 2, '')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@sequential_update
+POSTHOOK: Lineage: sequential_update.ctime SCRIPT []
+POSTHOOK: Lineage: sequential_update.mtime SCRIPT []
+POSTHOOK: Lineage: sequential_update.prev_dirname SCRIPT []
+POSTHOOK: Lineage: sequential_update.prev_writeid SCRIPT []
+POSTHOOK: Lineage: sequential_update.seq SCRIPT []
+PREHOOK: query: delete from sequential_update where seq=2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: default@sequential_update
+POSTHOOK: query: delete from sequential_update where seq=2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: default@sequential_update
+PREHOOK: query: select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+LOOKS OKAY	base_0000005
+PREHOOK: query: alter table sequential_update compact 'major'
+PREHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: query: alter table sequential_update compact 'major'
+POSTHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: query: select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+LOOKS OKAY	base_0000005
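
To make the new hive.txn.acid.dir.cache.duration semantics concrete, here is a minimal standalone sketch (not Hive code) of the three modes this patch introduces: -1 disables the cache, 0 uses it in read-only mode, and a positive value caches with that expiry. It assumes Guava's CacheBuilder, which AcidUtils uses for its dir cache; the DirInfo class and loadFromFileSystem() helper are hypothetical stand-ins for the real getAcidState plumbing.

import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// Standalone sketch of the hive.txn.acid.dir.cache.duration modes after this patch.
// DirInfo and loadFromFileSystem() are hypothetical stand-ins; the real logic lives
// in AcidUtils.getAcidStateFromCache().
public class DirCacheModeSketch {

  static class DirInfo {
    final String txnString; // writeIdList snapshot the entry was built against
    DirInfo(String txnString) {
      this.txnString = txnString;
    }
  }

  private static Cache<String, DirInfo> dirCache;

  static DirInfo getAcidState(String key, String writeIdList, int durationInMinutes) {
    if (durationInMinutes < 0) {
      // -1: cache disabled, always list the file system
      return loadFromFileSystem(writeIdList);
    }
    if (dirCache == null) {
      // lazy init; the expiry only matters in read-write mode (> 0)
      dirCache = CacheBuilder.newBuilder()
          .expireAfterWrite(Math.max(durationInMinutes, 1), TimeUnit.MINUTES)
          .build();
    }
    DirInfo cached = dirCache.getIfPresent(key);
    if (cached != null && cached.txnString.equalsIgnoreCase(writeIdList)) {
      return cached; // valid hit for this writeId snapshot
    }
    DirInfo fresh = loadFromFileSystem(writeIdList);
    if (durationInMinutes > 0) {
      dirCache.put(key, fresh); // read-write mode: populate the cache
    }
    // 0: read-only mode, never add entries (what lock acquisition now forces)
    return fresh;
  }

  private static DirInfo loadFromFileSystem(String writeIdList) {
    return new DirInfo(writeIdList); // placeholder for a real directory listing
  }

  public static void main(String[] args) {
    getAcidState("default.sequential_update", "writeIds:1", 0);   // read-only: nothing cached
    getAcidState("default.sequential_update", "writeIds:1", 120); // read-write: entry cached
    System.out.println(dirCache.asMap().keySet()); // [default.sequential_update]
  }
}

In the patch itself, lock acquisition forcibly sets the duration to 0 in the query's conf, so the insert/update path can still read previously cached entries but can no longer publish a directory snapshot that lacks the delta folders the statement is about to create.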
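
The test asserts which directory each row was read from by applying regexp_extract to the virtual column INPUT__FILE__NAME, stripping the file name and keeping the parent (base/delta) directory. Hive's regexp_extract follows Java regex semantics, so the pattern can be checked standalone; the HDFS path below is a made-up example.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Checks what the test's regexp_extract pattern captures from a bucket file path.
public class DirNameExtract {
  public static void main(String[] args) {
    String path = "hdfs://nn:8020/warehouse/sequential_update/delta_0000002_0000002_0000/bucket_00000";
    Matcher m = Pattern.compile(".*/(.*)/[^/]*").matcher(path);
    if (m.find()) {
      // the greedy ".*/" consumes everything up to the parent directory,
      // so group 1 is the directory that holds the file
      System.out.println(m.group(1)); // prints: delta_0000002_0000002_0000
    }
  }
}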