diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
index cb5fee4a48..1be04b83eb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
@@ -21,14 +21,22 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.stats.StatsAggregator;
@@ -51,7 +59,15 @@ public boolean connect(StatsCollectionContext scc) {
     assert statsDirs.size() == 1 : "Found multiple stats dirs: " + statsDirs;
     Path statsDir = new Path(statsDirs.get(0));
     Utilities.FILE_OP_LOGGER.trace("About to read stats from {}", statsDir);
-    statsMap = new HashMap<String, Map<String, String>>();
+    int poolSize = scc.getHiveConf().getInt(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 1);
+    // In case the thread count is set to 0, fall back to a single thread.
+    poolSize = Math.max(poolSize, 1);
+    final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("stats-updater-thread-%d")
+            .build());
+
+    final List<Future<Void>> futureList = new LinkedList<>();
 
     try {
       fs = statsDir.getFileSystem(scc.getHiveConf());
@@ -62,23 +78,50 @@ public boolean accept(Path file) {
           return file.getName().startsWith(StatsSetupConst.STATS_FILE_PREFIX);
         }
       });
-      for (FileStatus file : status) {
-        Utilities.FILE_OP_LOGGER.trace("About to read stats file {} ", file.getPath());
-        Input in = new Input(fs.open(file.getPath()));
-        Kryo kryo = SerializationUtilities.borrowKryo();
-        try {
-          statsMap = kryo.readObject(in, statsMap.getClass());
-        } finally {
-          SerializationUtilities.releaseKryo(kryo);
-        }
-        Utilities.FILE_OP_LOGGER.trace("Read : {}", statsMap);
-        statsList.add(statsMap);
-        in.close();
+      for (final FileStatus file : status) {
+        futureList.add(pool.submit(new Callable<Void>() {
+          @Override public Void call() {
+            Input in = null;
+            Kryo kryo = null;
+            try {
+              in = new Input(fs.open(file.getPath()));
+              kryo = SerializationUtilities.borrowKryo();
+              Map<String, Map<String, String>> stats = kryo.readObject(in, statsMap.getClass());
+              statsList.add(stats);
+              Utilities.FILE_OP_LOGGER.info("Read stats : " + stats);
+            } catch (Exception e) {
+              Utilities.FILE_OP_LOGGER.error("Failed to read stats file " + file.getPath(), e);
+            } finally {
+              SerializationUtilities.releaseKryo(kryo);
+              if (in != null) {
+                in.close();
+              }
+            }
+            return null;
+          }
+        }));
+      }
+      for (Future<Void> future : futureList) {
+        future.get();
       }
       return true;
-    } catch (IOException e) {
+    } catch (IOException | ExecutionException e) {
       Utilities.FILE_OP_LOGGER.error("Failed to read stats from filesystem ", e);
+      cancelRunningTasks(futureList);
       return false;
+    } catch (InterruptedException e) {
+      cancelRunningTasks(futureList);
+      // Reset the interrupt state.
+      Thread.currentThread().interrupt();
+    } finally {
+      pool.shutdownNow();
+    }
+    return false;
+  }
+
+  private void cancelRunningTasks(List<Future<Void>> tasks) {
+    for (Future<Void> task : tasks) {
+      task.cancel(true);
     }
   }
 
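
Editor's note: the sketch below is not part of the patch. It is a minimal, self-contained illustration of the pattern the patch introduces in connect(): a fixed-size daemon thread pool, one Callable submitted per stats file, a blocking Future.get() on every task, cancellation of outstanding tasks on failure or interruption, and pool shutdown in a finally block. The class ParallelReadSketch and the method readOneFile are hypothetical stand-ins and do not exist in Hive; ThreadFactoryBuilder is the same Guava helper the patch imports.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class ParallelReadSketch {

  // Hypothetical stand-in for opening one stats file and deserializing it.
  private static String readOneFile(String path) {
    return "stats-from-" + path;
  }

  // Reads all paths in parallel and returns the per-file results.
  public static List<String> readAll(List<String> paths, int poolSize) throws ExecutionException {
    // Guard against a configured thread count of 0, as the patch does.
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(poolSize, 1),
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("stats-reader-%d").build());
    List<Future<String>> futures = new ArrayList<>();
    List<String> results = new ArrayList<>();
    try {
      // One task per file; each task returns its result rather than mutating shared state
      // (a difference from the patch, whose tasks add to a shared statsList).
      for (final String path : paths) {
        futures.add(pool.submit(new Callable<String>() {
          @Override public String call() {
            return readOneFile(path);
          }
        }));
      }
      // Wait for every task; get() rethrows a task failure as ExecutionException.
      for (Future<String> future : futures) {
        results.add(future.get());
      }
      return results;
    } catch (InterruptedException e) {
      // Cancel whatever is still running and restore the interrupt flag, as the patch does.
      for (Future<String> future : futures) {
        future.cancel(true);
      }
      Thread.currentThread().interrupt();
      return results;
    } finally {
      pool.shutdownNow();
    }
  }
}

As in the patch, making the workers daemon threads and calling shutdownNow() in the finally block means a failed or interrupted aggregation does not leave live worker threads behind.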