diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index 43c412d..fa6293a 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -21,19 +21,18 @@ import static org.apache.commons.lang.StringUtils.join; import static org.apache.commons.lang.StringUtils.repeat; +import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.sql.Connection; import java.sql.SQLException; -import java.sql.Statement; import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Date; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; import javax.jdo.PersistenceManager; import javax.jdo.Query; @@ -47,6 +46,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.Decimal; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Order; @@ -65,9 +65,8 @@ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator; import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode; import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor; -import org.apache.hadoop.hive.metastore.parser.FilterLexer; import org.apache.hadoop.hive.serde.serdeConstants; -import org.datanucleus.store.schema.SchemaTool; +import org.datanucleus.store.rdbms.query.ForwardQueryResult; import com.google.common.collect.Lists; @@ -97,6 +96,35 @@ * Whether direct SQL can be used with the current datastore backing {@link #pm}. */ private final boolean isCompatibleDatastore; + + private String[] colStatNames = new String[] { "LONG_LOW_VALUE", + "LONG_HIGH_VALUE", "DOUBLE_LOW_VALUE", "DOUBLE_HIGH_VALUE", + "BIG_DECIMAL_LOW_VALUE", "BIG_DECIMAL_HIGH_VALUE", "NUM_NULLS", + "NUM_DISTINCTS", "AVG_COL_LEN", "MAX_COL_LEN", "NUM_TRUES", + "NUM_FALSES" }; + + private enum ColStatType { + Long, + Double, + Decimal + } + + private ColStatType[] colStatTypes = new ColStatType[] { ColStatType.Long, + ColStatType.Long, ColStatType.Double, ColStatType.Double, + ColStatType.Decimal, ColStatType.Decimal, ColStatType.Long, + ColStatType.Long, ColStatType.Double, ColStatType.Long, ColStatType.Long, + ColStatType.Long }; + + private enum FuncType { + Min, + Max, + Sum + } + + private FuncType[] funcTypes = new FuncType[] { FuncType.Min, FuncType.Max, + FuncType.Min, FuncType.Max, FuncType.Min, FuncType.Max, + FuncType.Sum, FuncType.Max, FuncType.Max, FuncType.Max, + FuncType.Sum, FuncType.Sum }; public MetaStoreDirectSql(PersistenceManager pm) { this.pm = pm; @@ -901,33 +929,303 @@ public ColumnStatistics getTableStats( return result; } - public List aggrColStatsForPartitions(String dbName, String tableName, - List partNames, List colNames) throws MetaException { - String qText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", " - + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), " - + "min(\"BIG_DECIMAL_LOW_VALUE\"), max(\"BIG_DECIMAL_HIGH_VALUE\"), sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), " - + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\") from \"PART_COL_STATS\"" - + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? and \"COLUMN_NAME\" in (" - + makeParams(colNames.size()) + ") AND \"PARTITION_NAME\" in (" - + makeParams(partNames.size()) + ") group by \"COLUMN_NAME\", \"COLUMN_TYPE\""; + public List aggrColStatsForPartitions(String dbName, + String tableName, List partNames, List colNames) + throws MetaException { + // Check if the status of all the columns of all the partitions exists + String qText = "select count(*) from \"PART_COL_STATS\"" + + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? and \"COLUMN_NAME\" in (" + + makeParams(colNames.size()) + ") AND \"PARTITION_NAME\" in (" + + makeParams(partNames.size()) + ") "; boolean doTrace = LOG.isDebugEnabled(); long start = doTrace ? System.nanoTime() : 0; Query query = pm.newQuery("javax.jdo.query.SQL", qText); - Object qResult = query.executeWithArray(prepareParams(dbName, tableName, partNames, colNames)); + Object qResult = query.executeWithArray(prepareParams(dbName, tableName, + partNames, colNames)); if (qResult == null) { query.closeAll(); return Lists.newArrayList(); } - List list = ensureList(qResult); - List colStats = new ArrayList(list.size()); - for (Object[] row : list) { - colStats.add(prepareCSObj(row,0)); - } + ForwardQueryResult fqr = (ForwardQueryResult) qResult; + Integer total = (Integer) fqr.get(0); long end = doTrace ? System.nanoTime() : 0; timingTrace(doTrace, qText, start, end); query.closeAll(); - return colStats; + + if (total == colNames.size() * (partNames.size() - 1)) { + // Extrapolation is not needed. + qText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", " + + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), " + + "min(\"BIG_DECIMAL_LOW_VALUE\"), max(\"BIG_DECIMAL_HIGH_VALUE\"), sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), " + + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\") from \"PART_COL_STATS\"" + + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? and \"COLUMN_NAME\" in (" + + makeParams(colNames.size()) + ") AND \"PARTITION_NAME\" in (" + + makeParams(partNames.size()) + + ") group by \"COLUMN_NAME\", \"COLUMN_TYPE\""; + start = doTrace ? System.nanoTime() : 0; + query = pm.newQuery("javax.jdo.query.SQL", qText); + qResult = query.executeWithArray(prepareParams(dbName, tableName, + partNames, colNames)); + if (qResult == null) { + query.closeAll(); + return Lists.newArrayList(); + } + List list = ensureList(qResult); + List colStats = new ArrayList( + list.size()); + for (Object[] row : list) { + colStats.add(prepareCSObj(row, 0)); + } + end = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, qText, start, end); + query.closeAll(); + return colStats; + } else { + // Extrapolation is needed for some columns. + // In this case, at least a column status for a partition is missing. + // We need to extrapolate this partition based on the other partitions + List colStats = new ArrayList( + colNames.size()); + for (String colname : colNames) { + qText = "select PARTITION_NAME from \"PART_COL_STATS\"" + + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? and \"COLUMN_NAME\" in (" + + makeParams(1) + ") AND \"PARTITION_NAME\" in (" + + makeParams(partNames.size()) + ") "; + start = doTrace ? System.nanoTime() : 0; + query = pm.newQuery("javax.jdo.query.SQL", qText); + qResult = query.executeWithArray(prepareParams(dbName, tableName, + partNames, Arrays.asList(new String[] { colname }))); + if (qResult == null) { + query.closeAll(); + return Lists.newArrayList(); + } + fqr = (ForwardQueryResult) qResult; + List existPartNames = new ArrayList(); + existPartNames.addAll(fqr); + end = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, qText, start, end); + query.closeAll(); + + if (existPartNames.size() == (partNames.size() - 1) + || existPartNames.size() < 2) { + // Extrapolation is not needed for this column if + // existPartNames.size()==partNames.size() + // Or, extrapolation is not possible for this column if + // existPartNames.size()<2 + qText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", " + + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), " + + "min(\"BIG_DECIMAL_LOW_VALUE\"), max(\"BIG_DECIMAL_HIGH_VALUE\"), sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), " + + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\") from \"PART_COL_STATS\"" + + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? and \"COLUMN_NAME\" in (" + + makeParams(1) + ") AND \"PARTITION_NAME\" in (" + + makeParams(partNames.size()) + + ") group by \"COLUMN_NAME\", \"COLUMN_TYPE\""; + start = doTrace ? System.nanoTime() : 0; + query = pm.newQuery("javax.jdo.query.SQL", qText); + qResult = query.executeWithArray(prepareParams(dbName, tableName, + partNames, Arrays.asList(new String[] { colname }))); + if (qResult == null) { + query.closeAll(); + return Lists.newArrayList(); + } + List list = ensureList(qResult); + for (Object[] row : list) { + colStats.add(prepareCSObj(row, 0)); + } + end = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, qText, start, end); + query.closeAll(); + } else { + // Extrapolation is needed for this column. + + // give a sequence number for all the partitions + Map indexMap = new HashMap(); + for (int index = 0; index < partNames.size(); index++) { + indexMap.put(partNames.get(index), index); + } + + // fill in colname + Object[] row = new Object[colStatNames.length + 2]; + row[0] = colname; + + // fill in coltype + qText = "select COLUMN_TYPE from PART_COL_STATS " + + " where COLUMN_NAME = \'" + colname + "\' and DB_NAME = \'" + + dbName + "\' and TABLE_NAME = \'" + tableName + "\'"; + query = pm.newQuery("javax.jdo.query.SQL", qText); + start = doTrace ? System.nanoTime() : 0; + qResult = query.execute(); + if (qResult == null) { + query.closeAll(); + return Lists.newArrayList(); + } + fqr = (ForwardQueryResult) qResult; + row[1] = (String) fqr.get(0); + + // fill in colstatus + for (int colStatIndex = 0; colStatIndex < colStatNames.length; colStatIndex++) { + String colStatName = colStatNames[colStatIndex]; + // if the aggregation type is sum, we do a scale-up + if (funcTypes[colStatIndex] == FuncType.Sum) { + qText = "select " + "sum(" + colStatName + + ") from PART_COL_STATS" + " where COLUMN_NAME = \'" + + colname + "\' and DB_NAME = \'" + dbName + + "\' and TABLE_NAME = \'" + tableName + "\'"; + query = pm.newQuery("javax.jdo.query.SQL", qText); + start = doTrace ? System.nanoTime() : 0; + qResult = query.execute(); + if (qResult == null) { + query.closeAll(); + return Lists.newArrayList(); + } + end = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, qText, start, end); + fqr = (ForwardQueryResult) qResult; + if (fqr == null || fqr.isEmpty()) { + row[2 + colStatIndex] = null; + } else { + Long val = (Long) (fqr.get(0)); + row[2 + colStatIndex] = (Long) (val / existPartNames.size() * (partNames + .size() - 1)); + } + query.closeAll(); + } else { + // if the aggregation type is min/max, we extrapolate from the + // left/right borders + qText = "select " + colStatName + + ",PARTITION_NAME from PART_COL_STATS" + + " where COLUMN_NAME = \'" + colname + "\' and DB_NAME = \'" + + dbName + "\' and TABLE_NAME = \'" + tableName + "\'" + + " order by \'" + colStatName + "\'"; + query = pm.newQuery("javax.jdo.query.SQL", qText); + start = doTrace ? System.nanoTime() : 0; + qResult = query.execute(); + if (qResult == null) { + query.closeAll(); + return Lists.newArrayList(); + } + fqr = (ForwardQueryResult) qResult; + Object[] min = (Object[]) (fqr.get(0)); + Object[] max = (Object[]) (fqr.get(fqr.size() - 1)); + end = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, qText, start, end); + query.closeAll(); + if (min[0] == null || max[0] == null) { + row[2 + colStatIndex] = null; + } else { + row[2 + colStatIndex] = extrapolate(min, max, colStatIndex, + indexMap); + } + } + } + colStats.add(prepareCSObj(row, 0)); + } + } + return colStats; + } + } + + private static Decimal createThriftDecimal(String s) { + BigDecimal d = new BigDecimal(s); + return new Decimal(ByteBuffer.wrap(d.unscaledValue().toByteArray()), + (short) d.scale()); + } + + private Object extrapolate(Object[] min, Object[] max, int colStatIndex, + Map indexMap) { + int rightBorderInd = indexMap.size() - 2; + int minInd = indexMap.get((String) min[1]); + int maxInd = indexMap.get((String) max[1]); + if (minInd == maxInd) { + return min[0]; + } + if (funcTypes[colStatIndex] == FuncType.Max) { + if (minInd < maxInd) { + // right border is the max + if (colStatTypes[colStatIndex] == ColStatType.Long) { + return (Long) ((Long) min[0] + (((Long) max[0] - (Long) min[0]) + * (rightBorderInd - minInd) / (maxInd - minInd))); + } else if (colStatTypes[colStatIndex] == ColStatType.Double) { + return (Double) ((Double) min[0] + (((Double) max[0] - (Double) min[0]) + * (rightBorderInd - minInd) / (maxInd - minInd))); + } else { + Decimal dmax = (Decimal) max[0]; + BigDecimal bdmax = new BigDecimal(dmax.toString()); + double doublemax = bdmax.doubleValue(); + Decimal dmin = (Decimal) min[0]; + BigDecimal bdmin = new BigDecimal(dmin.toString()); + double doublemin = bdmin.doubleValue(); + double ret = doublemin + (doublemax - doublemin) + * (rightBorderInd - minInd) / (maxInd - minInd); + return createThriftDecimal(String.valueOf(ret)); + } + } else { + // left border is the max + if (colStatTypes[colStatIndex] == ColStatType.Long) { + return (Long) ((Long) min[0] + ((Long) max[0] - (Long) min[0]) + * minInd / (minInd - maxInd)); + } else if (colStatTypes[colStatIndex] == ColStatType.Double) { + return (Double) ((Double) min[0] + ((Double) max[0] - (Double) min[0]) + * minInd / (maxInd - minInd)); + } else { + Decimal dmax = (Decimal) max[0]; + BigDecimal bdmax = new BigDecimal(dmax.toString()); + double doublemax = bdmax.doubleValue(); + Decimal dmin = (Decimal) min[0]; + BigDecimal bdmin = new BigDecimal(dmin.toString()); + double doublemin = bdmin.doubleValue(); + double ret = doublemin + (doublemax - doublemin) * minInd + / (maxInd - minInd); + return createThriftDecimal(String.valueOf(ret)); + } + } + } else { + if (minInd < maxInd) { + // left border is the min + if (colStatTypes[colStatIndex] == ColStatType.Long) { + Long ret = (Long) max[0] - ((Long) max[0] - (Long) min[0]) * maxInd + / (maxInd - minInd); + return ret; + } else if (colStatTypes[colStatIndex] == ColStatType.Double) { + Double ret = (Double) max[0] - ((Double) max[0] - (Double) min[0]) + * maxInd / (maxInd - minInd); + return ret; + } else { + Decimal dmax = (Decimal) max[0]; + BigDecimal bdmax = new BigDecimal(dmax.toString()); + double doublemax = bdmax.doubleValue(); + Decimal dmin = (Decimal) min[0]; + BigDecimal bdmin = new BigDecimal(dmin.toString()); + double doublemin = bdmin.doubleValue(); + double ret = doublemax - (doublemax - doublemin) * maxInd + / (maxInd - minInd); + return createThriftDecimal(String.valueOf(ret)); + + } + } else { + // right border is the min + if (colStatTypes[colStatIndex] == ColStatType.Long) { + Long ret = (Long) max[0] - ((Long) max[0] - (Long) min[0]) + * (rightBorderInd - maxInd) / (minInd - maxInd); + return ret; + } else if (colStatTypes[colStatIndex] == ColStatType.Double) { + Double ret = (Double) max[0] - ((Double) max[0] - (Double) min[0]) + * (rightBorderInd - maxInd) / (minInd - maxInd); + return ret; + } else { + Decimal dmax = (Decimal) max[0]; + BigDecimal bdmax = new BigDecimal(dmax.toString()); + double doublemax = bdmax.doubleValue(); + Decimal dmin = (Decimal) min[0]; + BigDecimal bdmin = new BigDecimal(dmin.toString()); + double doublemin = bdmin.doubleValue(); + double ret = doublemax - (doublemax - doublemin) + * (rightBorderInd - maxInd) / (minInd - maxInd); + return createThriftDecimal(String.valueOf(ret)); + } + } + } } private ColumnStatisticsObj prepareCSObj (Object[] row, int i) {