diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java index cb5fee4a48..595735e070 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java @@ -21,14 +21,21 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.stats.StatsAggregator; @@ -42,7 +49,6 @@ public class FSStatsAggregator implements StatsAggregator { private final Logger LOG = LoggerFactory.getLogger(this.getClass().getName()); private List>> statsList; - private Map> statsMap; private FileSystem fs; @Override @@ -51,34 +57,63 @@ public boolean connect(StatsCollectionContext scc) { assert statsDirs.size() == 1 : "Found multiple stats dirs: " + statsDirs; Path statsDir = new Path(statsDirs.get(0)); Utilities.FILE_OP_LOGGER.trace("About to read stats from {}", statsDir); - statsMap = new HashMap>(); + int poolSize = HiveConf.getIntVar(scc.getHiveConf(), HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT); + // In case thread count is set to 0, use single thread. + poolSize = Math.max(poolSize, 1); + final ExecutorService pool = Executors.newFixedThreadPool(poolSize, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("stats-updater-thread-%d") + .build());; + + final List>>> futureList = new LinkedList<>(); try { fs = statsDir.getFileSystem(scc.getHiveConf()); - statsList = new ArrayList>>(); + statsList = new ArrayList<>(); FileStatus[] status = fs.listStatus(statsDir, new PathFilter() { @Override public boolean accept(Path file) { return file.getName().startsWith(StatsSetupConst.STATS_FILE_PREFIX); } }); - for (FileStatus file : status) { - Utilities.FILE_OP_LOGGER.trace("About to read stats file {} ", file.getPath()); - Input in = new Input(fs.open(file.getPath())); - Kryo kryo = SerializationUtilities.borrowKryo(); - try { - statsMap = kryo.readObject(in, statsMap.getClass()); - } finally { - SerializationUtilities.releaseKryo(kryo); + Map> statsMap = new HashMap<>(); + for (final FileStatus file : status) { + futureList.add(pool.submit(() -> { + Kryo kryo = null; + try (Input in = new Input(fs.open(file.getPath()))) { + kryo = SerializationUtilities.borrowKryo(); + Map> stats = kryo.readObject(in, statsMap.getClass()); + Utilities.FILE_OP_LOGGER.trace("Read stats {}", stats); + return stats; + } finally { + SerializationUtilities.releaseKryo(kryo); + } + })); + } + for(Future>> future : futureList) { + Map> stats = future.get(); + if (stats != null) { + statsList.add(stats); } - Utilities.FILE_OP_LOGGER.trace("Read : {}", statsMap); - statsList.add(statsMap); - in.close(); } return true; - } catch (IOException e) { + } catch (IOException | ExecutionException e) { Utilities.FILE_OP_LOGGER.error("Failed to read stats from filesystem ", e); + cancelRunningTasks(futureList); return false; + } catch (InterruptedException e) { + cancelRunningTasks(futureList); + //reset interrupt state + Thread.currentThread().interrupt(); + } finally { + pool.shutdownNow(); + } + return false; + } + + private void cancelRunningTasks(List>>> tasks) { + for(Future>> task: tasks) { + task.cancel(true); } }