diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java index e60dbaef8e..cf27e92baf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java @@ -21,7 +21,6 @@ import java.util.Arrays; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -160,19 +159,22 @@ private MapJoinTableContainer load(FileSystem fs, Path path, MapJoinTableContainerSerDe mapJoinTableSerde) throws HiveException { LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path); if (!SparkUtilities.isDedicatedCluster(hconf)) { - return loadMapJoinTableContainer(fs, path, mapJoinTableSerde); + return useFastContainer ? mapJoinTableSerde.loadFastContainer(desc, fs, path, hconf) : + mapJoinTableSerde.load(fs, path, hconf); } - - try { - return SmallTableCache.get(path, () -> loadMapJoinTableContainer(fs, path, mapJoinTableSerde)); - } catch (ExecutionException e) { - throw new HiveException(e); + MapJoinTableContainer mapJoinTable = SmallTableCache.get(path); + if (mapJoinTable == null) { + synchronized (path.toString().intern()) { + mapJoinTable = SmallTableCache.get(path); + if (mapJoinTable == null) { + mapJoinTable = useFastContainer ? + mapJoinTableSerde.loadFastContainer(desc, fs, path, hconf) : + mapJoinTableSerde.load(fs, path, hconf); + SmallTableCache.cache(path, mapJoinTable); + } + } } - } - - private MapJoinTableContainer loadMapJoinTableContainer(FileSystem fs, Path path, MapJoinTableContainerSerDe mapJoinTableSerde) throws HiveException { - return useFastContainer ? mapJoinTableSerde.loadFastContainer(desc, fs, path, hconf) : - mapJoinTableSerde.load(fs, path, hconf); + return mapJoinTable; } private void loadDirectly(MapJoinTableContainer[] mapJoinTables, String inputFileName) diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java index 6577d10275..3293100af9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java @@ -17,11 +17,8 @@ */ package org.apache.hadoop.hive.ql.exec.spark; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.ConcurrentHashMap; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -32,9 +29,8 @@ public class SmallTableCache { private static final Logger LOG = LoggerFactory.getLogger(SmallTableCache.class.getName()); - private static final Cache - tableContainerCache = CacheBuilder.newBuilder().softValues().build(); - + private static final ConcurrentHashMap + tableContainerMap = new ConcurrentHashMap(); private static volatile String queryId; /** @@ -44,13 +40,13 @@ public static void initialize(Configuration conf) { String currentQueryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname); if (!currentQueryId.equals(queryId)) { - if (tableContainerCache.size() != 0) { - synchronized (tableContainerCache) { - if (!currentQueryId.equals(queryId) && tableContainerCache.size() != 0) { - for (MapJoinTableContainer tableContainer: tableContainerCache.asMap().values()) { + if (!tableContainerMap.isEmpty()) { + synchronized (tableContainerMap) { + if (!currentQueryId.equals(queryId) && !tableContainerMap.isEmpty()) { + for (MapJoinTableContainer tableContainer: tableContainerMap.values()) { tableContainer.clear(); } - tableContainerCache.invalidateAll(); + tableContainerMap.clear(); if (LOG.isDebugEnabled()) { LOG.debug("Cleaned up small table cache for query " + queryId); } @@ -61,8 +57,17 @@ public static void initialize(Configuration conf) { } } - public static MapJoinTableContainer get(Path path, Callable valueLoader) - throws ExecutionException { - return tableContainerCache.get(path, valueLoader); + public static void cache(Path path, MapJoinTableContainer tableContainer) { + if (tableContainerMap.putIfAbsent(path, tableContainer) == null && LOG.isDebugEnabled()) { + LOG.debug("Cached small table file " + path + " for query " + queryId); + } + } + + public static MapJoinTableContainer get(Path path) { + MapJoinTableContainer tableContainer = tableContainerMap.get(path); + if (tableContainer != null && LOG.isDebugEnabled()) { + LOG.debug("Loaded small table file " + path + " from cache for query " + queryId); + } + return tableContainer; } }