diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index eb0ba7b..ecf846c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -26,7 +26,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.ContentSummary;
@@ -162,13 +169,7 @@ private boolean checkThreshold(FetchData data, int limit, ParseContext pctx) thr
         return true;
       }
     }
-    long remaining = threshold;
-    remaining -= data.getInputLength(pctx, remaining);
-    if (remaining < 0) {
-      LOG.info("Threshold " + remaining + " exceeded for pseudoMR mode");
-      return false;
-    }
-    return true;
+    return data.isDataLengthWithinThreshold(pctx, threshold);
   }
 
   // all we can handle is LimitOperator, FilterOperator SelectOperator and final FS
@@ -414,18 +415,15 @@ private ListSinkOperator completed(ParseContext pctx, FetchWork work) {
       return replaceFSwithLS(fileSink, work.getSerializationNullFormat());
     }
 
-    private long getInputLength(ParseContext pctx, long remaining) throws Exception {
+    private boolean isDataLengthWithinThreshold(ParseContext pctx, long threshold) throws Exception {
       if (splitSample != null && splitSample.getTotalLength() != null) {
-        return splitSample.getTotalLength();
-      }
-      if (splitSample != null) {
-        return splitSample.getTargetSize(calculateLength(pctx, splitSample.estimateSourceSize(remaining)));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Sample data length " + splitSample.getTotalLength() + ", threshold " + threshold + " for pseudoMR mode");
+        }
+        return (threshold - splitSample.getTotalLength()) > 0;
       }
-      return calculateLength(pctx, remaining);
-    }
 
-    private long calculateLength(ParseContext pctx, long remaining) throws Exception {
-      JobConf jobConf = new JobConf(pctx.getConf());
+      final JobConf jobConf = new JobConf(pctx.getConf());
       Utilities.setColumnNameList(jobConf, scanOp, true);
       Utilities.setColumnTypeList(jobConf, scanOp, true);
       HiveStorageHandler handler = table.getStorageHandler();
@@ -434,23 +432,64 @@ private long calculateLength(ParseContext pctx, long remaining) throws Exception
         TableDesc tableDesc = Utilities.getTableDesc(table);
         PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
         Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
-        return estimator.estimate(jobConf, scanOp, remaining).getTotalLength();
+        long len = estimator.estimate(jobConf, scanOp, threshold).getTotalLength();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Estimated data length " + len + ", threshold " + threshold + " for pseudoMR mode");
+        }
+        return (threshold - len) > 0;
       }
       if (table.isNonNative()) {
-        return 0; // nothing can be done
+        return true; // size of non-native tables cannot be estimated; assume it fits
       }
       if (!table.isPartitioned()) {
-        return getFileLength(jobConf, table.getPath(), table.getInputFormatClass());
+        long len = getFileLength(jobConf, table.getPath(), table.getInputFormatClass());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Data length " + len + ", threshold " + threshold + " for pseudoMR mode");
+        }
+        return (threshold - len) > 0;
       }
-      long total = 0;
-      for (Partition partition : partsList.getNotDeniedPartns()) {
-        Path path = partition.getDataLocation();
-        total += getFileLength(jobConf, path, partition.getInputFormatClass());
-        if (total > remaining) {
-          break;
+      final AtomicLong total = new AtomicLong(0);
+      // TODO: use a common thread pool later?
+      int threadCount = HiveConf.getIntVar(pctx.getConf(),
+          HiveConf.ConfVars.HIVE_STATS_GATHER_NUM_THREADS);
+      final ExecutorService pool = (threadCount > 0) ?
+          Executors.newFixedThreadPool(threadCount,
+              new ThreadFactoryBuilder()
+                  .setDaemon(true)
+                  .setNameFormat("SimpleFetchOptimizer-FileLength-%d").build()) : null;
+      try {
+        List<Future<Long>> futures = Lists.newLinkedList();
+        for (final Partition partition : partsList.getNotDeniedPartns()) {
+          final Path path = partition.getDataLocation();
+          if (pool != null) {
+            futures.add(pool.submit(new Callable<Long>() {
+              @Override
+              public Long call() throws Exception {
+                long len = getFileLength(jobConf, path, partition.getInputFormatClass());
+                LOG.debug(path + ", length=" + len);
+                return total.addAndGet(len);
+              }
+            }));
+          } else {
+            total.addAndGet(getFileLength(jobConf, path, partition.getInputFormatClass()));
+          }
+        }
+        if (pool != null) {
+          for (Future<Long> future : futures) {
+            long totalLen = future.get();
+            if ((threshold - totalLen) <= 0) {
+              // early exit, as getting file lengths can be expensive in object stores.
+              return false;
+            }
+          }
+        }
+        return (threshold - total.get()) > 0;
+      } finally {
+        LOG.info("Data set size=" + total.get() + ", threshold=" + threshold);
+        if (pool != null) {
+          pool.shutdownNow();
         }
       }
-      return total;
     }
 
   // from Utilities.getInputSummary()
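Reviewer note: below is a minimal, self-contained sketch of the concurrency pattern this patch introduces for the partitioned-table case: size each input on a bounded pool, accumulate into a shared AtomicLong, and stop waiting as soon as any running total reaches the threshold. The names here (ThresholdScan, sizeOf, withinThreshold, the list of path strings) are hypothetical stand-ins for getFileLength(), isDataLengthWithinThreshold(), and partsList.getNotDeniedPartns(); this illustrates the pattern and is not the patched Hive code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class ThresholdScan {

  // Hypothetical stand-in for getFileLength(): pretend each path costs a lookup.
  static long sizeOf(String path) {
    return path.length() * 1024L;
  }

  // Returns true only if the combined size of all paths stays under the threshold.
  static boolean withinThreshold(List<String> paths, long threshold, int threads)
      throws InterruptedException, ExecutionException {
    final AtomicLong total = new AtomicLong(0);
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<Long>> futures = new ArrayList<>();
      for (final String path : paths) {
        // Each task adds its own length to the shared total and returns the new sum.
        futures.add(pool.submit(() -> total.addAndGet(sizeOf(path))));
      }
      for (Future<Long> future : futures) {
        // get() yields the running total at the moment that task finished;
        // any intermediate sum at or over the threshold lets us stop waiting.
        if (future.get() >= threshold) {
          return false;
        }
      }
      return total.get() < threshold;
    } finally {
      pool.shutdownNow(); // cancel anything still queued or running
    }
  }

  public static void main(String[] args) throws Exception {
    List<String> paths = Arrays.asList("/warehouse/p1", "/warehouse/p2", "/warehouse/p3");
    System.out.println(withinThreshold(paths, 100_000L, 2));
  }
}

Note that future.get() waits in submission order, so the early exit fires only once a prefix of the tasks has completed; shutdownNow() in the finally block then interrupts whatever is still queued or running. That is the same trade-off the patch makes: slightly delayed cancellation in exchange for not serializing the per-partition length calls, which matters most on object stores where each length lookup is a round trip.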