diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index bf332bc0b8..a344f6b6d3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -3046,6 +3046,9 @@ Seems much cleaner if each stmt is identified as a particular HiveOperation (whi
       case UPDATE:
       case DELETE:
+        // Read-your-writes in case of update/delete:
+        // set the dir cache duration to 0 so the old entry is purged.
+        HiveConf.setIntVar(conf, ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION, 0);
         assert t != null;
         if (AcidUtils.isTransactionalTable(t) && sharedWrite) {
           compBuilder.setSharedWrite();
@@ -3184,6 +3187,30 @@ private static void initDirCache(int durationInMts) {
     dirCacheInited.set(true);
   }
 
+  private static void printDirCacheEntries() {
+    if (dirCache != null) {
+      LOG.debug("Cache entries: {}", Arrays.toString(dirCache.asMap().keySet().toArray()));
+    }
+  }
+
+  private static void purgeOldEntryIfNeeded(String key) {
+    /**
+     * During updates/deletes, where read-your-writes semantics apply, the cache
+     * duration is set to 0 to purge the entry.
+     *
+     * When an update is executed, delta folders are created only at the end of
+     * the statement; at the time of acquiring locks there are no delta folders
+     * yet. This can cause wrong data to be reported when an "insert overwrite"
+     * is followed by an "update" statement. In such cases,
+     * "HIVE_TXN_ACID_DIR_CACHE_DURATION" is explicitly set to 0 for update/delete
+     * statements, and we purge those entries from the cache here.
+     */
+    if (dirCacheInited.get() && dirCache.getIfPresent(key) != null) {
+      LOG.info("Purge old value for: {}, cache size: {}", key, dirCache.size());
+      dirCache.invalidate(key);
+    }
+  }
+
   /**
    * Tries to get directory details from cache. For now, cache is valid only
    * when base directory is available and no deltas are present. This should
@@ -3208,6 +3235,11 @@ public static Directory getAcidStateFromCache(Supplier<FileSystem> fileSystem,
     int dirCacheDuration = HiveConf.getIntVar(conf,
         ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION);
 
+    //dbName + tableName + dir
+    String key = writeIdList.getTableName() + "_" + candidateDirectory.toString();
+
+    purgeOldEntryIfNeeded(key);
+
     if (dirCacheDuration <= 0) {
       LOG.debug("dirCache is not enabled");
       return getAcidState(fileSystem.get(), candidateDirectory, conf, writeIdList,
@@ -3221,8 +3253,6 @@ public static Directory getAcidStateFromCache(Supplier<FileSystem> fileSystem,
      * In case of changes, cache would get invalidated based on
      * open/aborted list
      */
-    //dbName + tableName + dir
-    String key = writeIdList.getTableName() + "_" + candidateDirectory.toString();
     DirInfoValue value = dirCache.getIfPresent(key);
 
     // in case of open/aborted txns, recompute dirInfo
@@ -3239,9 +3269,8 @@ public static Directory getAcidStateFromCache(Supplier<FileSystem> fileSystem,
       // double check writeIds
       if (!value.getTxnString().equalsIgnoreCase(writeIdList.writeToString())) {
         if (LOG.isDebugEnabled()) {
-          LOG.info("writeIdList: {} from cache: {} is not matching "
-              + "for key: {}", writeIdList.writeToString(),
-              value.getTxnString(), key);
+          LOG.debug("writeIdList: {} from cache: {} is not matching for key: {}",
+              writeIdList.writeToString(), value.getTxnString(), key);
         }
         recompute = true;
       }
@@ -3262,6 +3291,9 @@ public static Directory getAcidStateFromCache(Supplier<FileSystem> fileSystem,
     } else {
       LOG.info("Got {} from cache, cache size: {}", key, dirCache.size());
     }
+    if (LOG.isDebugEnabled()) {
+      printDirCacheEntries();
+    }
     return value.getDirInfo();
   }
 
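For context on the caching pattern the Java change relies on: `dirCache` appears to be a Guava `Cache` (the `getIfPresent`/`invalidate`/`asMap`/`size` calls above match that API), keyed by `dbName.tableName + "_" + dir` and expiring entries `HIVE_TXN_ACID_DIR_CACHE_DURATION` minutes after write. Below is a minimal sketch of the build-and-purge flow under that assumption; the `DirInfoValue` stand-in and the sample key are illustrative only, not the real Hive classes.

```java
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class DirCacheSketch {
  // Hypothetical stand-in for AcidUtils.DirInfoValue; the real value also
  // carries the directory snapshot and the writeIdList string that
  // getAcidStateFromCache() double-checks before trusting a hit.
  static final class DirInfoValue {
    final String txnString;
    DirInfoValue(String txnString) { this.txnString = txnString; }
  }

  public static void main(String[] args) {
    // Mirrors initDirCache(): entries expire N minutes after being written.
    Cache<String, DirInfoValue> dirCache = CacheBuilder.newBuilder()
        .expireAfterWrite(2, TimeUnit.MINUTES)
        .build();

    // Key shape from the patch: dbName.tableName + "_" + candidate directory.
    String key = "default.sequential_update_/warehouse/sequential_update";
    dirCache.put(key, new DirInfoValue("writeIdList-snapshot"));

    // What purgeOldEntryIfNeeded() does for update/delete statements:
    // drop the possibly stale entry up front rather than waiting for expiry.
    if (dirCache.getIfPresent(key) != null) {
      dirCache.invalidate(key);
    }
    System.out.println("entries after purge: " + dirCache.size()); // prints 0
  }
}
```

Purging eagerly matters because a stale entry would otherwise be served for up to the full cache duration, which is exactly the window in which an update/delete following an insert overwrite would read the overwritten base directory.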
diff --git ql/src/test/queries/clientpositive/acid_insert_overwrite_update.q ql/src/test/queries/clientpositive/acid_insert_overwrite_update.q
new file mode 100644
index 0000000000..1ad09c1276
--- /dev/null
+++ ql/src/test/queries/clientpositive/acid_insert_overwrite_update.q
@@ -0,0 +1,30 @@
+set hive.mapred.mode=nonstrict;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+-- Check with update
+drop table if exists sequential_update;
+create transactional table sequential_update(ctime timestamp, seq bigint, mtime timestamp, prev_writeid bigint, prev_dirname string) stored as orc;
+insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, '');
+select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update;
+
+update sequential_update set mtime = current_timestamp, seq=1, prev_writeid = ROW__ID.writeId, prev_dirname = regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1);
+select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update;
+
+
+-- Check with compaction
+insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, '');
+update sequential_update set mtime = current_timestamp, seq=1, prev_writeid = ROW__ID.writeId, prev_dirname = regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1);
+select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update;
+
+alter table sequential_update compact 'major';
+select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update;
+
+
+-- Check with deletes
+insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, ''), (current_timestamp, 2, current_timestamp, 2, '');
+delete from sequential_update where seq=2;
+select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update;
+
+alter table sequential_update compact 'major';
+select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update;
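The test's staleness probe is the second projected column: `regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1)` captures the immediate parent directory of the file each row was read from, i.e. the `base_N` or `delta_X_Y_NNNN` name. A stale cached directory snapshot would therefore surface either as `BROKEN` (old `seq` values read back) or as an outdated directory name in the expected output below. A standalone check of that capture group, using a made-up file path of the shape Hive exposes via `INPUT__FILE__NAME`:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class InputFileDirExtract {
  public static void main(String[] args) {
    // Same pattern the test passes to regexp_extract(INPUT__FILE__NAME, ..., 1):
    // the greedy leading '.*/' backtracks so group(1) is the last directory.
    Pattern p = Pattern.compile(".*/(.*)/[^/]*");
    // Hypothetical data-file path, for illustration only.
    String path = "hdfs://nn:8020/warehouse/sequential_update/delta_0000002_0000002_0000/bucket_00000";
    Matcher m = p.matcher(path);
    if (m.matches()) {
      System.out.println(m.group(1)); // prints: delta_0000002_0000002_0000
    }
  }
}
```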
diff --git ql/src/test/results/clientpositive/tez/acid_insert_overwrite_update.q.out ql/src/test/results/clientpositive/tez/acid_insert_overwrite_update.q.out
new file mode 100644
index 0000000000..4606b1d581
--- /dev/null
+++ ql/src/test/results/clientpositive/tez/acid_insert_overwrite_update.q.out
@@ -0,0 +1,137 @@
+PREHOOK: query: drop table if exists sequential_update
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists sequential_update
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create transactional table sequential_update(ctime timestamp, seq bigint, mtime timestamp, prev_writeid bigint, prev_dirname string) stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@sequential_update
+POSTHOOK: query: create transactional table sequential_update(ctime timestamp, seq bigint, mtime timestamp, prev_writeid bigint, prev_dirname string) stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@sequential_update
+PREHOOK: query: insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, '')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@sequential_update
+POSTHOOK: query: insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, '')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@sequential_update
+POSTHOOK: Lineage: sequential_update.ctime SCRIPT []
+POSTHOOK: Lineage: sequential_update.mtime SCRIPT []
+POSTHOOK: Lineage: sequential_update.prev_dirname SCRIPT []
+POSTHOOK: Lineage: sequential_update.prev_writeid SCRIPT []
+POSTHOOK: Lineage: sequential_update.seq SCRIPT []
+PREHOOK: query: select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+LOOKS OKAY	base_0000001
+PREHOOK: query: update sequential_update set mtime = current_timestamp, seq=1, prev_writeid = ROW__ID.writeId, prev_dirname = regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: default@sequential_update
+POSTHOOK: query: update sequential_update set mtime = current_timestamp, seq=1, prev_writeid = ROW__ID.writeId, prev_dirname = regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: default@sequential_update
+PREHOOK: query: select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+LOOKS OKAY	delta_0000002_0000002_0000
+PREHOOK: query: insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, '')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@sequential_update
+POSTHOOK: query: insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, '')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@sequential_update
+POSTHOOK: Lineage: sequential_update.ctime SCRIPT []
+POSTHOOK: Lineage: sequential_update.mtime SCRIPT []
+POSTHOOK: Lineage: sequential_update.prev_dirname SCRIPT []
+POSTHOOK: Lineage: sequential_update.prev_writeid SCRIPT []
+POSTHOOK: Lineage: sequential_update.seq SCRIPT []
+PREHOOK: query: update sequential_update set mtime = current_timestamp, seq=1, prev_writeid = ROW__ID.writeId, prev_dirname = regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: default@sequential_update
+POSTHOOK: query: update sequential_update set mtime = current_timestamp, seq=1, prev_writeid = ROW__ID.writeId, prev_dirname = regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: default@sequential_update
+PREHOOK: query: select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+LOOKS OKAY	delta_0000004_0000004_0000
+PREHOOK: query: alter table sequential_update compact 'major'
+PREHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: query: alter table sequential_update compact 'major'
+POSTHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: query: select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select distinct IF(seq==1, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+LOOKS OKAY	delta_0000004_0000004_0000
+PREHOOK: query: insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, ''), (current_timestamp, 2, current_timestamp, 2, '')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@sequential_update
+POSTHOOK: query: insert overwrite table sequential_update values(current_timestamp, 0, current_timestamp, 0, ''), (current_timestamp, 2, current_timestamp, 2, '')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@sequential_update
+POSTHOOK: Lineage: sequential_update.ctime SCRIPT []
+POSTHOOK: Lineage: sequential_update.mtime SCRIPT []
+POSTHOOK: Lineage: sequential_update.prev_dirname SCRIPT []
+POSTHOOK: Lineage: sequential_update.prev_writeid SCRIPT []
+POSTHOOK: Lineage: sequential_update.seq SCRIPT []
+PREHOOK: query: delete from sequential_update where seq=2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: default@sequential_update
+POSTHOOK: query: delete from sequential_update where seq=2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: default@sequential_update
+PREHOOK: query: select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+LOOKS OKAY	base_0000005
+PREHOOK: query: alter table sequential_update compact 'major'
+PREHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: query: alter table sequential_update compact 'major'
+POSTHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: query: select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sequential_update
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'), regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sequential_update
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+LOOKS OKAY	base_0000005