diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java index cb5fee4a48..2b1d3b304e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java @@ -19,16 +19,25 @@ package org.apache.hadoop.hive.ql.stats.fs; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.stats.StatsAggregator; @@ -41,8 +50,8 @@ public class FSStatsAggregator implements StatsAggregator { private final Logger LOG = LoggerFactory.getLogger(this.getClass().getName()); - private List>> statsList; - private Map> statsMap; + private Queue>> statsList; + private Map> statsMap = new HashMap<>(); private FileSystem fs; @Override @@ -51,34 +60,69 @@ public boolean connect(StatsCollectionContext scc) { assert statsDirs.size() == 1 : "Found multiple stats dirs: " + statsDirs; Path statsDir = new Path(statsDirs.get(0)); Utilities.FILE_OP_LOGGER.trace("About to read stats from {}", statsDir); - statsMap = new HashMap>(); + int poolSize = HiveConf.getIntVar(scc.getHiveConf(), HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT); + // In case thread count is set to 0, use single thread. + poolSize = Math.max(poolSize, 1); + final ExecutorService pool = Executors.newFixedThreadPool(poolSize, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("stats-updater-thread-%d") + .build());; + + final List> futureList = new LinkedList<>(); try { fs = statsDir.getFileSystem(scc.getHiveConf()); - statsList = new ArrayList>>(); + statsList = new ConcurrentLinkedQueue<>(); FileStatus[] status = fs.listStatus(statsDir, new PathFilter() { @Override public boolean accept(Path file) { return file.getName().startsWith(StatsSetupConst.STATS_FILE_PREFIX); } }); - for (FileStatus file : status) { - Utilities.FILE_OP_LOGGER.trace("About to read stats file {} ", file.getPath()); - Input in = new Input(fs.open(file.getPath())); - Kryo kryo = SerializationUtilities.borrowKryo(); - try { - statsMap = kryo.readObject(in, statsMap.getClass()); - } finally { - SerializationUtilities.releaseKryo(kryo); - } - Utilities.FILE_OP_LOGGER.trace("Read : {}", statsMap); - statsList.add(statsMap); - in.close(); + for (final FileStatus file : status) { + futureList.add(pool.submit(new Callable() { + @Override public Void call() { + Input in = null; + Kryo kryo = null; + try { + in = new Input(fs.open(file.getPath())); + kryo = SerializationUtilities.borrowKryo(); + Map> stats = kryo.readObject(in, statsMap.getClass()); + statsList.add(stats); + Utilities.FILE_OP_LOGGER.info("Read stats : " + stats); + } catch(Exception e) { + Utilities.FILE_OP_LOGGER.error("Reading stats : " + file.getPath(), e); + } finally { + SerializationUtilities.releaseKryo(kryo); + if (in != null) { + in.close(); + } + } + return null; + } + })); + } + for(Future future : futureList) { + future.get(); } return true; - } catch (IOException e) { + } catch (IOException | ExecutionException e) { Utilities.FILE_OP_LOGGER.error("Failed to read stats from filesystem ", e); + cancelRunningTasks(futureList); return false; + } catch (InterruptedException e) { + cancelRunningTasks(futureList); + //reset interrupt state + Thread.currentThread().interrupt(); + } finally { + pool.shutdownNow(); + } + return false; + } + + private void cancelRunningTasks(List> tasks) { + for(Future task: tasks) { + task.cancel(true); } }