diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java index cf27e92baf..f7aad663e2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,22 +160,20 @@ private MapJoinTableContainer load(FileSystem fs, Path path, MapJoinTableContainerSerDe mapJoinTableSerde) throws HiveException { LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path); if (!SparkUtilities.isDedicatedCluster(hconf)) { - return useFastContainer ? mapJoinTableSerde.loadFastContainer(desc, fs, path, hconf) : - mapJoinTableSerde.load(fs, path, hconf); + return loadMapJoinTableContainer(fs, path, mapJoinTableSerde); } - MapJoinTableContainer mapJoinTable = SmallTableCache.get(path); - if (mapJoinTable == null) { - synchronized (path.toString().intern()) { - mapJoinTable = SmallTableCache.get(path); - if (mapJoinTable == null) { - mapJoinTable = useFastContainer ? - mapJoinTableSerde.loadFastContainer(desc, fs, path, hconf) : - mapJoinTableSerde.load(fs, path, hconf); - SmallTableCache.cache(path, mapJoinTable); - } - } + + try { + return SmallTableCache.get(path, () -> loadMapJoinTableContainer(fs, path, mapJoinTableSerde)); + } catch (ExecutionException e) { + throw new HiveException(e); } - return mapJoinTable; + } + + private MapJoinTableContainer loadMapJoinTableContainer(FileSystem fs, Path path, + MapJoinTableContainerSerDe mapJoinTableSerde) throws HiveException { + return useFastContainer ? mapJoinTableSerde.loadFastContainer(desc, fs, path, hconf) : + mapJoinTableSerde.load(fs, path, hconf); } private void loadDirectly(MapJoinTableContainer[] mapJoinTables, String inputFileName) diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java index 3293100af9..eef21d66c0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java @@ -17,8 +17,11 @@ */ package org.apache.hadoop.hive.ql.exec.spark; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -29,8 +32,9 @@ public class SmallTableCache { private static final Logger LOG = LoggerFactory.getLogger(SmallTableCache.class.getName()); - private static final ConcurrentHashMap - tableContainerMap = new ConcurrentHashMap(); + private static final Cache + TABLE_CONTAINER_CACHE = CacheBuilder.newBuilder().softValues().build(); + private static volatile String queryId; /** @@ -40,13 +44,13 @@ public static void initialize(Configuration conf) { String currentQueryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname); if (!currentQueryId.equals(queryId)) { - if (!tableContainerMap.isEmpty()) { - synchronized (tableContainerMap) { - if (!currentQueryId.equals(queryId) && !tableContainerMap.isEmpty()) { - for (MapJoinTableContainer tableContainer: tableContainerMap.values()) { + if (TABLE_CONTAINER_CACHE.size() != 0) { + synchronized (TABLE_CONTAINER_CACHE) { + if (!currentQueryId.equals(queryId) && TABLE_CONTAINER_CACHE.size() != 0) { + for (MapJoinTableContainer tableContainer: TABLE_CONTAINER_CACHE.asMap().values()) { tableContainer.clear(); } - tableContainerMap.clear(); + TABLE_CONTAINER_CACHE.invalidateAll(); if (LOG.isDebugEnabled()) { LOG.debug("Cleaned up small table cache for query " + queryId); } @@ -57,17 +61,8 @@ public static void initialize(Configuration conf) { } } - public static void cache(Path path, MapJoinTableContainer tableContainer) { - if (tableContainerMap.putIfAbsent(path, tableContainer) == null && LOG.isDebugEnabled()) { - LOG.debug("Cached small table file " + path + " for query " + queryId); - } - } - - public static MapJoinTableContainer get(Path path) { - MapJoinTableContainer tableContainer = tableContainerMap.get(path); - if (tableContainer != null && LOG.isDebugEnabled()) { - LOG.debug("Loaded small table file " + path + " from cache for query " + queryId); - } - return tableContainer; + public static MapJoinTableContainer get(Path path, Callable valueLoader) + throws ExecutionException { + return TABLE_CONTAINER_CACHE.get(path, valueLoader); } }