diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java index eded92e1c1..8cd4e52c79 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -75,6 +74,7 @@ import org.apache.hadoop.hive.metastore.api.WMMapping; import org.apache.hadoop.hive.metastore.api.WMPool; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.thrift.TException; /** @@ -970,12 +970,6 @@ public void dropConstraint(String dbName, String tableName, } @Override - public Map> getColStatsForTablePartitions(String dbName, - String tableName) throws MetaException, NoSuchObjectException { - return objectStore.getColStatsForTablePartitions(dbName, tableName); - } - - @Override public String getMetastoreDbUuid() throws MetaException { throw new MetaException("getMetastoreDbUuid is not implemented"); } @@ -1086,4 +1080,11 @@ public void dropWMTriggerToPoolMapping(String resourcePlanName, String triggerNa String poolPath) throws NoSuchObjectException, InvalidOperationException, MetaException { objectStore.dropWMTriggerToPoolMapping(resourcePlanName, triggerName, poolPath); } + + @Override + public List getPartitionColStatsForDatabase(String dbName) + throws MetaException, NoSuchObjectException { + // Delegate to the wrapped ObjectStore, consistent with the other overrides in this class. + return objectStore.getPartitionColStatsForDatabase(dbName); + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java index 0aa1d4e16a..e4440fb55b 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java @@ -70,6 +70,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.ResourceType; import org.apache.hadoop.hive.metastore.api.ResourceUri; @@ -502,18 +503,18 @@ private static void verifyPartitionsPublished(HiveMetaStoreClient client, return vals4; } - private static Partition makePartitionObject(String dbName, String tblName, - List ptnVals, Table tbl, String ptnLocationSuffix) throws MetaException { - Partition part4 = new Partition(); - part4.setDbName(dbName); - part4.setTableName(tblName); - part4.setValues(ptnVals); - part4.setParameters(new HashMap()); - part4.setSd(tbl.getSd().deepCopy()); - part4.getSd().setSerdeInfo(tbl.getSd().getSerdeInfo().deepCopy()); - 
part4.getSd().setLocation(tbl.getSd().getLocation() + ptnLocationSuffix); - MetaStoreUtils.updatePartitionStatsFast(part4, warehouse, null); - return part4; + private static Partition makePartitionObject(String dbName, String tblName, List ptnVals, + Table tbl, String ptnLocationSuffix) throws MetaException { + Partition part = new Partition(); + part.setDbName(dbName); + part.setTableName(tblName); + part.setValues(ptnVals); + part.setParameters(new HashMap()); + part.setSd(tbl.getSd().deepCopy()); + part.getSd().setSerdeInfo(tbl.getSd().getSerdeInfo().deepCopy()); + part.getSd().setLocation(tbl.getSd().getLocation() + ptnLocationSuffix); + MetaStoreUtils.updatePartitionStatsFast(part, warehouse, null); + return part; } public void testListPartitions() throws Throwable { @@ -643,7 +644,7 @@ public void testAlterTableCascade() throws Throwable { //get partition, this partition should not have the newly added column since cascade option //was false partition = client.getPartition(dbName, tblName, pvalues); - Assert.assertEquals("Unexpected number of cols", 3, partition.getSd().getCols().size()); + Assert.assertEquals("Unexpected number of cols", 3, partition.getSd().getCols().size()); } @@ -847,6 +848,7 @@ public void testAlterPartition() throws Throwable { Table tbl = new Table(); tbl.setDbName(dbName); tbl.setTableName(tblName); + tbl.setTableType(TableType.MANAGED_TABLE.toString()); StorageDescriptor sd = new StorageDescriptor(); tbl.setSd(sd); sd.setCols(cols); @@ -1565,7 +1567,7 @@ public void testStatsFastTrivial() throws Throwable { assertEquals(0,aggrStatsFull.getPartsFound()); // would still be empty, because no stats are actually populated. assertNotNull(aggrStatsFull.getColStats()); assert(aggrStatsFull.getColStats().isEmpty()); - + cleanUp(dbName,tblName,typeName); } public void testColumnStatistics() throws Throwable { @@ -1803,8 +1805,13 @@ public void testAlterTable() throws Exception { invCols.add(new FieldSchema("in.come", serdeConstants.INT_TYPE_NAME, "")); Table tbl = new Table(); + List partitionKeys = new ArrayList(); + FieldSchema ptnKey1 = new FieldSchema("ptnKey1", serdeConstants.STRING_TYPE_NAME, ""); + partitionKeys.add(ptnKey1); + tbl.setPartitionKeys(partitionKeys); tbl.setDbName(dbName); tbl.setTableName(invTblName); + tbl.setTableType(TableType.MANAGED_TABLE.toString()); StorageDescriptor sd = new StorageDescriptor(); tbl.setSd(sd); sd.setCols(invCols); @@ -1822,7 +1829,7 @@ public void testAlterTable() throws Exception { sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName()); sd.setInputFormat(HiveInputFormat.class.getName()); sd.setOutputFormat(HiveOutputFormat.class.getName()); - + boolean failed = false; try { client.createTable(tbl); @@ -1881,7 +1888,7 @@ public void testAlterTable() throws Exception { //try an invalid alter table with partition key name Table tbl_pk = client.getTable(tbl.getDbName(), tbl.getTableName()); - List partitionKeys = tbl_pk.getPartitionKeys(); + partitionKeys = tbl_pk.getPartitionKeys(); for (FieldSchema fs : partitionKeys) { fs.setName("invalid_to_change_name"); fs.setComment("can_change_comment"); @@ -1994,7 +2001,7 @@ public void testComplexTable() throws Exception { org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName()); sd.setInputFormat(HiveInputFormat.class.getName()); sd.setOutputFormat(HiveOutputFormat.class.getName()); - + tbl.setPartitionKeys(new ArrayList(2)); tbl.getPartitionKeys().add( new FieldSchema("ds", @@ -2715,45 +2722,49 @@ public void testConcurrentMetastores() throws Exception 
{ String tblName = "concurrenttbl"; String renameTblName = "rename_concurrenttbl"; - try { - cleanUp(dbName, tblName, null); + String rawStoreImpl = HiveConf.getVar(hiveConf, HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL); + if (!rawStoreImpl.equalsIgnoreCase("org.apache.hadoop.hive.metastore.cache.CachedStore")) { + try { + cleanUp(dbName, tblName, null); - createDb(dbName); + createDb(dbName); - ArrayList cols = new ArrayList(2); - cols.add(new FieldSchema("c1", serdeConstants.STRING_TYPE_NAME, "")); - cols.add(new FieldSchema("c2", serdeConstants.INT_TYPE_NAME, "")); + ArrayList cols = new ArrayList(2); + cols.add(new FieldSchema("c1", serdeConstants.STRING_TYPE_NAME, "")); + cols.add(new FieldSchema("c2", serdeConstants.INT_TYPE_NAME, "")); - Map params = new HashMap(); - params.put("test_param_1", "Use this for comments etc"); + Map params = new HashMap(); + params.put("test_param_1", "Use this for comments etc"); - Map serdParams = new HashMap(); - serdParams.put(serdeConstants.SERIALIZATION_FORMAT, "1"); + Map serdParams = new HashMap(); + serdParams.put(serdeConstants.SERIALIZATION_FORMAT, "1"); - StorageDescriptor sd = createStorageDescriptor(tblName, cols, params, serdParams); + StorageDescriptor sd = createStorageDescriptor(tblName, cols, params, serdParams); - createTable(dbName, tblName, null, null, null, sd, 0); + createTable(dbName, tblName, null, null, null, sd, 0); - // get the table from the client, verify the name is correct - Table tbl2 = client.getTable(dbName, tblName); + // get the table from the client, verify the name is correct + Table tbl2 = client.getTable(dbName, tblName); - assertEquals("Client returned table with different name.", tbl2.getTableName(), tblName); + assertEquals("Client returned table with different name.", tbl2.getTableName(), tblName); - // Simulate renaming via another metastore Thrift server or another Hive CLI instance - updateTableNameInDB(tblName, renameTblName); + // Simulate renaming via another metastore Thrift server or another Hive CLI instance + updateTableNameInDB(tblName, renameTblName); - // get the table from the client again, verify the name has been updated - Table tbl3 = client.getTable(dbName, renameTblName); + // get the table from the client again, verify the name has been updated + Table tbl3 = client.getTable(dbName, renameTblName); - assertEquals("Client returned table with different name after rename.", - tbl3.getTableName(), renameTblName); + assertEquals("Client returned table with different name after rename.", + tbl3.getTableName(), renameTblName); + + } catch (Exception e) { + System.err.println(StringUtils.stringifyException(e)); + System.err.println("testConcurrentMetastores() failed."); + throw e; + } finally { + silentDropDatabase(dbName); + } - } catch (Exception e) { - System.err.println(StringUtils.stringifyException(e)); - System.err.println("testConcurrentMetastores() failed."); - throw e; - } finally { - silentDropDatabase(dbName); } } @@ -2940,9 +2951,13 @@ private Type createType(String typeName, Map fields) throws Thro Type typ1 = new Type(); typ1.setName(typeName); typ1.setFields(new ArrayList(fields.size())); - for(String fieldName : fields.keySet()) { - typ1.getFields().add( - new FieldSchema(fieldName, fields.get(fieldName), "")); + for (String fieldName : fields.keySet()) { + typ1.getFields().add(new FieldSchema(fieldName, fields.get(fieldName), "")); + } + try { + client.dropType(typeName); + } catch (NoSuchObjectException e) { + // Ignore } client.createType(typ1); return typ1; @@ -2981,7 +2996,8 
@@ public void testTransactionalValidation() throws Throwable { fields.put("name", serdeConstants.STRING_TYPE_NAME); fields.put("income", serdeConstants.INT_TYPE_NAME); - Type type = createType("Person", fields); + String typeName = "Person"; + Type type = createType(typeName, fields); Map params = new HashMap(); params.put("transactional", ""); @@ -2996,7 +3012,7 @@ public void testTransactionalValidation() throws Throwable { // Fail - No "transactional" property is specified try { - Table t = createTable(dbName, tblName, owner, params, null, sd, 0); + Table t = createTable(dbName, tblName, owner, params, Collections.EMPTY_MAP, sd, 0); Assert.assertTrue("Expected exception", false); } catch (MetaException e) { Assert.assertEquals("'transactional' property of TBLPROPERTIES may only have value 'true': acidDb.acidTable", e.getMessage()); @@ -3006,7 +3022,7 @@ public void testTransactionalValidation() throws Throwable { try { params.clear(); params.put("transactional", "foobar"); - Table t = createTable(dbName, tblName, owner, params, null, sd, 0); + Table t = createTable(dbName, tblName, owner, params, Collections.EMPTY_MAP, sd, 0); Assert.assertTrue("Expected exception", false); } catch (MetaException e) { Assert.assertEquals("'transactional' property of TBLPROPERTIES may only have value 'true': acidDb.acidTable", e.getMessage()); @@ -3016,7 +3032,7 @@ public void testTransactionalValidation() throws Throwable { try { params.clear(); params.put("transactional", "true"); - Table t = createTable(dbName, tblName, owner, params, null, sd, 0); + Table t = createTable(dbName, tblName, owner, params, Collections.EMPTY_MAP, sd, 0); Assert.assertTrue("Expected exception", false); } catch (MetaException e) { Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC): acidDb.acidTable", e.getMessage()); @@ -3029,7 +3045,7 @@ public void testTransactionalValidation() throws Throwable { List bucketCols = new ArrayList(); bucketCols.add("income"); sd.setBucketCols(bucketCols); - Table t = createTable(dbName, tblName, owner, params, null, sd, 0); + Table t = createTable(dbName, tblName, owner, params, Collections.EMPTY_MAP, sd, 0); Assert.assertTrue("Expected exception", false); } catch (MetaException e) { Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC): acidDb.acidTable", e.getMessage()); @@ -3043,7 +3059,7 @@ public void testTransactionalValidation() throws Throwable { sd.setBucketCols(bucketCols); sd.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"); sd.setOutputFormat("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"); - Table t = createTable(dbName, tblName, owner, params, null, sd, 0); + Table t = createTable(dbName, tblName, owner, params, Collections.EMPTY_MAP, sd, 0); Assert.assertTrue("CREATE TABLE should succeed", "true".equals(t.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL))); /// ALTER TABLE scenarios @@ -3065,7 +3081,7 @@ public void testTransactionalValidation() throws Throwable { params.clear(); sd.unsetBucketCols(); sd.setInputFormat("org.apache.hadoop.mapred.FileInputFormat"); - t = createTable(dbName, tblName, owner, params, null, sd, 0); + t = createTable(dbName, tblName, owner, params, Collections.EMPTY_MAP, sd, 0); params.put("transactional", "true"); t.setParameters(params); client.alter_table(dbName, tblName, t); @@ -3086,6 +3102,7 @@ public void testTransactionalValidation() throws Throwable { t.setPartitionKeys(Collections.EMPTY_LIST); client.alter_table(dbName, 
tblName, t); Assert.assertTrue("ALTER TABLE should succeed", "true".equals(t.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL))); + cleanUp(dbName,tblName, typeName); } private Table createTable(String dbName, String tblName, String owner, @@ -3146,7 +3163,7 @@ private StorageDescriptor createStorageDescriptor(String tableName, sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName()); sd.setInputFormat(HiveInputFormat.class.getName()); sd.setOutputFormat(HiveOutputFormat.class.getName()); - + return sd; } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index 14653b4043..125dd53f13 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -82,6 +82,7 @@ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode; import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hive.common.util.BloomFilter; import org.datanucleus.store.rdbms.query.ForwardQueryResult; import org.slf4j.Logger; @@ -216,7 +217,7 @@ private boolean ensureDbInit() { doCommit = true; } LinkedList initQueries = new LinkedList<>(); - + try { // Force the underlying db to initialize. initQueries.add(pm.newQuery(MDatabase.class, "name == ''")); @@ -938,7 +939,7 @@ static Long extractSqlLong(Object obj) throws MetaException { /** * Convert a boolean value returned from the RDBMS to a Java Boolean object. * MySQL has booleans, but e.g. Derby uses 'Y'/'N' mapping. - * + * * @param value * column value from the database * @return The Boolean value of the database column value, null if the column @@ -1469,81 +1470,63 @@ private long partsFoundForPartitions(final String dbName, final String tableName }); } - // Get aggregated column stats for a table per partition for all columns in the partition - // This is primarily used to populate stats object when using CachedStore (Check CachedStore#prewarm) - public Map> getColStatsForTablePartitions(String dbName, - String tblName, boolean enableBitVector) throws MetaException { - String queryText = "select \"PARTITION_NAME\", " + getStatsList(enableBitVector) + " from " - + " " + PART_COL_STATS + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" - + " order by \"PARTITION_NAME\""; + public List getColStatsForAllTablePartitions(String dbName, + boolean enableBitVector) throws MetaException { + String queryText = "select \"TABLE_NAME\", \"PARTITION_NAME\", " + getStatsList(enableBitVector) + + " from " + " " + PART_COL_STATS + " where \"DB_NAME\" = ?"; long start = 0; long end = 0; Query query = null; boolean doTrace = LOG.isDebugEnabled(); Object qResult = null; start = doTrace ? System.nanoTime() : 0; - query = pm.newQuery("javax.jdo.query.SQL", queryText); - qResult = executeWithArray(query, prepareParams(dbName, tblName, - Collections.emptyList(), Collections.emptyList()), queryText); - if (qResult == null) { - query.closeAll(); - return Collections.emptyMap(); - } - end = doTrace ? 
System.nanoTime() : 0; - timingTrace(doTrace, queryText, start, end); - List list = ensureList(qResult); - Map> partColStatsMap = - new HashMap>(); - String partNameCurrent = null; - List partColStatsList = new ArrayList(); - for (Object[] row : list) { - String partName = (String) row[0]; - if (partNameCurrent == null) { - // Update the current partition we are working on - partNameCurrent = partName; - // Create a new list for this new partition - partColStatsList = new ArrayList(); - // Add the col stat for the current column - partColStatsList.add(prepareCSObj(row, 1)); - } else if (!partNameCurrent.equalsIgnoreCase(partName)) { - // Save the previous partition and its col stat list - partColStatsMap.put(partNameCurrent, partColStatsList); - // Update the current partition we are working on - partNameCurrent = partName; - // Create a new list for this new partition - partColStatsList = new ArrayList(); - // Add the col stat for the current column - partColStatsList.add(prepareCSObj(row, 1)); - } else { - partColStatsList.add(prepareCSObj(row, 1)); + List colStatsForDB = new ArrayList(); + try { + query = pm.newQuery("javax.jdo.query.SQL", queryText); + qResult = executeWithArray(query, new Object[] { dbName }, queryText); + if (qResult == null) { + query.closeAll(); + return colStatsForDB; } - Deadline.checkTimeout(); + end = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, end); + List list = ensureList(qResult); + for (Object[] row : list) { + String tblName = (String) row[0]; + String partName = (String) row[1]; + ColumnStatisticsObj colStatObj = prepareCSObj(row, 2); + colStatsForDB.add(new ColStatsObjWithSourceInfo(colStatObj, dbName, tblName, partName)); + Deadline.checkTimeout(); + } + } finally { + query.closeAll(); } - query.closeAll(); - return partColStatsMap; + return colStatsForDB; } /** Should be called with the list short enough to not trip up Oracle/etc. */ private List columnStatisticsObjForPartitionsBatch(String dbName, String tableName, List partNames, List colNames, boolean areAllPartsFound, - boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector) throws MetaException { - if(enableBitVector) { - return aggrStatsUseJava(dbName, tableName, partNames, colNames, useDensityFunctionForNDVEstimation, ndvTuner); - } - else { - return aggrStatsUseDB(dbName, tableName, partNames, colNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner); + boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector) + throws MetaException { + if (enableBitVector) { + return aggrStatsUseJava(dbName, tableName, partNames, colNames, areAllPartsFound, + useDensityFunctionForNDVEstimation, ndvTuner); + } else { + return aggrStatsUseDB(dbName, tableName, partNames, colNames, areAllPartsFound, + useDensityFunctionForNDVEstimation, ndvTuner); } } private List aggrStatsUseJava(String dbName, String tableName, - List partNames, List colNames, + List partNames, List colNames, boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { // 1. get all the stats for colNames in partNames; - List partStats = getPartitionStats(dbName, tableName, partNames, colNames, - true); + List partStats = + getPartitionStats(dbName, tableName, partNames, colNames, true); // 2. 
use util function to aggr stats return MetaStoreUtils.aggrPartitionStats(partStats, dbName, tableName, partNames, colNames, - useDensityFunctionForNDVEstimation, ndvTuner); + areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner); } private List aggrStatsUseDB(String dbName, diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index c9ff295590..0cd2d6f7e6 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -183,6 +183,7 @@ import org.apache.hadoop.hive.metastore.utils.FileUtils; import org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.utils.ObjectPair; import org.apache.thrift.TException; import org.datanucleus.AbstractNucleusContext; @@ -7855,24 +7856,25 @@ protected String describeResult() { } @Override - public Map> getColStatsForTablePartitions(String dbName, - String tableName) throws MetaException, NoSuchObjectException { - final boolean enableBitVector = MetastoreConf.getBoolVar(getConf(), ConfVars.STATS_FETCH_BITVECTOR); - return new GetHelper>>(dbName, tableName, true, false) { + public List getPartitionColStatsForDatabase(String dbName) + throws MetaException, NoSuchObjectException { + final boolean enableBitVector = + MetastoreConf.getBoolVar(getConf(), ConfVars.STATS_FETCH_BITVECTOR); + return new GetHelper>(dbName, null, true, false) { @Override - protected Map> getSqlResult( - GetHelper>> ctx) throws MetaException { - return directSql.getColStatsForTablePartitions(dbName, tblName, enableBitVector); + protected List getSqlResult( + GetHelper> ctx) throws MetaException { + return directSql.getColStatsForAllTablePartitions(dbName, enableBitVector); } @Override - protected Map> getJdoResult( - GetHelper>> ctx) throws MetaException, - NoSuchObjectException { + protected List getJdoResult( + GetHelper> ctx) + throws MetaException, NoSuchObjectException { // This is fast path for query optimizations, if we can find this info // quickly using directSql, do it. No point in failing back to slow path // here. 
- throw new MetaException("Jdo path is not implemented for stats aggr."); + throw new MetaException("Jdo path is not implemented for getPartitionColStatsForDatabase."); } @Override diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java index fa77f63567..3a5c442d59 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -75,6 +75,7 @@ import org.apache.hadoop.hive.metastore.api.WMMapping; import org.apache.hadoop.hive.metastore.api.WMPool; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.thrift.TException; public interface RawStore extends Configurable { @@ -596,17 +597,15 @@ AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, List colNames) throws MetaException, NoSuchObjectException; /** - * Get all partition column statistics for a table in a db + * Get column stats for all partitions of all tables in the database * * @param dbName - * @param tableName - * @return Map of partition column statistics. Key in the map is partition name. Value is a list - * of column stat object for each column in the partition + * @return List of column stats objects for all partitions of all tables in the database * @throws MetaException * @throws NoSuchObjectException */ - Map> getColStatsForTablePartitions(String dbName, - String tableName) throws MetaException, NoSuchObjectException; + List getPartitionColStatsForDatabase(String dbName) + throws MetaException, NoSuchObjectException; /** * Get the next notification event. 
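Note (not part of the patch): the RawStore change above replaces the per-table map keyed by partition name with a single per-database call returning a flat List of ColStatsObjWithSourceInfo, where each element carries its originating database, table and partition (the constructor and getters used elsewhere in this patch). The sketch below is illustrative only and shows how a caller could regroup that flat list back into the old table/partition shape; the class name PartitionColStatsGrouper is hypothetical.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;

// Hypothetical helper: regroups the flat per-database result of
// RawStore#getPartitionColStatsForDatabase(dbName) into the
// table -> partition -> column-stats shape that the removed
// getColStatsForTablePartitions(dbName, tblName) used to return for one table.
public class PartitionColStatsGrouper {
  public static Map<String, Map<String, List<ColumnStatisticsObj>>> groupByTableAndPartition(
      List<ColStatsObjWithSourceInfo> colStatsForDb) {
    Map<String, Map<String, List<ColumnStatisticsObj>>> byTable = new HashMap<>();
    for (ColStatsObjWithSourceInfo stat : colStatsForDb) {
      // Each element knows its table and partition, so grouping is a simple two-level map insert.
      byTable.computeIfAbsent(stat.getTblName(), t -> new HashMap<>())
          .computeIfAbsent(stat.getPartName(), p -> new ArrayList<>())
          .add(stat.getColStatsObj());
    }
    return byTable;
  }
}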
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java index 798ada811b..f0f650ddcf 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java @@ -34,6 +34,14 @@ public class CacheUtils { private static final String delimit = "\u0001"; + public static String buildKey(String dbName) { + return dbName; + } + + public static String buildKeyWithDelimit(String dbName) { + return buildKey(dbName) + delimit; + } + public static String buildKey(String dbName, String tableName) { return dbName + delimit + tableName; } @@ -79,6 +87,10 @@ public static String buildKey(String dbName, String tableName, String colName) { return result; } + public static Object[] splitAggrColStats(String key) { + return key.split(delimit); + } + static Table assemble(TableWrapper wrapper, SharedCache sharedCache) { Table t = wrapper.getTable().deepCopy(); if (wrapper.getSdHash() != null) { diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index e1be6b9448..32e143431c 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -76,6 +76,9 @@ import org.apache.hadoop.hive.metastore.api.PrivilegeBag; import org.apache.hadoop.hive.metastore.api.WMResourcePlan; import org.apache.hadoop.hive.metastore.api.WMTrigger; +import org.apache.hadoop.hive.metastore.cache.SharedCache.StatsType; +import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator; +import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregatorFactory; import org.apache.hadoop.hive.metastore.api.Role; import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant; import org.apache.hadoop.hive.metastore.api.SQLForeignKey; @@ -97,6 +100,7 @@ import org.apache.hadoop.hive.metastore.utils.FileUtils; import org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -126,6 +130,9 @@ private static AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false); private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock( true); + private static ReentrantReadWriteLock partitionAggrColStatsCacheLock = + new ReentrantReadWriteLock(true); + private static AtomicBoolean isPartitionAggrColStatsCacheDirty = new AtomicBoolean(false); private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false); private static List whitelistPatterns = null; private static List blacklistPatterns = null; @@ -257,6 +264,14 @@ static void prewarm(RawStore rawStore) throws Exception { SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); for (int i = 0; i < dbNames.size(); i++) { String dbName = StringUtils.normalizeIdentifier(dbNames.get(i)); + // Cache partition column stats + Deadline.startTimer("getColStatsForDatabase"); + List colStatsForDB = + 
rawStore.getPartitionColStatsForDatabase(dbName); + Deadline.stopTimer(); + if (colStatsForDB != null) { + sharedCache.addPartitionColStatsToCache(colStatsForDB); + } LOG.info("Caching database: {}. Cached {} / {} databases so far.", dbName, i, dbNames.size()); Database db = rawStore.getDatabase(dbName); sharedCache.addDatabaseToCache(dbName, db); @@ -285,14 +300,6 @@ static void prewarm(RawStore rawStore) throws Exception { for (Partition partition : partitions) { sharedCache.addPartitionToCache(dbName, tblName, partition); } - // Cache partition column stats - Deadline.startTimer("getColStatsForTablePartitions"); - Map> colStatsPerPartition = - rawStore.getColStatsForTablePartitions(dbName, tblName); - Deadline.stopTimer(); - if (colStatsPerPartition != null) { - sharedCache.addPartitionColStatsToCache(dbName, tblName, colStatsPerPartition); - } } // Cache table column stats List colNames = MetaStoreUtils.getColumnNamesForTable(table); @@ -303,6 +310,28 @@ static void prewarm(RawStore rawStore) throws Exception { if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) { sharedCache.addTableColStatsToCache(dbName, tblName, tableColStats.getStatsObj()); } + // Cache aggregate stats for all partitions of a table and for all but default partition + List partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); + if ((partNames != null) && (partNames.size() > 0)) { + AggrStats aggrStatsAllPartitions = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + // Remove default partition from partition names and get aggregate + // stats again + List partKeys = table.getPartitionKeys(); + String defaultPartitionValue = MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); + List partCols = new ArrayList(); + List partVals = new ArrayList(); + for (FieldSchema fs : partKeys) { + partCols.add(fs.getName()); + partVals.add(defaultPartitionValue); + } + String defaultPartitionName = FileUtils.makePartName(partCols, partVals); + partNames.remove(defaultPartitionName); + AggrStats aggrStatsAllButDefaultPartition = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + sharedCache.addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions, + aggrStatsAllButDefaultPartition); + } } } // Notify all blocked threads that prewarm is complete now @@ -325,11 +354,10 @@ synchronized static void startCacheUpdateService(Configuration conf) { if (cacheUpdateMaster == null) { initBlackListWhiteList(conf); if (!MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST)) { - cacheRefreshPeriod = - MetastoreConf.getTimeVar(conf, ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, - TimeUnit.MILLISECONDS); + cacheRefreshPeriod = MetastoreConf.getTimeVar(conf, + ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, TimeUnit.MILLISECONDS); } - LOG.info("CachedStore: starting cache update service (run every " + cacheRefreshPeriod + "ms"); + LOG.info("CachedStore: starting cache update service (run every {} ms)", cacheRefreshPeriod); cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -339,8 +367,8 @@ public Thread newThread(Runnable r) { return t; } }); - cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf), 0, - cacheRefreshPeriod, TimeUnit.MILLISECONDS); + cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf), 0, cacheRefreshPeriod, + TimeUnit.MILLISECONDS); } } @@ -419,6 +447,7 @@ public void update() { // Update the database in cache 
updateDatabases(rawStore, dbNames); for (String dbName : dbNames) { + updateDatabasePartitionColStats(rawStore, dbName); // Update the tables in cache updateTables(rawStore, dbName); List tblNames = getAllTablesInternal(dbName, sharedCacheWrapper.getUnsafe()); @@ -430,8 +459,8 @@ public void update() { updateTablePartitions(rawStore, dbName, tblName); // Update the table column stats for a table in cache updateTableColStats(rawStore, dbName, tblName); - // Update the partitions column stats for a table in cache - updateTablePartitionColStats(rawStore, dbName, tblName); + // Update aggregate column stats cache + updateAggregateStatsCache(rawStore, dbName, tblName); } } } @@ -440,6 +469,87 @@ public void update() { } } + private void updateDatabasePartitionColStats(RawStore rawStore, String dbName) { + try { + Deadline.startTimer("getColStatsForDatabasePartitions"); + List colStatsForDB = + rawStore.getPartitionColStatsForDatabase(dbName); + Deadline.stopTimer(); + if (colStatsForDB != null) { + if (partitionColStatsCacheLock.writeLock().tryLock()) { + // Skip background updates if we detect change + if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping partition column stats cache update; the partition column stats " + + "list we have is dirty."); + return; + } + sharedCacheWrapper.getUnsafe() + .refreshPartitionColStats(StringUtils.normalizeIdentifier(dbName), colStatsForDB); + } + } + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Updating CachedStore: unable to read partitions column stats of database: {}", + dbName, e); + } finally { + if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) { + partitionColStatsCacheLock.writeLock().unlock(); + } + } + } + + // Update cached aggregate stats for all partitions of a table and for all + // but default partition + private void updateAggregateStatsCache(RawStore rawStore, String dbName, String tblName) { + try { + Table table = rawStore.getTable(dbName, tblName); + List partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); + List colNames = MetaStoreUtils.getColumnNamesForTable(table); + if ((partNames != null) && (partNames.size() > 0)) { + Deadline.startTimer("getAggregareStatsForAllPartitions"); + AggrStats aggrStatsAllPartitions = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + // Remove default partition from partition names and get aggregate stats again + List partKeys = table.getPartitionKeys(); + String defaultPartitionValue = + MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); + List partCols = new ArrayList(); + List partVals = new ArrayList(); + for (FieldSchema fs : partKeys) { + partCols.add(fs.getName()); + partVals.add(defaultPartitionValue); + } + String defaultPartitionName = FileUtils.makePartName(partCols, partVals); + partNames.remove(defaultPartitionName); + Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault"); + AggrStats aggrStatsAllButDefaultPartition = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + if ((aggrStatsAllPartitions != null) && (aggrStatsAllButDefaultPartition != null)) { + if (partitionAggrColStatsCacheLock.writeLock().tryLock()) { + // Skip background updates if we detect change + if (isPartitionAggrColStatsCacheDirty.compareAndSet(true, false)) { + LOG.debug( + "Skipping aggregate column stats cache update; the aggregate column stats we " + + "have is dirty."); + return; + } + 
sharedCacheWrapper.getUnsafe().refreshAggregateStatsCache( + StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), + aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); + } + } + } + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName, + e); + } finally { + if (partitionAggrColStatsCacheLock.isWriteLockedByCurrentThread()) { + partitionAggrColStatsCacheLock.writeLock().unlock(); + } + } + } + private void updateDatabases(RawStore rawStore, List dbNames) { // Prepare the list of databases List databases = new ArrayList<>(); @@ -555,36 +665,6 @@ private void updateTableColStats(RawStore rawStore, String dbName, String tblNam } } } - - // Update the cached partition col stats for a table - private void updateTablePartitionColStats(RawStore rawStore, String dbName, String tblName) { - try { - Deadline.startTimer("getColStatsForTablePartitions"); - Map> colStatsPerPartition = - rawStore.getColStatsForTablePartitions(dbName, tblName); - Deadline.stopTimer(); - if (colStatsPerPartition != null) { - if (partitionColStatsCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping partition column stats cache update; the partition column stats " - + "list we have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe().refreshPartitionColStats( - StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), colStatsPerPartition); - } - } - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read partitions column stats of table: " - + tblName, e); - } finally { - if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) { - partitionColStatsCacheLock.writeLock().unlock(); - } - } - } } @Override @@ -838,6 +918,15 @@ public boolean addPartition(Partition part) throws InvalidObjectException, MetaE } finally { partitionCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } return succ; } @@ -864,6 +953,15 @@ public boolean addPartitions(String dbName, String tblName, List part } finally { partitionCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } return succ; } @@ -892,6 +990,15 @@ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy p } finally { partitionCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } return succ; } @@ -909,7 +1016,8 @@ public Partition 
getPartition(String dbName, String tblName, List part_v Partition part = sharedCache.getPartitionFromCache(dbName, tblName, part_vals); if (part != null) { - part.unsetPrivileges(); + // TODO Manage privileges + part.setPrivileges(new PrincipalPrivilegeSet()); } else { throw new NoSuchObjectException("partition values=" + part_vals.toString()); } @@ -958,6 +1066,15 @@ public boolean dropPartition(String dbName, String tblName, List part_va } finally { partitionColStatsCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } return succ; } @@ -974,7 +1091,8 @@ public boolean dropPartition(String dbName, String tblName, List part_va List parts = sharedCache.listCachedPartitions(dbName, tblName, max); if (parts != null) { for (Partition part : parts) { - part.unsetPrivileges(); + // TODO Manage privileges + part.setPrivileges(new PrincipalPrivilegeSet()); } } return parts; @@ -1046,6 +1164,15 @@ public void alterTable(String dbName, String tblName, Table newTable) } finally { partitionColStatsCacheLock.readLock().unlock(); } + // Update aggregate partition col stats keys wherever applicable + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + sharedCache.alterTableInAggrPartitionColStatsCache(dbName, tblName, newTable); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } } @@ -1220,6 +1347,15 @@ public void alterPartition(String dbName, String tblName, List partVals, } finally { partitionColStatsCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } @Override @@ -1259,6 +1395,15 @@ public void alterPartitions(String dbName, String tblName, List> pa } finally { partitionColStatsCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } @Override @@ -1336,7 +1481,8 @@ public boolean getPartitionsByExpr(String dbName, String tblName, byte[] expr, defaultPartitionName, maxParts, partNames, sharedCache); for (String partName : partNames) { Partition part = sharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName)); - part.unsetPrivileges(); + // TODO Manage privileges + part.setPrivileges(new PrincipalPrivilegeSet()); result.add(part); } return hasUnknownPartitions; @@ -1797,6 +1943,15 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List partNames, List colNames) throws MetaException, NoSuchObjectException { + List colStats; dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); if 
(!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); } SharedCache sharedCache = sharedCacheWrapper.get(); - List colStats = - mergeColStatsForPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partNames, colNames, sharedCache); - return new AggrStats(colStats, partNames.size()); + List allPartNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); + if (partNames.size() == allPartNames.size()) { + colStats = sharedCache.getAggrStatsFromCache(dbName, tblName, colNames, StatsType.ALL); + if (colStats != null) { + return new AggrStats(colStats, partNames.size()); + } + } else if (partNames.size() == (allPartNames.size() - 1)) { + String defaultPartitionName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); + if (!partNames.contains(defaultPartitionName)) { + colStats = + sharedCache.getAggrStatsFromCache(dbName, tblName, colNames, StatsType.ALLBUTDEFAULT); + if (colStats != null) { + return new AggrStats(colStats, partNames.size()); + } + } + } + LOG.debug("Didn't find aggr stats in cache. Merging them. tblName= {}, parts= {}, cols= {}", + tblName, partNames, colNames); + MergedColumnStatsForPartitions mergedColStats = + mergeColStatsForPartitions(dbName, tblName, partNames, colNames, sharedCache); + return new AggrStats(mergedColStats.getColStats(), mergedColStats.getPartsFound()); } - private List mergeColStatsForPartitions(String dbName, String tblName, - List partNames, List colNames, SharedCache sharedCache) - throws MetaException { - final boolean useDensityFunctionForNDVEstimation = MetastoreConf.getBoolVar(getConf(), - ConfVars.STATS_NDV_DENSITY_FUNCTION); + private MergedColumnStatsForPartitions mergeColStatsForPartitions(String dbName, String tblName, + List partNames, List colNames, SharedCache sharedCache) throws MetaException { + final boolean useDensityFunctionForNDVEstimation = + MetastoreConf.getBoolVar(getConf(), ConfVars.STATS_NDV_DENSITY_FUNCTION); final double ndvTuner = MetastoreConf.getDoubleVar(getConf(), ConfVars.STATS_NDV_TUNER); - Map> map = new HashMap<>(); - + Map> colStatsMap = + new HashMap>(); + boolean areAllPartsFound = true; + long partsFound = 0; for (String colName : colNames) { - List colStats = new ArrayList<>(); + long partsFoundForColumn = 0; + ColumnStatsAggregator colStatsAggregator = null; + List colStatsWithPartInfoList = + new ArrayList(); for (String partName : partNames) { - String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), - colName); - List colStat = new ArrayList<>(); - ColumnStatisticsObj colStatsForPart = sharedCache - .getCachedPartitionColStats(colStatsCacheKey); + String colStatsCacheKey = + CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName); + ColumnStatisticsObj colStatsForPart = + sharedCache.getCachedPartitionColStats(colStatsCacheKey); if (colStatsForPart != null) { - colStat.add(colStatsForPart); - ColumnStatisticsDesc csDesc = new ColumnStatisticsDesc(false, dbName, tblName); - csDesc.setPartName(partName); - colStats.add(new ColumnStatistics(csDesc, colStat)); + ColStatsObjWithSourceInfo colStatsWithPartInfo = + new ColStatsObjWithSourceInfo(colStatsForPart, dbName, tblName, partName); + colStatsWithPartInfoList.add(colStatsWithPartInfo); + if (colStatsAggregator == null) { + colStatsAggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator( + colStatsForPart.getStatsData().getSetField(), 
useDensityFunctionForNDVEstimation, + ndvTuner); + } + partsFoundForColumn++; } else { - LOG.debug("Stats not found in CachedStore for: dbName={} tblName={} partName={} colName={}", - dbName, tblName,partName, colName); + LOG.debug( + "Stats not found in CachedStore for: dbName={} tblName={} partName={} colName={}", + dbName, tblName, partName, colName); } } - map.put(colName, colStats); + if (colStatsWithPartInfoList.size() > 0) { + colStatsMap.put(colStatsAggregator, colStatsWithPartInfoList); + } + if (partsFoundForColumn == partNames.size()) { + partsFound++; + } + if (colStatsMap.size() < 1) { + LOG.debug("No stats data found for: dbName={} tblName= {} partNames= {} colNames= ", dbName, + tblName, partNames, colNames); + return new MergedColumnStatsForPartitions(new ArrayList(), 0); + } } // Note that enableBitVector does not apply here because ColumnStatisticsObj - // itself will tell whether - // bitvector is null or not and aggr logic can automatically apply. - return MetaStoreUtils.aggrPartitionStats(map, dbName, tblName, partNames, colNames, - useDensityFunctionForNDVEstimation, ndvTuner); + // itself will tell whether bitvector is null or not and aggr logic can automatically apply. + return new MergedColumnStatsForPartitions(MetaStoreUtils.aggrPartitionStats(colStatsMap, + partNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner), partsFound); } + class MergedColumnStatsForPartitions { + List colStats = new ArrayList(); + long partsFound; + + MergedColumnStatsForPartitions(List colStats, long partsFound) { + this.colStats = colStats; + this.partsFound = partsFound; + } + + List getColStats() { + return colStats; + } + + long getPartsFound() { + return partsFound; + } + } @Override public long cleanupEvents() { @@ -1982,6 +2199,15 @@ public void dropPartitions(String dbName, String tblName, List partNames } finally { partitionColStatsCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } @Override @@ -2247,9 +2473,9 @@ public void dropConstraint(String dbName, String tableName, } @Override - public Map> getColStatsForTablePartitions(String dbName, - String tableName) throws MetaException, NoSuchObjectException { - return rawStore.getColStatsForTablePartitions(dbName, tableName); + public List getPartitionColStatsForDatabase(String dbName) + throws MetaException, NoSuchObjectException { + return rawStore.getPartitionColStatsForDatabase(dbName); } public RawStore getRawStore() { diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index b606779709..32ea17495f 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.metastore.StatObjectConverter; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; import 
org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -41,6 +42,7 @@ import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper; import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +56,24 @@ private Map partitionColStatsCache = new TreeMap<>(); private Map tableColStatsCache = new TreeMap<>(); private Map sdCache = new HashMap<>(); + private Map> aggrColStatsCache = + new HashMap>(); private static MessageDigest md; + static enum StatsType { + ALL(0), ALLBUTDEFAULT(1); + + private final int position; + + private StatsType(int position) { + this.position = position; + } + + public int getPosition() { + return position; + } + } + private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class); static { @@ -151,7 +169,11 @@ public synchronized void removeTableColStatsFromCache(String dbName, String tblN public synchronized void removeTableColStatsFromCache(String dbName, String tblName, String colName) { - tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName)); + if (colName == null) { + removeTableColStatsFromCache(dbName, tblName); + } else { + tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName)); + } } public synchronized void updateTableColStatsInCache(String dbName, String tableName, @@ -231,9 +253,9 @@ public synchronized void alterTableInPartitionColStatsCache(String dbName, Strin ColumnStatisticsObj colStatObj = entry.getValue(); if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) { Object[] decomposedKey = CacheUtils.splitPartitionColStats(key); - String newKey = - CacheUtils.buildKey((String) decomposedKey[0], (String) decomposedKey[1], - (List) decomposedKey[2], (String) decomposedKey[3]); + // New key has the new table name + String newKey = CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(), + (List) decomposedKey[2], (String) decomposedKey[3]); newPartitionColStats.put(newKey, colStatObj); iterator.remove(); } @@ -243,6 +265,31 @@ public synchronized void alterTableInPartitionColStatsCache(String dbName, Strin } } + public synchronized void alterTableInAggrPartitionColStatsCache(String dbName, String tblName, + Table newTable) { + if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { + Map> newAggrColStatsCache = + new HashMap>(); + String oldPartialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator>> iterator = + aggrColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry> entry = iterator.next(); + String key = entry.getKey(); + List value = entry.getValue(); + if (key.toLowerCase().startsWith(oldPartialKey.toLowerCase())) { + Object[] decomposedKey = CacheUtils.splitAggrColStats(key); + // New key has the new table name + String newKey = CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(), + (String) decomposedKey[2]); + newAggrColStatsCache.put(newKey, value); + iterator.remove(); + } + } + aggrColStatsCache.putAll(newAggrColStatsCache); + } + } + public synchronized int getCachedTableCount() { return tableCache.size(); } @@ -340,6 +387,20 @@ public synchronized void removePartitionsFromCache(String dbName, String tblName } } + // Remove cached column stats for all partitions of all 
tables in a db + public synchronized void removePartitionColStatsFromCache(String dbName) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName); + Iterator> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + } + // Remove cached column stats for all partitions of a table public synchronized void removePartitionColStatsFromCache(String dbName, String tblName) { String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); @@ -445,28 +506,94 @@ public synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) { return partitionColStatsCache.get(key)!=null?partitionColStatsCache.get(key).deepCopy():null; } - public synchronized void addPartitionColStatsToCache(String dbName, String tableName, - Map> colStatsPerPartition) { - for (Map.Entry> entry : colStatsPerPartition.entrySet()) { - String partName = entry.getKey(); + public synchronized void addPartitionColStatsToCache( + List colStatsForDB) { + for (ColStatsObjWithSourceInfo colStatWithSourceInfo : colStatsForDB) { + List partVals; try { - List partVals = Warehouse.getPartValuesFromPartName(partName); - for (ColumnStatisticsObj colStatObj : entry.getValue()) { - String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName()); - partitionColStatsCache.put(key, colStatObj); - } + partVals = Warehouse.getPartValuesFromPartName(colStatWithSourceInfo.getPartName()); + ColumnStatisticsObj colStatObj = colStatWithSourceInfo.getColStatsObj(); + String key = CacheUtils.buildKey(colStatWithSourceInfo.getDbName(), + colStatWithSourceInfo.getTblName(), partVals, colStatObj.getColName()); + partitionColStatsCache.put(key, colStatObj); } catch (MetaException e) { - LOG.info("Unable to add partition: " + partName + " to SharedCache", e); + LOG.info("Unable to add partition stats for: {} to SharedCache", + colStatWithSourceInfo.getPartName(), e); } } + } - public synchronized void refreshPartitionColStats(String dbName, String tableName, - Map> newColStatsPerPartition) { - LOG.debug("CachedStore: updating cached partition column stats objects for database: " + dbName - + " and table: " + tableName); - removePartitionColStatsFromCache(dbName, tableName); - addPartitionColStatsToCache(dbName, tableName, newColStatsPerPartition); + public synchronized void refreshPartitionColStats(String dbName, + List colStatsForDB) { + LOG.debug("CachedStore: updating cached partition column stats objects for database: {}", + dbName); + removePartitionColStatsFromCache(dbName); + addPartitionColStatsToCache(colStatsForDB); + } + + public synchronized void addAggregateStatsToCache(String dbName, String tblName, + AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) { + if (aggrStatsAllPartitions != null) { + for (ColumnStatisticsObj colStatObj : aggrStatsAllPartitions.getColStats()) { + String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName()); + List value = new ArrayList(); + value.add(StatsType.ALL.getPosition(), colStatObj); + aggrColStatsCache.put(key, value); + } + } + if (aggrStatsAllButDefaultPartition != null) { + for (ColumnStatisticsObj colStatObj : aggrStatsAllButDefaultPartition.getColStats()) { + String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName()); + List value = aggrColStatsCache.get(key); + if ((value != null) && (value.size() > 0)) { + 
value.add(StatsType.ALLBUTDEFAULT.getPosition(), colStatObj); + } + } + } + } + + public List getAggrStatsFromCache(String dbName, String tblName, + List colNames, StatsType statsType) { + List colStats = new ArrayList(); + for (String colName : colNames) { + String key = CacheUtils.buildKey(dbName, tblName, colName); + List colStatList = aggrColStatsCache.get(key); + // If unable to find stats for a column, return null so we can build stats + if (colStatList == null) { + return null; + } + ColumnStatisticsObj colStatObj = colStatList.get(statsType.getPosition()); + // If unable to find stats for this StatsType, return null so we can build + // stats + if (colStatObj == null) { + return null; + } + colStats.add(colStatObj); + } + return colStats; + } + + public synchronized void removeAggrPartitionColStatsFromCache(String dbName, String tblName) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator>> iterator = + aggrColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry> entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + } + + public synchronized void refreshAggregateStatsCache(String dbName, String tblName, + AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) { + LOG.debug("CachedStore: updating aggregate stats cache for database: {}, table: {}", dbName, + tblName); + removeAggrPartitionColStatsFromCache(dbName, tblName); + addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions, + aggrStatsAllButDefaultPartition); } public synchronized void addTableColStatsToCache(String dbName, String tableName, diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java index 45d5d8c984..c18b4c79bf 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java @@ -22,30 +22,27 @@ import java.util.List; import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; public class BinaryColumnStatsAggregator extends ColumnStatsAggregator { @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithSourceInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - BinaryColumnStatsData aggregateData = null; String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + String colName = null; + BinaryColumnStatsData aggregateData = null; + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = 
csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, + cso.getStatsData().getSetField()); } BinaryColumnStatsData newData = cso.getStatsData().getBinaryStats(); if (aggregateData == null) { diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java index 8aac0fe33d..7630183180 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java @@ -22,30 +22,27 @@ import java.util.List; import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; public class BooleanColumnStatsAggregator extends ColumnStatsAggregator { @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithSourceInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - BooleanColumnStatsData aggregateData = null; String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + String colName = null; + BooleanColumnStatsData aggregateData = null; + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, + cso.getStatsData().getSetField()); } BooleanColumnStatsData newData = cso.getStatsData().getBooleanStats(); if (aggregateData == null) { diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java index cd0392d6c0..0beaf60230 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java @@ -21,13 +21,15 @@ import java.util.List; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.MetaException; +import 
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; public abstract class ColumnStatsAggregator { public boolean useDensityFunctionForNDVEstimation; public double ndvTuner; - public abstract ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException; + + public abstract ColumnStatisticsObj aggregate( + List colStatsWithSourceInfo, List partNames, + boolean areAllPartsFound) throws MetaException; } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java index 7f2956152c..e8ff513f50 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java @@ -28,13 +28,13 @@ import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Date; import org.apache.hadoop.hive.metastore.api.DateColumnStatsData; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,27 +44,23 @@ private static final Logger LOG = LoggerFactory.getLogger(DateColumnStatsAggregator.class); @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithSourceInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - + String colType = null; + String colName = null; // check if all the ColumnStatisticsObjs contain stats and all the ndv are // bitvectors - boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); + boolean doAllPartitionContainStats = partNames.size() == colStatsWithSourceInfo.size(); NumDistinctValueEstimator ndvEstimator = null; - String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, + cso.getStatsData().getSetField()); + LOG.trace("doAllPartitionContainStats for column: {} is: {}", colName, doAllPartitionContainStats); } DateColumnStatsDataInspector dateColumnStats = (DateColumnStatsDataInspector) cso.getStatsData().getDateStats(); @@ -92,13 
+88,13 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { + if (doAllPartitionContainStats || colStatsWithSourceInfo.size() < 2) { DateColumnStatsDataInspector aggregateData = null; long lowerBound = 0; long higherBound = 0; double densityAvgSum = 0.0; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); DateColumnStatsDataInspector newData = (DateColumnStatsDataInspector) cso.getStatsData().getDateStats(); lowerBound = Math.max(lowerBound, newData.getNumDVs()); @@ -157,9 +153,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, if (ndvEstimator == null) { // if not every partition uses bitvector for ndv, we just fall back to // the traditional extrapolation methods. - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); DateColumnStatsData newData = cso.getStatsData().getDateStats(); if (useDensityFunctionForNDVEstimation) { densityAvgSum += diff(newData.getHighValue(), newData.getLowValue()) / newData.getNumDVs(); @@ -175,9 +171,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, int length = 0; int curIndex = -1; DateColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); DateColumnStatsDataInspector newData = (DateColumnStatsDataInspector) cso.getStatsData().getDateStats(); // newData.isSetBitVectors() should be true for sure because we @@ -230,11 +226,13 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } } } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); + extrapolate(columnStatisticsData, partNames.size(), colStatsWithSourceInfo.size(), + adjustedIndexMap, adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); } - LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName, - columnStatisticsData.getDateStats().getNumDVs(),partNames.size(), css.size()); + LOG.debug( + "Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", + colName, columnStatisticsData.getDateStats().getNumDVs(), partNames.size(), + colStatsWithSourceInfo.size()); statsObj.setStatsData(columnStatisticsData); return statsObj; } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java index 05c0280262..271559985b 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java +++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java @@ -29,13 +29,13 @@ import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; import org.apache.hadoop.hive.metastore.StatObjectConverter; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,27 +45,24 @@ private static final Logger LOG = LoggerFactory.getLogger(DecimalColumnStatsAggregator.class); @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithSourceInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - + String colType = null; + String colName = null; // check if all the ColumnStatisticsObjs contain stats and all the ndv are // bitvectors - boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); + boolean doAllPartitionContainStats = partNames.size() == colStatsWithSourceInfo.size(); NumDistinctValueEstimator ndvEstimator = null; - String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, + cso.getStatsData().getSetField()); + LOG.trace("doAllPartitionContainStats for column: {} is: {}", colName, + doAllPartitionContainStats); } DecimalColumnStatsDataInspector decimalColumnStatsData = (DecimalColumnStatsDataInspector) cso.getStatsData().getDecimalStats(); @@ -93,13 +90,13 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { + if (doAllPartitionContainStats || colStatsWithSourceInfo.size() < 2) { DecimalColumnStatsDataInspector aggregateData = null; long lowerBound = 0; long higherBound = 0; double densityAvgSum = 0.0; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); DecimalColumnStatsDataInspector newData = 
(DecimalColumnStatsDataInspector) cso.getStatsData().getDecimalStats(); lowerBound = Math.max(lowerBound, newData.getNumDVs()); @@ -167,9 +164,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, if (ndvEstimator == null) { // if not every partition uses bitvector for ndv, we just fall back to // the traditional extrapolation methods. - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats(); if (useDensityFunctionForNDVEstimation) { densityAvgSum += (MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils @@ -186,9 +183,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, int length = 0; int curIndex = -1; DecimalColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); DecimalColumnStatsDataInspector newData = (DecimalColumnStatsDataInspector) cso.getStatsData().getDecimalStats(); // newData.isSetBitVectors() should be true for sure because we @@ -251,11 +248,13 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } } } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); + extrapolate(columnStatisticsData, partNames.size(), colStatsWithSourceInfo.size(), + adjustedIndexMap, adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); } - LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName, - columnStatisticsData.getDecimalStats().getNumDVs(),partNames.size(), css.size()); + LOG.debug( + "Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", + colName, columnStatisticsData.getDecimalStats().getNumDVs(), partNames.size(), + colStatsWithSourceInfo.size()); statsObj.setStatsData(columnStatisticsData); return statsObj; } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java index faf22dcd7c..ece77dd51b 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java @@ -28,12 +28,12 @@ import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector; +import 
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,27 +43,24 @@ private static final Logger LOG = LoggerFactory.getLogger(LongColumnStatsAggregator.class); @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithSourceInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - + String colType = null; + String colName = null; // check if all the ColumnStatisticsObjs contain stats and all the ndv are // bitvectors - boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); + boolean doAllPartitionContainStats = partNames.size() == colStatsWithSourceInfo.size(); NumDistinctValueEstimator ndvEstimator = null; - String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, + cso.getStatsData().getSetField()); + LOG.trace("doAllPartitionContainStats for column: {} is: {}", colName, + doAllPartitionContainStats); } DoubleColumnStatsDataInspector doubleColumnStatsData = (DoubleColumnStatsDataInspector) cso.getStatsData().getDoubleStats(); @@ -91,13 +88,13 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { + if (doAllPartitionContainStats || colStatsWithSourceInfo.size() < 2) { DoubleColumnStatsDataInspector aggregateData = null; long lowerBound = 0; long higherBound = 0; double densityAvgSum = 0.0; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); DoubleColumnStatsDataInspector newData = (DoubleColumnStatsDataInspector) cso.getStatsData().getDoubleStats(); lowerBound = Math.max(lowerBound, newData.getNumDVs()); @@ -154,9 +151,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, if (ndvEstimator == null) { // if not every partition uses bitvector for ndv, we just fall back to // the traditional extrapolation methods. 
- for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats(); if (useDensityFunctionForNDVEstimation) { densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs(); @@ -172,9 +169,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, int length = 0; int curIndex = -1; DoubleColumnStatsData aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); DoubleColumnStatsDataInspector newData = (DoubleColumnStatsDataInspector) cso.getStatsData().getDoubleStats(); // newData.isSetBitVectors() should be true for sure because we @@ -226,11 +223,13 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } } } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); + extrapolate(columnStatisticsData, partNames.size(), colStatsWithSourceInfo.size(), + adjustedIndexMap, adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); } - LOG.debug("Ndv estimatation for {} is {}. # of partitions requested: {}. # of partitions found: {}", colName, - columnStatisticsData.getDoubleStats().getNumDVs(),partNames.size(), css.size()); + LOG.debug( + "Ndv estimatation for {} is {}. # of partitions requested: {}. 
# of partitions found: {}", + colName, columnStatisticsData.getDoubleStats().getNumDVs(), partNames.size(), + colStatsWithSourceInfo.size()); statsObj.setStatsData(columnStatisticsData); return statsObj; } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java index d12cdc08ea..e6823d342a 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,27 +44,24 @@ private static final Logger LOG = LoggerFactory.getLogger(LongColumnStatsAggregator.class); @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithSourceInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - + String colType = null; + String colName = null; // check if all the ColumnStatisticsObjs contain stats and all the ndv are // bitvectors - boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); + boolean doAllPartitionContainStats = partNames.size() == colStatsWithSourceInfo.size(); NumDistinctValueEstimator ndvEstimator = null; - String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, + cso.getStatsData().getSetField()); + LOG.trace("doAllPartitionContainStats for column: {} is: {}", colName, + doAllPartitionContainStats); } LongColumnStatsDataInspector longColumnStatsData = (LongColumnStatsDataInspector) cso.getStatsData().getLongStats(); @@ -91,13 +89,13 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { + if (doAllPartitionContainStats || colStatsWithSourceInfo.size() < 2) { LongColumnStatsDataInspector aggregateData = null; long lowerBound = 0; long higherBound = 0; double densityAvgSum = 0.0; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : 
colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); LongColumnStatsDataInspector newData = (LongColumnStatsDataInspector) cso.getStatsData().getLongStats(); lowerBound = Math.max(lowerBound, newData.getNumDVs()); @@ -155,9 +153,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, if (ndvEstimator == null) { // if not every partition uses bitvector for ndv, we just fall back to // the traditional extrapolation methods. - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); LongColumnStatsData newData = cso.getStatsData().getLongStats(); if (useDensityFunctionForNDVEstimation) { densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs(); @@ -173,9 +171,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, int length = 0; int curIndex = -1; LongColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); LongColumnStatsDataInspector newData = (LongColumnStatsDataInspector) cso.getStatsData().getLongStats(); // newData.isSetBitVectors() should be true for sure because we @@ -227,11 +225,13 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } } } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); + extrapolate(columnStatisticsData, partNames.size(), colStatsWithSourceInfo.size(), + adjustedIndexMap, adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); } - LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName, - columnStatisticsData.getLongStats().getNumDVs(),partNames.size(), css.size()); + LOG.debug( + "Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", + colName, columnStatisticsData.getLongStats().getNumDVs(), partNames.size(), + colStatsWithSourceInfo.size()); statsObj.setStatsData(columnStatisticsData); return statsObj; } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java index 4539e6b026..2b8c4933b9 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,28 +44,24 @@ private static final Logger LOG = LoggerFactory.getLogger(LongColumnStatsAggregator.class); @Override - public 
ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithSourceInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - + String colType = null; + String colName = null; // check if all the ColumnStatisticsObjs contain stats and all the ndv are - // bitvectors. Only when both of the conditions are true, we merge bit - // vectors. Otherwise, just use the maximum function. - boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); + // bitvectors + boolean doAllPartitionContainStats = partNames.size() == colStatsWithSourceInfo.size(); NumDistinctValueEstimator ndvEstimator = null; - String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, + cso.getStatsData().getSetField()); + LOG.trace("doAllPartitionContainStats for column: {} is: {}", colName, + doAllPartitionContainStats); } StringColumnStatsDataInspector stringColumnStatsData = (StringColumnStatsDataInspector) cso.getStatsData().getStringStats(); @@ -92,10 +89,10 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { + if (doAllPartitionContainStats || colStatsWithSourceInfo.size() < 2) { StringColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); StringColumnStatsDataInspector newData = (StringColumnStatsDataInspector) cso.getStatsData().getStringStats(); if (ndvEstimator != null) { @@ -134,9 +131,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, if (ndvEstimator == null) { // if not every partition uses bitvector for ndv, we just fall back to // the traditional extrapolation methods. 
- for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); adjustedIndexMap.put(partName, (double) indexMap.get(partName)); adjustedStatsMap.put(partName, cso.getStatsData()); } @@ -148,9 +145,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, int length = 0; int curIndex = -1; StringColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); StringColumnStatsDataInspector newData = (StringColumnStatsDataInspector) cso.getStatsData().getStringStats(); // newData.isSetBitVectors() should be true for sure because we @@ -198,11 +195,13 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, adjustedStatsMap.put(pseudoPartName.toString(), csd); } } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, -1); + extrapolate(columnStatisticsData, partNames.size(), colStatsWithSourceInfo.size(), + adjustedIndexMap, adjustedStatsMap, -1); } - LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName, - columnStatisticsData.getStringStats().getNumDVs(),partNames.size(), css.size()); + LOG.debug( + "Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", + colName, columnStatisticsData.getStringStats().getNumDVs(), partNames.size(), + colStatsWithSourceInfo.size()); statsObj.setStatsData(columnStatisticsData); return statsObj; } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java index 8bc4ce752e..50f873a013 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java @@ -87,6 +87,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Map.Entry; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; @@ -213,71 +214,91 @@ public static MetaException newMetaException(String errorMessage, Exception e) { } - // given a list of partStats, this function will give you an aggr stats + // Given a list of partStats, this function will give you an aggr stats public static List aggrPartitionStats(List partStats, - String dbName, String tableName, List partNames, List colNames, - boolean useDensityFunctionForNDVEstimation, double ndvTuner) + String dbName, String tableName, List partNames, List colNames, + boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { - // 1. 
group by the stats by colNames - // map the colName to List - Map> map = new HashMap<>(); + Map> colStatsMap = + new HashMap>(); + // Group stats by colName for each partition + Map aliasToAggregator = + new HashMap(); for (ColumnStatistics css : partStats) { List objs = css.getStatsObj(); for (ColumnStatisticsObj obj : objs) { - List singleObj = new ArrayList<>(); - singleObj.add(obj); - ColumnStatistics singleCS = new ColumnStatistics(css.getStatsDesc(), singleObj); - if (!map.containsKey(obj.getColName())) { - map.put(obj.getColName(), new ArrayList<>()); + String partName = css.getStatsDesc().getPartName(); + if (aliasToAggregator.get(obj.getColName()) == null) { + aliasToAggregator.put(obj.getColName(), + ColumnStatsAggregatorFactory.getColumnStatsAggregator( + obj.getStatsData().getSetField(), useDensityFunctionForNDVEstimation, ndvTuner)); + colStatsMap.put(aliasToAggregator.get(obj.getColName()), + new ArrayList()); } - map.get(obj.getColName()).add(singleCS); + colStatsMap.get(aliasToAggregator.get(obj.getColName())) + .add(new ColStatsObjWithSourceInfo(obj, dbName, tableName, partName)); } } - return aggrPartitionStats(map,dbName,tableName,partNames,colNames,useDensityFunctionForNDVEstimation, ndvTuner); + if (colStatsMap.size() < 1) { + LOG.debug("No stats data found for: dbName= {}, tblName= {}, partNames= {}, colNames= {}", + dbName, tableName, partNames, colNames); + return new ArrayList(); + } + return aggrPartitionStats(colStatsMap, partNames, areAllPartsFound, + useDensityFunctionForNDVEstimation, ndvTuner); } public static List aggrPartitionStats( - Map> map, String dbName, String tableName, - final List partNames, List colNames, - final boolean useDensityFunctionForNDVEstimation,final double ndvTuner) throws MetaException { - List colStats = new ArrayList<>(); - // 2. Aggregate stats for each column in a separate thread - if (map.size()< 1) { - //stats are absent in RDBMS - LOG.debug("No stats data found for: dbName=" +dbName +" tblName=" + tableName + - " partNames= " + partNames + " colNames=" + colNames ); - return colStats; - } - final ExecutorService pool = Executors.newFixedThreadPool(Math.min(map.size(), 16), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("aggr-col-stats-%d").build()); + Map> colStatsMap, + final List partNames, final boolean areAllPartsFound, + final boolean useDensityFunctionForNDVEstimation, final double ndvTuner) + throws MetaException { + List aggrColStatObjs = new ArrayList(); + int numProcessors = Runtime.getRuntime().availableProcessors(); + final ExecutorService pool = + Executors.newFixedThreadPool(Math.min(colStatsMap.size(), numProcessors), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("aggr-col-stats-%d").build()); final List> futures = Lists.newLinkedList(); - + LOG.debug("Aggregating column stats. 
Threads used: {}", + Math.min(colStatsMap.size(), numProcessors)); long start = System.currentTimeMillis(); - for (final Map.Entry> entry : map.entrySet()) { + for (final Entry> entry : colStatsMap + .entrySet()) { futures.add(pool.submit(new Callable() { @Override - public ColumnStatisticsObj call() throws Exception { - List css = entry.getValue(); - ColumnStatsAggregator aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(css - .iterator().next().getStatsObj().iterator().next().getStatsData().getSetField(), - useDensityFunctionForNDVEstimation, ndvTuner); - ColumnStatisticsObj statsObj = aggregator.aggregate(entry.getKey(), partNames, css); - return statsObj; - }})); + public ColumnStatisticsObj call() throws MetaException { + List colStatWithSourceInfo = entry.getValue(); + ColumnStatsAggregator aggregator = entry.getKey(); + try { + ColumnStatisticsObj statsObj = + aggregator.aggregate(colStatWithSourceInfo, partNames, areAllPartsFound); + return statsObj; + } catch (MetaException e) { + LOG.debug(e.getMessage()); + throw e; + } + } + })); } pool.shutdown(); - for (Future future : futures) { - try { - colStats.add(future.get()); - } catch (InterruptedException | ExecutionException e) { - pool.shutdownNow(); - LOG.debug(e.toString()); - throw new MetaException(e.toString()); + if (!futures.isEmpty()) { + for (Future future : futures) { + try { + if (future.get() != null) { + aggrColStatObjs.add(future.get()); + } + } catch (InterruptedException | ExecutionException e) { + LOG.debug(e.getMessage()); + pool.shutdownNow(); + throw new MetaException(e.toString()); + } + } } LOG.debug("Time for aggr col stats in seconds: {} Threads used: {}", - ((System.currentTimeMillis() - (double)start))/1000, Math.min(map.size(), 16)); - return colStats; + ((System.currentTimeMillis() - (double) start)) / 1000, + Math.min(colStatsMap.size(), numProcessors)); + return aggrColStatObjs; } public static double decimalToDouble(Decimal decimal) { @@ -1562,7 +1583,6 @@ public static boolean isMaterializedViewTable(Table table) { return cols; } - public static boolean isValidSchedulingPolicy(String str) { try { parseSchedulingPolicy(str); @@ -1578,4 +1598,36 @@ public static WMPoolSchedulingPolicy parseSchedulingPolicy(String schedulingPoli if ("DEFAULT".equals(schedulingPolicy)) return WMPoolSchedulingPolicy.FAIR; return Enum.valueOf(WMPoolSchedulingPolicy.class, schedulingPolicy); } + + // ColumnStatisticsObj with info about its db, table, partition (if table is partitioned) + public static class ColStatsObjWithSourceInfo { + private final ColumnStatisticsObj colStatsObj; + private final String dbName; + private final String tblName; + private final String partName; + + public ColStatsObjWithSourceInfo(ColumnStatisticsObj colStatsObj, String dbName, String tblName, + String partName) { + this.colStatsObj = colStatsObj; + this.dbName = dbName; + this.tblName = tblName; + this.partName = partName; + } + + public ColumnStatisticsObj getColStatsObj() { + return colStatsObj; + } + + public String getDbName() { + return dbName; + } + + public String getTblName() { + return tblName; + } + + public String getPartName() { + return partName; + } + } } diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java index 2aa5551a42..acb0cbdad6 100644 --- 
a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -73,6 +72,7 @@ import org.apache.hadoop.hive.metastore.api.WMMapping; import org.apache.hadoop.hive.metastore.api.WMPool; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.thrift.TException; /** @@ -931,13 +931,6 @@ public void dropConstraint(String dbName, String tableName, } @Override - public Map> getColStatsForTablePartitions(String dbName, - String tableName) throws MetaException, NoSuchObjectException { - // TODO Auto-generated method stub - return null; - } - - @Override public String getMetastoreDbUuid() throws MetaException { throw new MetaException("Get metastore uuid is not implemented"); } @@ -1051,4 +1044,11 @@ public void dropWMTriggerToPoolMapping(String resourcePlanName, String triggerNa String poolPath) throws NoSuchObjectException, InvalidOperationException, MetaException { objectStore.dropWMTriggerToPoolMapping(resourcePlanName, triggerName, poolPath); } + + @Override + public List getPartitionColStatsForDatabase(String dbName) + throws MetaException, NoSuchObjectException { + // TODO Auto-generated method stub + return null; + } } diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java index 4ec5864699..11416113e6 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -72,6 +71,7 @@ import org.apache.hadoop.hive.metastore.api.WMPool; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.thrift.TException; import org.junit.Assert; @@ -943,13 +943,6 @@ public void dropConstraint(String dbName, String tableName, } @Override - public Map> getColStatsForTablePartitions(String dbName, - String tableName) throws MetaException, NoSuchObjectException { - // TODO Auto-generated method stub - return null; - } - - @Override public String getMetastoreDbUuid() throws MetaException { throw new MetaException("Get metastore uuid is not 
implemented"); } @@ -1048,4 +1041,11 @@ public void createWMTriggerToPoolMapping(String resourcePlanName, String trigger public void dropWMTriggerToPoolMapping(String resourcePlanName, String triggerName, String poolPath) throws NoSuchObjectException, InvalidOperationException, MetaException { } + + @Override + public List getPartitionColStatsForDatabase(String dbName) + throws MetaException, NoSuchObjectException { + // TODO Auto-generated method stub + return null; + } }