diff --git metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 561f3e3..2f9ec00 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -31,6 +31,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jdo.PersistenceManager;
 import javax.jdo.Query;
@@ -1188,7 +1189,8 @@ public AggrStats aggrColStatsForPartitions(String dbName, String tableName,
       LOG.debug("Columns is empty or partNames is empty : Short-circuiting stats eval");
       return new AggrStats(new ArrayList<ColumnStatisticsObj>(), 0); // Nothing to aggregate
     }
-    long partsFound = partsFoundForPartitions(dbName, tableName, partNames, colNames);
+    LOG.debug("partNames size={}", partNames.size());
+    AtomicLong partsFound = new AtomicLong(0);
     List<ColumnStatisticsObj> colStatsList;
     // Try to read from the cache first
     if (isAggregateStatsCacheEnabled) {
@@ -1199,7 +1201,7 @@ public AggrStats aggrColStatsForPartitions(String dbName, String tableName,
       int partitionsRequested = partNames.size();
       if (partitionsRequested > maxPartsPerCacheNode) {
         colStatsList = columnStatisticsObjForPartitions(dbName, tableName, partNames, colNames,
-            partsFound, useDensityFunctionForNDVEstimation);
+            useDensityFunctionForNDVEstimation, partsFound);
       } else {
         colStatsList = new ArrayList<ColumnStatisticsObj>();
         // Bloom filter for the new node that we will eventually add to the cache
@@ -1215,25 +1217,26 @@ public AggrStats aggrColStatsForPartitions(String dbName, String tableName,
             // Read aggregated stats for one column
             colStatsAggrFromDB =
                 columnStatisticsObjForPartitions(dbName, tableName, partNames, colNamesForDB,
-                    partsFound, useDensityFunctionForNDVEstimation);
+                    useDensityFunctionForNDVEstimation, partsFound);
             if (!colStatsAggrFromDB.isEmpty()) {
               ColumnStatisticsObj colStatsAggr = colStatsAggrFromDB.get(0);
               colStatsList.add(colStatsAggr);
               // Update the cache to add this new aggregate node
-              aggrStatsCache.add(dbName, tableName, colName, partsFound, colStatsAggr, bloomFilter);
+              aggrStatsCache.add(dbName, tableName, colName, partsFound.get()
+                  , colStatsAggr, bloomFilter);
             }
           }
         }
       }
     } else {
       colStatsList =
-          columnStatisticsObjForPartitions(dbName, tableName, partNames, colNames, partsFound,
-              useDensityFunctionForNDVEstimation);
+          columnStatisticsObjForPartitions(dbName, tableName, partNames, colNames,
+              useDensityFunctionForNDVEstimation, partsFound);
     }
     LOG.info("useDensityFunctionForNDVEstimation = " + useDensityFunctionForNDVEstimation
-        + "\npartsFound = " + partsFound + "\nColumnStatisticsObj = "
-        + Arrays.toString(colStatsList.toArray()));
-    return new AggrStats(colStatsList, partsFound);
+        + "\npartsFound = " + partsFound.get() + ", partNames=" + partNames.size()
+        + "\nColumnStatisticsObj = " + Arrays.toString(colStatsList.toArray()));
+    return new AggrStats(colStatsList, partsFound.get());
   }
 
   private BloomFilter createPartsBloomFilter(int maxPartsPerCacheNode, float fpp,
@@ -1290,24 +1293,40 @@ private long partsFoundForPartitions(final String dbName, final String tableName
   }
 
   private List<ColumnStatisticsObj> columnStatisticsObjForPartitions(final String dbName,
-      final String tableName, final List<String> partNames, List<String> colNames, long partsFound,
-      final boolean useDensityFunctionForNDVEstimation) throws MetaException {
-    final boolean areAllPartsFound = (partsFound == partNames.size());
+      final String tableName, final List<String> partNames, final List<String> colNames,
+      final boolean useDensityFunctionForNDVEstimation, final AtomicLong partsFound)
+      throws MetaException {
     return runBatched(colNames, new Batchable<String, ColumnStatisticsObj>() {
       public List<ColumnStatisticsObj> run(final List<String> inputColNames) throws MetaException {
-        return runBatched(partNames, new Batchable<String, ColumnStatisticsObj>() {
+        partsFound.set(0);
+        List<ColumnStatisticsObj> result = runBatched(partNames, new Batchable<String, ColumnStatisticsObj>() {
          public List<ColumnStatisticsObj> run(List<String> inputPartNames) throws MetaException {
            return columnStatisticsObjForPartitionsBatch(dbName, tableName, inputPartNames,
-                inputColNames, areAllPartsFound, useDensityFunctionForNDVEstimation);
+                inputColNames, useDensityFunctionForNDVEstimation, partsFound);
          }
        });
+
+        LOG.debug("cols={}, tableName={}, partsFound={}, partNames.size={}"
+            , colNames, tableName, partsFound.get(), partNames.size());
+
+        if (partsFound.get() != partNames.size()) {
+          // extrapolate stats
+          LOG.info("Extrapolating stats for table={}, cols={}; partsFound={}, partNames size={}",
+              tableName, colNames, partsFound.get(), partNames.size());
+          result = runBatched(partNames, new Batchable<String, ColumnStatisticsObj>() {
+            public List<ColumnStatisticsObj> run(List<String> inputPartNames) throws MetaException {
+              return columnStatisticsObjForPartitionsBatchWithEstimate(dbName, tableName, inputPartNames,
+                  inputColNames, useDensityFunctionForNDVEstimation);
+            }
+          });
+        }
+        return result;
      }
    });
  }
 
-  /** Should be called with the list short enough to not trip up Oracle/etc. */
-  private List<ColumnStatisticsObj> columnStatisticsObjForPartitionsBatch(String dbName,
-      String tableName, List<String> partNames, List<String> colNames, boolean areAllPartsFound,
+  private List<ColumnStatisticsObj> columnStatisticsObjForPartitionsBatchWithEstimate(String
+      dbName, String tableName, List<String> partNames, List<String> colNames, boolean
       useDensityFunctionForNDVEstimation) throws MetaException {
     // TODO: all the extrapolation logic should be moved out of this class,
     // only mechanical data retrieval should remain here.
@@ -1341,237 +1360,271 @@ private long partsFoundForPartitions(final String dbName, final String tableName
     boolean doTrace = LOG.isDebugEnabled();
     Object qResult = null;
     ForwardQueryResult fqr = null;
-    // Check if the status of all the columns of all the partitions exists
-    // Extrapolation is not needed.
-    if (areAllPartsFound) {
-      queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
-          + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
-          + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+    List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
+    // Extrapolation is needed for some columns.
+    // In this case, at least a column's status for a partition is missing.
+    // We need to extrapolate this partition based on the other partitions
+    queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", count(\"PARTITION_NAME\") "
+        + " from \"PART_COL_STATS\"" + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
+        + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
+        + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+        + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+    start = doTrace ? System.nanoTime() : 0;
+    query = pm.newQuery("javax.jdo.query.SQL", queryText);
+    qResult = executeWithArray(query, prepareParams(dbName, tableName, partNames, colNames),
+        queryText);
+    end = doTrace ? System.nanoTime() : 0;
+    timingTrace(doTrace, queryText, start, end);
+    if (qResult == null) {
+      query.closeAll();
+      return Lists.newArrayList();
+    }
+    List<String> noExtraColumnNames = new ArrayList<String>();
+    Map<String, String[]> extraColumnNameTypeParts = new HashMap<String, String[]>();
+    List<Object[]> list = ensureList(qResult);
+    for (Object[] row : list) {
+      String colName = (String) row[0];
+      String colType = (String) row[1];
+      // Extrapolation is not needed for this column if
+      // count(\"PARTITION_NAME\")==partNames.size()
+      // Or, extrapolation is not possible for this column if
+      // count(\"PARTITION_NAME\")<2
+      Long count = extractSqlLong(row[2]);
+      if (count == partNames.size() || count < 2) {
+        noExtraColumnNames.add(colName);
+      } else {
+        extraColumnNameTypeParts.put(colName, new String[] { colType, String.valueOf(count) });
+      }
+      Deadline.checkTimeout();
+    }
+    query.closeAll();
+    // Extrapolation is not needed for columns noExtraColumnNames
+    if (noExtraColumnNames.size() != 0) {
+      queryText = commonPrefix + " and \"COLUMN_NAME\" in ("
+          + makeParams(noExtraColumnNames.size()) + ")" + " and \"PARTITION_NAME\" in ("
+          + makeParams(partNames.size()) + ")" + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
       start = doTrace ? System.nanoTime() : 0;
       query = pm.newQuery("javax.jdo.query.SQL", queryText);
-      qResult = executeWithArray(query, prepareParams(dbName, tableName, partNames, colNames),
-          queryText);
+      qResult = executeWithArray(query,
+          prepareParams(dbName, tableName, partNames, noExtraColumnNames), queryText);
       if (qResult == null) {
         query.closeAll();
         return Lists.newArrayList();
       }
-      end = doTrace ? System.nanoTime() : 0;
-      timingTrace(doTrace, queryText, start, end);
-      List<Object[]> list = ensureList(qResult);
-      List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(list.size());
+      list = ensureList(qResult);
      for (Object[] row : list) {
        colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation));
        Deadline.checkTimeout();
      }
+      end = doTrace ? System.nanoTime() : 0;
+      timingTrace(doTrace, queryText, start, end);
      query.closeAll();
-      return colStats;
-    } else {
-      // Extrapolation is needed for some columns.
-      // In this case, at least a column status for a partition is missing.
-      // We need to extrapolate this partition based on the other partitions
-      List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
-      queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", count(\"PARTITION_NAME\") "
-          + " from \"PART_COL_STATS\"" + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
-          + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
-          + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
-          + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+    }
+    // Extrapolation is needed for extraColumnNames.
+    // give a sequence number for all the partitions
+    if (extraColumnNameTypeParts.size() != 0) {
+      Map<String, Integer> indexMap = new HashMap<String, Integer>();
+      for (int index = 0; index < partNames.size(); index++) {
+        indexMap.put(partNames.get(index), index);
+      }
+      // get sum for all columns to reduce the number of queries
+      Map<String, Map<Integer, Object>> sumMap = new HashMap<String, Map<Integer, Object>>();
+      queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")"
+          + " from \"PART_COL_STATS\" where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
+          + " and \"COLUMN_NAME\" in (" + makeParams(extraColumnNameTypeParts.size())
+          + ") and \"PARTITION_NAME\" in (" + makeParams(partNames.size())
+          + ") group by \"COLUMN_NAME\"";
      start = doTrace ? System.nanoTime() : 0;
      query = pm.newQuery("javax.jdo.query.SQL", queryText);
-      qResult = executeWithArray(query, prepareParams(dbName, tableName, partNames, colNames),
-          queryText);
-      end = doTrace ? System.nanoTime() : 0;
-      timingTrace(doTrace, queryText, start, end);
+      List<String> extraColumnNames = new ArrayList<String>();
+      extraColumnNames.addAll(extraColumnNameTypeParts.keySet());
+      qResult = executeWithArray(query,
+          prepareParams(dbName, tableName, partNames, extraColumnNames), queryText);
      if (qResult == null) {
        query.closeAll();
        return Lists.newArrayList();
      }
-      List<String> noExtraColumnNames = new ArrayList<String>();
-      Map<String, String[]> extraColumnNameTypeParts = new HashMap<String, String[]>();
-      List<Object[]> list = ensureList(qResult);
+      list = ensureList(qResult);
+      // see the indexes for colstats in IExtrapolatePartStatus
+      Integer[] sumIndex = new Integer[] { 6, 10, 11, 15 };
      for (Object[] row : list) {
-        String colName = (String) row[0];
-        String colType = (String) row[1];
-        // Extrapolation is not needed for this column if
-        // count(\"PARTITION_NAME\")==partNames.size()
-        // Or, extrapolation is not possible for this column if
-        // count(\"PARTITION_NAME\")<2
-        Long count = extractSqlLong(row[2]);
-        if (count == partNames.size() || count < 2) {
-          noExtraColumnNames.add(colName);
-        } else {
-          extraColumnNameTypeParts.put(colName, new String[] { colType, String.valueOf(count) });
+        Map<Integer, Object> indexToObject = new HashMap<Integer, Object>();
+        for (int ind = 1; ind < row.length; ind++) {
+          indexToObject.put(sumIndex[ind - 1], row[ind]);
        }
+        // row[0] is the column name
+        sumMap.put((String) row[0], indexToObject);
        Deadline.checkTimeout();
      }
+      end = doTrace ? System.nanoTime() : 0;
+      timingTrace(doTrace, queryText, start, end);
      query.closeAll();
-      // Extrapolation is not needed for columns noExtraColumnNames
-      if (noExtraColumnNames.size() != 0) {
-        queryText = commonPrefix + " and \"COLUMN_NAME\" in ("
-            + makeParams(noExtraColumnNames.size()) + ")" + " and \"PARTITION_NAME\" in ("
-            + makeParams(partNames.size()) + ")" + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
-        start = doTrace ? System.nanoTime() : 0;
-        query = pm.newQuery("javax.jdo.query.SQL", queryText);
-        qResult = executeWithArray(query,
-            prepareParams(dbName, tableName, partNames, noExtraColumnNames), queryText);
-        if (qResult == null) {
-          query.closeAll();
-          return Lists.newArrayList();
-        }
-        list = ensureList(qResult);
-        for (Object[] row : list) {
-          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation));
-          Deadline.checkTimeout();
-        }
-        end = doTrace ? System.nanoTime() : 0;
-        timingTrace(doTrace, queryText, start, end);
-        query.closeAll();
-      }
-      // Extrapolation is needed for extraColumnNames.
-      // give a sequence number for all the partitions
-      if (extraColumnNameTypeParts.size() != 0) {
-        Map<String, Integer> indexMap = new HashMap<String, Integer>();
-        for (int index = 0; index < partNames.size(); index++) {
-          indexMap.put(partNames.get(index), index);
-        }
-        // get sum for all columns to reduce the number of queries
-        Map<String, Map<Integer, Object>> sumMap = new HashMap<String, Map<Integer, Object>>();
-        queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")"
-            + " from \"PART_COL_STATS\" where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
-            + " and \"COLUMN_NAME\" in (" + makeParams(extraColumnNameTypeParts.size())
-            + ") and \"PARTITION_NAME\" in (" + makeParams(partNames.size())
-            + ") group by \"COLUMN_NAME\"";
-        start = doTrace ? System.nanoTime() : 0;
-        query = pm.newQuery("javax.jdo.query.SQL", queryText);
-        List<String> extraColumnNames = new ArrayList<String>();
-        extraColumnNames.addAll(extraColumnNameTypeParts.keySet());
-        qResult = executeWithArray(query,
-            prepareParams(dbName, tableName, partNames, extraColumnNames), queryText);
-        if (qResult == null) {
-          query.closeAll();
-          return Lists.newArrayList();
+      for (Map.Entry<String, String[]> entry : extraColumnNameTypeParts.entrySet()) {
+        Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length + 2];
+        String colName = entry.getKey();
+        String colType = entry.getValue()[0];
+        Long sumVal = Long.parseLong(entry.getValue()[1]);
+        // fill in colname
+        row[0] = colName;
+        // fill in coltype
+        row[1] = colType;
+        // use linear extrapolation. more complicated one can be added in the
+        // future.
+        IExtrapolatePartStatus extrapolateMethod = new LinearExtrapolatePartStatus();
+        // fill in colstatus
+        Integer[] index = null;
+        boolean decimal = false;
+        if (colType.toLowerCase().startsWith("decimal")) {
+          index = IExtrapolatePartStatus.indexMaps.get("decimal");
+          decimal = true;
+        } else {
+          index = IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
        }
-        list = ensureList(qResult);
-        // see the indexes for colstats in IExtrapolatePartStatus
-        Integer[] sumIndex = new Integer[] { 6, 10, 11, 15 };
+        // if the colType is not the known type, long, double, etc, then get
+        // all index.
+        if (index == null) {
+          index = IExtrapolatePartStatus.indexMaps.get("default");
+        }
        for (Object[] row : list) {
-          Map<Integer, Object> indexToObject = new HashMap<Integer, Object>();
-          for (int ind = 1; ind < row.length; ind++) {
-            indexToObject.put(sumIndex[ind - 1], row[ind]);
-          }
-          // row[0] is the column name
-          sumMap.put((String) row[0], indexToObject);
-          Deadline.checkTimeout();
-        }
-        end = doTrace ? System.nanoTime() : 0;
-        timingTrace(doTrace, queryText, start, end);
-        query.closeAll();
-        for (Map.Entry<String, String[]> entry : extraColumnNameTypeParts.entrySet()) {
-          Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length + 2];
-          String colName = entry.getKey();
-          String colType = entry.getValue()[0];
-          Long sumVal = Long.parseLong(entry.getValue()[1]);
-          // fill in colname
-          row[0] = colName;
-          // fill in coltype
-          row[1] = colType;
-          // use linear extrapolation. more complicated one can be added in the
-          // future.
-          IExtrapolatePartStatus extrapolateMethod = new LinearExtrapolatePartStatus();
-          // fill in colstatus
-          Integer[] index = null;
-          boolean decimal = false;
-          if (colType.toLowerCase().startsWith("decimal")) {
-            index = IExtrapolatePartStatus.indexMaps.get("decimal");
-            decimal = true;
-          } else {
-            index = IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
-          }
-          // if the colType is not the known type, long, double, etc, then get
-          // all index.
-          if (index == null) {
-            index = IExtrapolatePartStatus.indexMaps.get("default");
-          }
-          for (int colStatIndex : index) {
-            String colStatName = IExtrapolatePartStatus.colStatNames[colStatIndex];
-            // if the aggregation type is sum, we do a scale-up
-            if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Sum) {
-              Object o = sumMap.get(colName).get(colStatIndex);
-              if (o == null) {
-                row[2 + colStatIndex] = null;
-              } else {
-                Long val = extractSqlLong(o);
-                row[2 + colStatIndex] = (Long) (val / sumVal * (partNames.size()));
-              }
-            } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Min
-                || IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Max) {
-              // if the aggregation type is min/max, we extrapolate from the
-              // left/right borders
-              if (!decimal) {
-                queryText = "select \"" + colStatName
-                    + "\",\"PARTITION_NAME\" from \"PART_COL_STATS\""
-                    + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + " and \"COLUMN_NAME\" = ?"
-                    + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
-                    + " order by \"" + colStatName + "\"";
-              } else {
-                queryText = "select \"" + colStatName
-                    + "\",\"PARTITION_NAME\" from \"PART_COL_STATS\""
-                    + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + " and \"COLUMN_NAME\" = ?"
-                    + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
-                    + " order by cast(\"" + colStatName + "\" as decimal)";
-              }
-              start = doTrace ? System.nanoTime() : 0;
-              query = pm.newQuery("javax.jdo.query.SQL", queryText);
-              qResult = executeWithArray(query,
-                  prepareParams(dbName, tableName, partNames, Arrays.asList(colName)), queryText);
-              if (qResult == null) {
-                query.closeAll();
-                return Lists.newArrayList();
-              }
-              fqr = (ForwardQueryResult) qResult;
-              Object[] min = (Object[]) (fqr.get(0));
-              Object[] max = (Object[]) (fqr.get(fqr.size() - 1));
-              end = doTrace ? System.nanoTime() : 0;
-              timingTrace(doTrace, queryText, start, end);
+        for (int colStatIndex : index) {
+          String colStatName = IExtrapolatePartStatus.colStatNames[colStatIndex];
+          // if the aggregation type is sum, we do a scale-up
+          if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Sum) {
+            Object o = sumMap.get(colName).get(colStatIndex);
+            if (o == null) {
+              row[2 + colStatIndex] = null;
+            } else {
+              Long val = extractSqlLong(o);
+              row[2 + colStatIndex] = (Long) (val / sumVal * (partNames.size()));
+            }
+          } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Min
+              || IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Max) {
+            // if the aggregation type is min/max, we extrapolate from the
+            // left/right borders
+            if (!decimal) {
+              queryText = "select \"" + colStatName
+                  + "\",\"PARTITION_NAME\" from \"PART_COL_STATS\""
+                  + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + " and \"COLUMN_NAME\" = ?"
+                  + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+                  + " order by \"" + colStatName + "\"";
+            } else {
+              queryText = "select \"" + colStatName
+                  + "\",\"PARTITION_NAME\" from \"PART_COL_STATS\""
+                  + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + " and \"COLUMN_NAME\" = ?"
+                  + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+                  + " order by cast(\"" + colStatName + "\" as decimal)";
+            }
+            start = doTrace ? System.nanoTime() : 0;
+            query = pm.newQuery("javax.jdo.query.SQL", queryText);
+            qResult = executeWithArray(query,
+                prepareParams(dbName, tableName, partNames, Arrays.asList(colName)), queryText);
+            if (qResult == null) {
              query.closeAll();
-              if (min[0] == null || max[0] == null) {
-                row[2 + colStatIndex] = null;
-              } else {
-                row[2 + colStatIndex] = extrapolateMethod.extrapolate(min, max, colStatIndex,
-                    indexMap);
-              }
+              return Lists.newArrayList();
+            }
+            fqr = (ForwardQueryResult) qResult;
+            Object[] min = (Object[]) (fqr.get(0));
+            Object[] max = (Object[]) (fqr.get(fqr.size() - 1));
+            end = doTrace ? System.nanoTime() : 0;
+            timingTrace(doTrace, queryText, start, end);
+            query.closeAll();
+            if (min[0] == null || max[0] == null) {
+              row[2 + colStatIndex] = null;
            } else {
-              // if the aggregation type is avg, we use the average on the existing ones.
-              queryText = "select "
-                  + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
-                  + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
-                  + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")"
-                  + " from \"PART_COL_STATS\"" + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?"
-                  + " and \"COLUMN_NAME\" = ?" + " and \"PARTITION_NAME\" in ("
-                  + makeParams(partNames.size()) + ")" + " group by \"COLUMN_NAME\"";
-              start = doTrace ? System.nanoTime() : 0;
-              query = pm.newQuery("javax.jdo.query.SQL", queryText);
-              qResult = executeWithArray(query,
-                  prepareParams(dbName, tableName, partNames, Arrays.asList(colName)), queryText);
-              if (qResult == null) {
-                query.closeAll();
-                return Lists.newArrayList();
-              }
-              fqr = (ForwardQueryResult) qResult;
-              Object[] avg = (Object[]) (fqr.get(0));
-              // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE",
-              // "AVG_DECIMAL"
-              row[2 + colStatIndex] = avg[colStatIndex - 12];
-              end = doTrace ? System.nanoTime() : 0;
-              timingTrace(doTrace, queryText, start, end);
+              row[2 + colStatIndex] = extrapolateMethod.extrapolate(min, max, colStatIndex,
+                  indexMap);
+            }
+          } else {
+            // if the aggregation type is avg, we use the average on the existing ones.
+            queryText = "select "
+                + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
+                + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+                + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")"
+                + " from \"PART_COL_STATS\"" + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?"
+                + " and \"COLUMN_NAME\" = ?" + " and \"PARTITION_NAME\" in ("
+                + makeParams(partNames.size()) + ")" + " group by \"COLUMN_NAME\"";
+            start = doTrace ? System.nanoTime() : 0;
+            query = pm.newQuery("javax.jdo.query.SQL", queryText);
+            qResult = executeWithArray(query,
+                prepareParams(dbName, tableName, partNames, Arrays.asList(colName)), queryText);
+            if (qResult == null) {
              query.closeAll();
+              return Lists.newArrayList();
            }
+            fqr = (ForwardQueryResult) qResult;
+            Object[] avg = (Object[]) (fqr.get(0));
+            // colStatIndex=12,13,14 correspond to "AVG_LONG", "AVG_DOUBLE",
+            // "AVG_DECIMAL"
+            row[2 + colStatIndex] = avg[colStatIndex - 12];
+            end = doTrace ? System.nanoTime() : 0;
+            timingTrace(doTrace, queryText, start, end);
+            query.closeAll();
          }
-          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation));
-          Deadline.checkTimeout();
        }
+        colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation));
+        Deadline.checkTimeout();
      }
-      return colStats;
    }
+    return colStats;
+  }
+
+  /** Should be called with the list short enough to not trip up Oracle/etc. */
+  private List<ColumnStatisticsObj> columnStatisticsObjForPartitionsBatch(String dbName,
+      String tableName, List<String> partNames, List<String> colNames,
+      boolean useDensityFunctionForNDVEstimation, AtomicLong expectedParts)
+      throws MetaException {
+    // only mechanical data retrieval should remain here.
+    String commonPrefix = "select COUNT(\"COLUMN_NAME\"), \"COLUMN_NAME\", \"COLUMN_TYPE\", "
+        + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
+        + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
+        + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
+        + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), "
+        // The following data is used to compute a partitioned table's NDV based
+        // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be
+        // accurately derived from partition NDVs, because the domains of column values in two partitions
+        // can overlap. If there is no overlap then global NDV is just the sum
+        // of partition NDVs (UpperBound). But if there is some overlap then
+        // global NDV can be anywhere between sum of partition NDVs (no overlap)
+        // and same as one of the partition NDV (domain of column value in all other
+        // partitions is subset of the domain value in one of the partition)
+        // (LowerBound). But under uniform distribution, we can roughly estimate the global
+        // NDV by leveraging the min/max values.
+        // And, we also guarantee that the estimation makes sense by comparing it to the
+        // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
+        // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
+        + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
+        + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+        + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+        + "sum(\"NUM_DISTINCTS\")" + " from \"PART_COL_STATS\""
+        + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? ";
+    String queryText = null;
+    long start = 0;
+    long end = 0;
+    Query query = null;
+    boolean doTrace = LOG.isDebugEnabled();
+    Object qResult = null;
+    ForwardQueryResult fqr = null;
+    queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
+        + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+        + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+    start = doTrace ? System.nanoTime() : 0;
+    query = pm.newQuery("javax.jdo.query.SQL", queryText);
+    qResult = executeWithArray(query, prepareParams(dbName, tableName, partNames, colNames),
+        queryText);
+    if (qResult == null) {
+      query.closeAll();
+      return Lists.newArrayList();
+    }
+    end = doTrace ? System.nanoTime() : 0;
+    timingTrace(doTrace, queryText, start, end);
+    List<Object[]> list = ensureList(qResult);
+    List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(list.size());
+    for (Object[] row : list) {
+      expectedParts.getAndAdd((Long) row[0]);
+      colStats.add(prepareCSObjWithAdjustedNDV(row, 1, useDensityFunctionForNDVEstimation));
+      Deadline.checkTimeout();
+    }
+    query.closeAll();
+    return colStats;
  }
 
  private ColumnStatisticsObj prepareCSObj (Object[] row, int i) throws MetaException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
index 73ca9bf..4ebbb13 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
@@ -412,20 +412,32 @@ private int getDistinctCount(Set<Partition> partitions, String partColName) {
   public List<ColStatistics> getColStat(List<Integer> projIndxLst, boolean allowNullColumnForMissingStats) {
     List<ColStatistics> colStatsBldr = Lists.newArrayList();
-
+    Set<Integer> projIndxSet = new HashSet<Integer>(projIndxLst);
     if (projIndxLst != null) {
-      updateColStats(new HashSet<Integer>(projIndxLst), allowNullColumnForMissingStats);
       for (Integer i : projIndxLst) {
-        colStatsBldr.add(hiveColStatsMap.get(i));
+        if (hiveColStatsMap.get(i) != null) {
+          colStatsBldr.add(hiveColStatsMap.get(i));
+          projIndxSet.remove(i);
+        }
+      }
+      if (!projIndxSet.isEmpty()) {
+        updateColStats(projIndxSet, allowNullColumnForMissingStats);
+        for (Integer i : projIndxSet) {
+          colStatsBldr.add(hiveColStatsMap.get(i));
+        }
      }
    } else {
      List<Integer> pILst = new ArrayList<Integer>();
      for (Integer i = 0; i < noOfNonVirtualCols; i++) {
-        pILst.add(i);
+        if (hiveColStatsMap.get(i) == null) {
+          pILst.add(i);
+        }
      }
-      updateColStats(new HashSet<Integer>(pILst), allowNullColumnForMissingStats);
-      for (Integer pi : pILst) {
-        colStatsBldr.add(hiveColStatsMap.get(pi));
+      if (!pILst.isEmpty()) {
+        updateColStats(new HashSet<Integer>(pILst), allowNullColumnForMissingStats);
+        for (Integer pi : pILst) {
+          colStatsBldr.add(hiveColStatsMap.get(pi));
+        }
      }
    }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 3a179a3..37baaf6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -749,8 +749,8 @@ private boolean validateInputFormatAndSchemaEvolution(MapWork mapWork, String al
         return false;
       }
       VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Vectorizer path: " + path + ", " + vectorPartDesc.toString() +
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Vectorizer path: " + path + ", " + vectorPartDesc.toString() +
             ", aliases " + aliases);
       }
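Note for reviewers: the NDV-estimation comment embedded in the new columnStatisticsObjForPartitionsBatch's commonPrefix can be read as a small standalone computation. The sketch below is illustrative only and is not part of this patch; the class and parameter names (NdvEstimateSketch, partitionNdvs, avgDensityInverse) are hypothetical. It shows the bound-and-clamp reasoning the comment describes: a uniform-distribution guess for the global NDV derived from the column's value range and average density, kept between max(NUM_DISTINCTS) (partition domains fully overlap) and sum(NUM_DISTINCTS) (partition domains are disjoint).

// Illustrative only -- not part of this patch. Mirrors the reasoning in the
// commonPrefix comment above, not the exact metastore implementation.
public final class NdvEstimateSketch {
  /**
   * @param partitionNdvs     per-partition NDVs (hypothetical input)
   * @param lowValue          min of the column over all partitions
   * @param highValue         max of the column over all partitions
   * @param avgDensityInverse average of (high - low) / ndv over partitions,
   *                          i.e. what the avg(...) expressions compute
   */
  static long estimateGlobalNdv(long[] partitionNdvs, double lowValue,
      double highValue, double avgDensityInverse) {
    long lowerBound = 0;  // max(NUM_DISTINCTS): all partitions share one domain
    long upperBound = 0;  // sum(NUM_DISTINCTS): partition domains are disjoint
    for (long ndv : partitionNdvs) {
      lowerBound = Math.max(lowerBound, ndv);
      upperBound += ndv;
    }
    if (avgDensityInverse <= 0) {
      return upperBound; // density unknown; fall back to the upper bound
    }
    // Uniform-distribution guess: range width divided by the average gap
    // between distinct values, then clamped into [lowerBound, upperBound].
    long estimate = (long) ((highValue - lowValue) / avgDensityInverse);
    return Math.max(lowerBound, Math.min(upperBound, estimate));
  }

  public static void main(String[] args) {
    // Two partitions with heavy overlap: NDVs 100 and 120 over [0, 150].
    long ndv = estimateGlobalNdv(new long[] {100, 120}, 0, 150, 1.25);
    System.out.println(ndv); // raw guess 150/1.25 = 120; clamped result 120
  }
}

The clamp is the guarantee the comment calls out: whatever the density-based guess says, the returned NDV never leaves the range that is always mathematically valid for the union of the partitions.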