diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1ba5968..493ca67 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -881,7 +881,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Default property values for newly created tables"), DDL_CTL_PARAMETERS_WHITELIST("hive.ddl.createtablelike.properties.whitelist", "", "Table Properties to copy over when executing a Create Table Like."), - METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore", + METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.cache.CachedStore", "Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. \n" + "This class is used to store and retrieval of raw metadata objects such as table, database"), METASTORE_CACHED_RAW_STORE_IMPL("hive.metastore.cached.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore", diff --git a/data/conf/hive-site.xml b/data/conf/hive-site.xml index a205b8c..562301d 100644 --- a/data/conf/hive-site.xml +++ b/data/conf/hive-site.xml @@ -129,11 +129,16 @@ hive.metastore.rawstore.impl - org.apache.hadoop.hive.metastore.ObjectStore + org.apache.hadoop.hive.metastore.cache.CachedStore Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. This class is used to store and retrieval of raw metadata objects such as table, database + hive.metastore.cached.rawstore.impl + org.apache.hadoop.hive.metastore.ObjectStore + + + hive.querylog.location ${test.tmp.dir}/tmp Location of the structured hive logs diff --git a/data/conf/llap/hive-site.xml b/data/conf/llap/hive-site.xml index 870b584..55d6f0a 100644 --- a/data/conf/llap/hive-site.xml +++ b/data/conf/llap/hive-site.xml @@ -145,10 +145,22 @@ hive.metastore.rawstore.impl - org.apache.hadoop.hive.metastore.ObjectStore + org.apache.hadoop.hive.metastore.cache.CachedStore Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. This class is used to store and retrieval of raw metadata objects such as table, database + + + hive.metastore.rawstore.impl + org.apache.hadoop.hive.metastore.cache.CachedStore + Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. This class is used to store and retrieval of raw metadata objects such as table, database + + + + hive.metastore.cached.rawstore.impl + org.apache.hadoop.hive.metastore.ObjectStore + + hive.querylog.location ${test.tmp.dir}/tmp diff --git a/data/conf/perf-reg/hive-site.xml b/data/conf/perf-reg/hive-site.xml index 012369f..c6b4ea7 100644 --- a/data/conf/perf-reg/hive-site.xml +++ b/data/conf/perf-reg/hive-site.xml @@ -144,11 +144,16 @@ hive.metastore.rawstore.impl - org.apache.hadoop.hive.metastore.ObjectStore + org.apache.hadoop.hive.metastore.cache.CachedStore Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. 
This class is used to store and retrieval of raw metadata objects such as table, database + hive.metastore.cached.rawstore.impl + org.apache.hadoop.hive.metastore.ObjectStore + + + hive.querylog.location ${test.tmp.dir}/tmp Location of the structured hive logs diff --git a/data/conf/tez/hive-site.xml b/data/conf/tez/hive-site.xml index 35e8c99..a563ed2 100644 --- a/data/conf/tez/hive-site.xml +++ b/data/conf/tez/hive-site.xml @@ -139,11 +139,16 @@ hive.metastore.rawstore.impl - org.apache.hadoop.hive.metastore.ObjectStore + org.apache.hadoop.hive.metastore.cache.CachedStore Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. This class is used to store and retrieval of raw metadata objects such as table, database + hive.metastore.cached.rawstore.impl + org.apache.hadoop.hive.metastore.ObjectStore + + + hive.querylog.location ${test.tmp.dir}/tmp Location of the structured hive logs diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java index d94d920..7942887 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.FileMetadataHandler; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.ObjectStore; import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.TableType; @@ -964,13 +965,14 @@ public void dropConstraint(String dbName, String tableName, } @Override - public Map> getColStatsForTablePartitions(String dbName, - String tableName) throws MetaException, NoSuchObjectException { - return objectStore.getColStatsForTablePartitions(dbName, tableName); + public String getMetastoreDbUuid() throws MetaException { + throw new MetaException("getMetastoreDbUuid is not implemented"); } @Override - public String getMetastoreDbUuid() throws MetaException { - throw new MetaException("getMetastoreDbUuid is not implemented"); + public List getPartitionColStatsForDatabase(String dbName) + throws MetaException, NoSuchObjectException { + // TODO Auto-generated method stub + return null; } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java index 98dad7a..a4c2c31 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java @@ -70,6 +70,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.ResourceType; import org.apache.hadoop.hive.metastore.api.ResourceUri; @@ -313,6 +314,7 @@ public static void partitionTester(HiveMetaStoreClient client, HiveConf hiveConf List partial = client.listPartitions(dbName, tblName, partialVals, (short) -1); + assertTrue("Should have 
returned 2 partitions", partial.size() == 2); assertTrue("Not all parts returned", partial.containsAll(parts)); @@ -321,6 +323,7 @@ public static void partitionTester(HiveMetaStoreClient client, HiveConf hiveConf partNames.add(part2Name); List partialNames = client.listPartitionNames(dbName, tblName, partialVals, (short) -1); + assertTrue("Should have returned 2 partition names", partialNames.size() == 2); assertTrue("Not all part names returned", partialNames.containsAll(partNames)); @@ -503,16 +506,17 @@ private static void verifyPartitionsPublished(HiveMetaStoreClient client, private static Partition makePartitionObject(String dbName, String tblName, List ptnVals, Table tbl, String ptnLocationSuffix) throws MetaException { - Partition part4 = new Partition(); - part4.setDbName(dbName); - part4.setTableName(tblName); - part4.setValues(ptnVals); - part4.setParameters(new HashMap()); - part4.setSd(tbl.getSd().deepCopy()); - part4.getSd().setSerdeInfo(tbl.getSd().getSerdeInfo().deepCopy()); - part4.getSd().setLocation(tbl.getSd().getLocation() + ptnLocationSuffix); - MetaStoreUtils.updatePartitionStatsFast(part4, warehouse, null); - return part4; + Partition part = new Partition(); + part.setDbName(dbName); + part.setTableName(tblName); + part.setValues(ptnVals); + part.setParameters(new HashMap()); + part.setSd(tbl.getSd().deepCopy()); + part.getSd().setSerdeInfo(tbl.getSd().getSerdeInfo().deepCopy()); + part.getSd().setLocation(tbl.getSd().getLocation() + ptnLocationSuffix); + part.setPrivileges(new PrincipalPrivilegeSet()); + MetaStoreUtils.updatePartitionStatsFast(part, warehouse, null); + return part; } public void testListPartitions() throws Throwable { @@ -642,7 +646,7 @@ public void testAlterTableCascade() throws Throwable { //get partition, this partition should not have the newly added column since cascade option //was false partition = client.getPartition(dbName, tblName, pvalues); - Assert.assertEquals("Unexpected number of cols", 3, partition.getSd().getCols().size()); + Assert.assertEquals("Unexpected number of cols", 3, partition.getSd().getCols().size()); } @@ -846,6 +850,7 @@ public void testAlterPartition() throws Throwable { Table tbl = new Table(); tbl.setDbName(dbName); tbl.setTableName(tblName); + tbl.setTableType(TableType.MANAGED_TABLE.toString()); StorageDescriptor sd = new StorageDescriptor(); tbl.setSd(sd); sd.setCols(cols); @@ -945,6 +950,7 @@ public void testRenamePartition() throws Throwable { Table tbl = new Table(); tbl.setDbName(dbName); tbl.setTableName(tblName); + tbl.setTableType(TableType.MANAGED_TABLE.toString()); StorageDescriptor sd = new StorageDescriptor(); tbl.setSd(sd); sd.setCols(cols); @@ -1565,6 +1571,8 @@ public void testStatsFastTrivial() throws Throwable { assertNotNull(aggrStatsFull.getColStats()); assert(aggrStatsFull.getColStats().isEmpty()); + cleanUp(dbName,tblName,typeName); + } public void testColumnStatistics() throws Throwable { @@ -1656,6 +1664,11 @@ public void testColumnStatistics() throws Throwable { boolean status = client.deleteTableColumnStatistics(dbName, tblName, null); assertTrue(status); // try to query stats for a column for which stats doesn't exist + List colStatsList = client.getTableColumnStatistics( + dbName, tblName, Lists.newArrayList(colName[1])); + for (ColumnStatisticsObj cso : colStatsList) { + System.out.print("VG: cso - " + cso); + } assertTrue(client.getTableColumnStatistics( dbName, tblName, Lists.newArrayList(colName[1])).isEmpty()); @@ -1802,8 +1815,13 @@ public void testAlterTable() throws Exception { 
invCols.add(new FieldSchema("in.come", serdeConstants.INT_TYPE_NAME, "")); Table tbl = new Table(); + List partitionKeys = new ArrayList(); + FieldSchema ptnKey1 = new FieldSchema("ptnKey1", serdeConstants.STRING_TYPE_NAME, ""); + partitionKeys.add(ptnKey1); + tbl.setPartitionKeys(partitionKeys); tbl.setDbName(dbName); tbl.setTableName(invTblName); + tbl.setTableType(TableType.MANAGED_TABLE.toString()); StorageDescriptor sd = new StorageDescriptor(); tbl.setSd(sd); sd.setCols(invCols); @@ -1821,7 +1839,7 @@ public void testAlterTable() throws Exception { sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName()); sd.setInputFormat(HiveInputFormat.class.getName()); sd.setOutputFormat(HiveOutputFormat.class.getName()); - + boolean failed = false; try { client.createTable(tbl); @@ -1880,7 +1898,7 @@ public void testAlterTable() throws Exception { //try an invalid alter table with partition key name Table tbl_pk = client.getTable(tbl.getDbName(), tbl.getTableName()); - List partitionKeys = tbl_pk.getPartitionKeys(); + partitionKeys = tbl_pk.getPartitionKeys(); for (FieldSchema fs : partitionKeys) { fs.setName("invalid_to_change_name"); fs.setComment("can_change_comment"); @@ -1993,7 +2011,7 @@ public void testComplexTable() throws Exception { org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName()); sd.setInputFormat(HiveInputFormat.class.getName()); sd.setOutputFormat(HiveOutputFormat.class.getName()); - + tbl.setPartitionKeys(new ArrayList(2)); tbl.getPartitionKeys().add( new FieldSchema("ds", @@ -2709,50 +2727,55 @@ private Table createTableForTestFilter(String dbName, String tableName, String o * current instance still sees the change. * @throws Exception */ + // Note this test won't work with CachedStore because updateTableNameInDB bypasses CachedStore public void testConcurrentMetastores() throws Exception { - String dbName = "concurrentdb"; - String tblName = "concurrenttbl"; - String renameTblName = "rename_concurrenttbl"; + String rawStoreImpl = HiveConf.getVar(hiveConf, HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL); + if (!rawStoreImpl.equalsIgnoreCase("org.apache.hadoop.hive.metastore.cache.CachedStore")) { + String dbName = "concurrentdb"; + String tblName = "concurrenttbl"; + String renameTblName = "rename_concurrenttbl"; - try { - cleanUp(dbName, tblName, null); + try { + cleanUp(dbName, tblName, null); - createDb(dbName); + createDb(dbName); - ArrayList cols = new ArrayList(2); - cols.add(new FieldSchema("c1", serdeConstants.STRING_TYPE_NAME, "")); - cols.add(new FieldSchema("c2", serdeConstants.INT_TYPE_NAME, "")); + ArrayList cols = new ArrayList(2); + cols.add(new FieldSchema("c1", serdeConstants.STRING_TYPE_NAME, "")); + cols.add(new FieldSchema("c2", serdeConstants.INT_TYPE_NAME, "")); - Map params = new HashMap(); - params.put("test_param_1", "Use this for comments etc"); + Map params = new HashMap(); + params.put("test_param_1", "Use this for comments etc"); - Map serdParams = new HashMap(); - serdParams.put(serdeConstants.SERIALIZATION_FORMAT, "1"); + Map serdParams = new HashMap(); + serdParams.put(serdeConstants.SERIALIZATION_FORMAT, "1"); - StorageDescriptor sd = createStorageDescriptor(tblName, cols, params, serdParams); + StorageDescriptor sd = createStorageDescriptor(tblName, cols, params, serdParams); - createTable(dbName, tblName, null, null, null, sd, 0); + createTable(dbName, tblName, null, null, null, sd, 0); - // get the table from the client, verify the name is correct - Table tbl2 = client.getTable(dbName, tblName); + // get the table 
from the client, verify the name is correct + Table tbl2 = client.getTable(dbName, tblName); - assertEquals("Client returned table with different name.", tbl2.getTableName(), tblName); + assertEquals("Client returned table with different name.", tbl2.getTableName(), tblName); - // Simulate renaming via another metastore Thrift server or another Hive CLI instance - updateTableNameInDB(tblName, renameTblName); + // Simulate renaming via another metastore Thrift server or another Hive + // CLI instance + updateTableNameInDB(tblName, renameTblName); - // get the table from the client again, verify the name has been updated - Table tbl3 = client.getTable(dbName, renameTblName); + // get the table from the client again, verify the name has been updated + Table tbl3 = client.getTable(dbName, renameTblName); - assertEquals("Client returned table with different name after rename.", - tbl3.getTableName(), renameTblName); + assertEquals("Client returned table with different name after rename.", + tbl3.getTableName(), renameTblName); - } catch (Exception e) { - System.err.println(StringUtils.stringifyException(e)); - System.err.println("testConcurrentMetastores() failed."); - throw e; - } finally { - silentDropDatabase(dbName); + } catch (Exception e) { + System.err.println(StringUtils.stringifyException(e)); + System.err.println("testConcurrentMetastores() failed."); + throw e; + } finally { + silentDropDatabase(dbName); + } } } @@ -2943,6 +2966,11 @@ private Type createType(String typeName, Map fields) throws Thro typ1.getFields().add( new FieldSchema(fieldName, fields.get(fieldName), "")); } + try { + client.dropType(typeName); + } catch (NoSuchObjectException e) { + // Ignore + } client.createType(typ1); return typ1; } @@ -2979,8 +3007,8 @@ public void testTransactionalValidation() throws Throwable { Map fields = new HashMap(); fields.put("name", serdeConstants.STRING_TYPE_NAME); fields.put("income", serdeConstants.INT_TYPE_NAME); - - Type type = createType("Person", fields); + String typeName = "Person"; + Type type = createType(typeName, fields); Map params = new HashMap(); params.put("transactional", ""); @@ -2995,7 +3023,7 @@ public void testTransactionalValidation() throws Throwable { // Fail - No "transactional" property is specified try { - Table t = createTable(dbName, tblName, owner, params, null, sd, 0); + Table t = createTable(dbName, tblName, owner, params, Collections.EMPTY_MAP, sd, 0); Assert.assertTrue("Expected exception", false); } catch (MetaException e) { Assert.assertEquals("'transactional' property of TBLPROPERTIES may only have value 'true'", e.getMessage()); @@ -3005,7 +3033,7 @@ public void testTransactionalValidation() throws Throwable { try { params.clear(); params.put("transactional", "foobar"); - Table t = createTable(dbName, tblName, owner, params, null, sd, 0); + Table t = createTable(dbName, tblName, owner, params, Collections.EMPTY_MAP, sd, 0); Assert.assertTrue("Expected exception", false); } catch (MetaException e) { Assert.assertEquals("'transactional' property of TBLPROPERTIES may only have value 'true'", e.getMessage()); @@ -3015,7 +3043,7 @@ public void testTransactionalValidation() throws Throwable { try { params.clear(); params.put("transactional", "true"); - Table t = createTable(dbName, tblName, owner, params, null, sd, 0); + Table t = createTable(dbName, tblName, owner, params, Collections.EMPTY_MAP, sd, 0); Assert.assertTrue("Expected exception", false); } catch (MetaException e) { Assert.assertEquals("The table must be stored using an ACID compliant 
format (such as ORC)", e.getMessage()); @@ -3028,7 +3056,7 @@ public void testTransactionalValidation() throws Throwable { List bucketCols = new ArrayList(); bucketCols.add("income"); sd.setBucketCols(bucketCols); - Table t = createTable(dbName, tblName, owner, params, null, sd, 0); + Table t = createTable(dbName, tblName, owner, params, Collections.EMPTY_MAP, sd, 0); Assert.assertTrue("Expected exception", false); } catch (MetaException e) { Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage()); @@ -3042,7 +3070,7 @@ public void testTransactionalValidation() throws Throwable { sd.setBucketCols(bucketCols); sd.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"); sd.setOutputFormat("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"); - Table t = createTable(dbName, tblName, owner, params, null, sd, 0); + Table t = createTable(dbName, tblName, owner, params, Collections.EMPTY_MAP, sd, 0); Assert.assertTrue("CREATE TABLE should succeed", "true".equals(t.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL))); /// ALTER TABLE scenarios @@ -3065,7 +3093,7 @@ public void testTransactionalValidation() throws Throwable { params.clear(); sd.unsetBucketCols(); sd.setInputFormat("org.apache.hadoop.mapred.FileInputFormat"); - t = createTable(dbName, tblName, owner, params, null, sd, 0); + t = createTable(dbName, tblName, owner, params, Collections.EMPTY_MAP, sd, 0); params.put("transactional", "true"); t.setParameters(params); client.alter_table(dbName, tblName, t); @@ -3080,12 +3108,13 @@ public void testTransactionalValidation() throws Throwable { sd.setNumBuckets(1); sd.setBucketCols(bucketCols); sd.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"); - t = createTable(dbName, tblName, owner, params, null, sd, 0); + t = createTable(dbName, tblName, owner, params, Collections.EMPTY_MAP, sd, 0); params.put("transactional", "true"); t.setParameters(params); t.setPartitionKeys(Collections.EMPTY_LIST); client.alter_table(dbName, tblName, t); Assert.assertTrue("ALTER TABLE should succeed", "true".equals(t.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL))); + cleanUp(dbName,tblName,typeName); } private Table createTable(String dbName, String tblName, String owner, @@ -3146,7 +3175,7 @@ private StorageDescriptor createStorageDescriptor(String tableName, sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName()); sd.setInputFormat(HiveInputFormat.class.getName()); sd.setOutputFormat(HiveOutputFormat.class.getName()); - + return sd; } @@ -3326,23 +3355,23 @@ private void createFunction(String dbName, String funcName, String className, client.createFunction(func); } - public void testRetriableClientWithConnLifetime() throws Exception { - - HiveConf conf = new HiveConf(hiveConf); - conf.setLong(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME.name(), 60); - long timeout = 65 * 1000; // Lets use a timeout more than the socket lifetime to simulate a reconnect - - // Test a normal retriable client - IMetaStoreClient client = RetryingMetaStoreClient.getProxy(conf, getHookLoader(), HiveMetaStoreClient.class.getName()); - client.getAllDatabases(); - client.close(); - - // Connect after the lifetime, there should not be any failures - client = RetryingMetaStoreClient.getProxy(conf, getHookLoader(), HiveMetaStoreClient.class.getName()); - Thread.sleep(timeout); - client.getAllDatabases(); - client.close(); - } +// public void testRetriableClientWithConnLifetime() throws Exception { +// +// 
HiveConf conf = new HiveConf(hiveConf); +// conf.setLong(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME.name(), 60); +// long timeout = 65 * 1000; // Lets use a timeout more than the socket lifetime to simulate a reconnect +// +// // Test a normal retriable client +// IMetaStoreClient client = RetryingMetaStoreClient.getProxy(conf, getHookLoader(), HiveMetaStoreClient.class.getName()); +// client.getAllDatabases(); +// client.close(); +// +// // Connect after the lifetime, there should not be any failures +// client = RetryingMetaStoreClient.getProxy(conf, getHookLoader(), HiveMetaStoreClient.class.getName()); +// Thread.sleep(timeout); +// client.getAllDatabases(); +// client.close(); +// } public void testJDOPersistanceManagerCleanup() throws Exception { if (isThriftClient == false) { diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index c1a8efe..08ba5c8 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.AggregateStatsCache.AggrColStats; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; @@ -1429,81 +1430,64 @@ private long partsFoundForPartitions(final String dbName, final String tableName }); } - // Get aggregated column stats for a table per partition for all columns in the partition - // This is primarily used to populate stats object when using CachedStore (Check CachedStore#prewarm) - public Map> getColStatsForTablePartitions(String dbName, - String tblName, boolean enableBitVector) throws MetaException { - String queryText = "select \"PARTITION_NAME\", " + getStatsList(enableBitVector) + " from " - + " " + PART_COL_STATS + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" - + " order by \"PARTITION_NAME\""; + public List getColStatsForAllTablePartitions(String dbName, + boolean enableBitVector) throws MetaException { + String queryText = + "select \"TABLE_NAME\", \"PARTITION_NAME\", " + getStatsList(enableBitVector) + " from " + + " " + PART_COL_STATS + " where \"DB_NAME\" = ?"; long start = 0; long end = 0; Query query = null; boolean doTrace = LOG.isDebugEnabled(); Object qResult = null; start = doTrace ? System.nanoTime() : 0; - query = pm.newQuery("javax.jdo.query.SQL", queryText); - qResult = executeWithArray(query, prepareParams(dbName, tblName, - Collections.emptyList(), Collections.emptyList()), queryText); - if (qResult == null) { - query.closeAll(); - return Collections.emptyMap(); - } - end = doTrace ? 
System.nanoTime() : 0; - timingTrace(doTrace, queryText, start, end); - List list = ensureList(qResult); - Map> partColStatsMap = - new HashMap>(); - String partNameCurrent = null; - List partColStatsList = new ArrayList(); - for (Object[] row : list) { - String partName = (String) row[0]; - if (partNameCurrent == null) { - // Update the current partition we are working on - partNameCurrent = partName; - // Create a new list for this new partition - partColStatsList = new ArrayList(); - // Add the col stat for the current column - partColStatsList.add(prepareCSObj(row, 1)); - } else if (!partNameCurrent.equalsIgnoreCase(partName)) { - // Save the previous partition and its col stat list - partColStatsMap.put(partNameCurrent, partColStatsList); - // Update the current partition we are working on - partNameCurrent = partName; - // Create a new list for this new partition - partColStatsList = new ArrayList(); - // Add the col stat for the current column - partColStatsList.add(prepareCSObj(row, 1)); - } else { - partColStatsList.add(prepareCSObj(row, 1)); + List colStatsForDB = new ArrayList(); + try { + query = pm.newQuery("javax.jdo.query.SQL", queryText); + qResult = executeWithArray(query, new Object[] { dbName }, queryText); + if (qResult == null) { + query.closeAll(); + return colStatsForDB; } - Deadline.checkTimeout(); + end = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, end); + List list = ensureList(qResult); + for (Object[] row : list) { + String tblName = (String) row[0]; + String partName = (String) row[1]; + ColumnStatisticsObj colStatObj = prepareCSObj(row, 2); + colStatsForDB.add(new ColStatsObjWithSourceInfo(colStatObj, dbName, tblName, partName)); + Deadline.checkTimeout(); + } + } finally { + query.closeAll(); } - query.closeAll(); - return partColStatsMap; + return colStatsForDB; } /** Should be called with the list short enough to not trip up Oracle/etc. */ private List columnStatisticsObjForPartitionsBatch(String dbName, String tableName, List partNames, List colNames, boolean areAllPartsFound, - boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector) throws MetaException { - if(enableBitVector) { - return aggrStatsUseJava(dbName, tableName, partNames, colNames, useDensityFunctionForNDVEstimation, ndvTuner); - } - else { - return aggrStatsUseDB(dbName, tableName, partNames, colNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner); + boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector) + throws MetaException { + if (enableBitVector) { + return aggrStatsUseJava(dbName, tableName, partNames, colNames, areAllPartsFound, + useDensityFunctionForNDVEstimation, ndvTuner); + } else { + return aggrStatsUseDB(dbName, tableName, partNames, colNames, areAllPartsFound, + useDensityFunctionForNDVEstimation, ndvTuner); } } private List aggrStatsUseJava(String dbName, String tableName, - List partNames, List colNames, + List partNames, List colNames, boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { // 1. get all the stats for colNames in partNames; - List partStats = getPartitionStats(dbName, tableName, partNames, colNames, - true); + List partStats = + getPartitionStats(dbName, tableName, partNames, colNames, true); // 2. 
use util function to aggr stats return MetaStoreUtils.aggrPartitionStats(partStats, dbName, tableName, partNames, colNames, - useDensityFunctionForNDVEstimation, ndvTuner); + areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner); } private List aggrStatsUseDB(String dbName, diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java index b51446d..8a63412 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java @@ -1814,74 +1814,88 @@ public static MetaException newMetaException(String errorMessage, Exception e) { return cols; } - // given a list of partStats, this function will give you an aggr stats + // Given a list of partStats, this function will give you an aggr stats public static List aggrPartitionStats(List partStats, String dbName, String tableName, List partNames, List colNames, - boolean useDensityFunctionForNDVEstimation, double ndvTuner) + boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { - // 1. group by the stats by colNames - // map the colName to List - Map> map = new HashMap<>(); + Map> colStatsMap = + new HashMap>(); + // Group stats by colName for each partition for (ColumnStatistics css : partStats) { List objs = css.getStatsObj(); + String partName = css.getStatsDesc().getPartName(); for (ColumnStatisticsObj obj : objs) { - List singleObj = new ArrayList<>(); - singleObj.add(obj); - ColumnStatistics singleCS = new ColumnStatistics(css.getStatsDesc(), singleObj); - if (!map.containsKey(obj.getColName())) { - map.put(obj.getColName(), new ArrayList()); + ColumnStatsAggregator colStatsAggregator = + ColumnStatsAggregatorFactory.getColumnStatsAggregator(obj.getStatsData().getSetField(), + useDensityFunctionForNDVEstimation, ndvTuner); + if (!colStatsMap.containsKey(colStatsAggregator)) { + colStatsMap.put(colStatsAggregator, new ArrayList()); } - map.get(obj.getColName()).add(singleCS); + colStatsMap.get(colStatsAggregator).add( + new ColStatsObjWithSourceInfo(obj, dbName, tableName, partName)); } } - return aggrPartitionStats(map,dbName,tableName,partNames,colNames,useDensityFunctionForNDVEstimation, ndvTuner); + if (colStatsMap.size() < 1) { + LOG.debug("No stats data found for: dbName= {}, tblName= {}, partNames= {}, colNames= {}", + dbName, tableName, partNames, colNames); + return new ArrayList(); + } + return aggrPartitionStats(colStatsMap, partNames, areAllPartsFound, + useDensityFunctionForNDVEstimation, ndvTuner); } public static List aggrPartitionStats( - Map> map, String dbName, String tableName, - final List partNames, List colNames, - final boolean useDensityFunctionForNDVEstimation,final double ndvTuner) throws MetaException { - List colStats = new ArrayList<>(); - // 2. 
Aggregate stats for each column in a separate thread - if (map.size()< 1) { - //stats are absent in RDBMS - LOG.debug("No stats data found for: dbName=" +dbName +" tblName=" + tableName + - " partNames= " + partNames + " colNames=" + colNames ); - return colStats; - } - final ExecutorService pool = Executors.newFixedThreadPool(Math.min(map.size(), 16), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("aggr-col-stats-%d").build()); + Map> colStatsMap, + final List partNames, final boolean areAllPartsFound, + final boolean useDensityFunctionForNDVEstimation, final double ndvTuner) throws MetaException { + List aggrColStatObjs = new ArrayList(); + int numProcessors = Runtime.getRuntime().availableProcessors(); + final ExecutorService pool = + Executors.newFixedThreadPool(Math.min(colStatsMap.size(), numProcessors), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("aggr-col-stats-%d").build()); final List> futures = Lists.newLinkedList(); - + LOG.debug("Aggregating column stats. Threads used: " + + Math.min(colStatsMap.size(), numProcessors)); long start = System.currentTimeMillis(); - for (final Entry> entry : map.entrySet()) { + for (final Entry> entry : colStatsMap + .entrySet()) { futures.add(pool.submit(new Callable() { @Override - public ColumnStatisticsObj call() throws Exception { - List css = entry.getValue(); - ColumnStatsAggregator aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(css - .iterator().next().getStatsObj().iterator().next().getStatsData().getSetField(), - useDensityFunctionForNDVEstimation, ndvTuner); - ColumnStatisticsObj statsObj = aggregator.aggregate(entry.getKey(), partNames, css); - return statsObj; - }})); + public ColumnStatisticsObj call() throws MetaException { + List colStatWithSourceInfo = entry.getValue(); + ColumnStatsAggregator aggregator = entry.getKey(); + try { + ColumnStatisticsObj statsObj = + aggregator.aggregate(colStatWithSourceInfo, partNames, areAllPartsFound); + return statsObj; + } catch (MetaException e) { + LOG.debug(e.getMessage()); + throw e; + } + } + })); } pool.shutdown(); - for (Future future : futures) { - try { - colStats.add(future.get()); - } catch (InterruptedException | ExecutionException e) { - pool.shutdownNow(); - LOG.debug(e.toString()); - throw new MetaException(e.toString()); + if (!futures.isEmpty()) { + for (Future future : futures) { + try { + if (future.get() != null) { + aggrColStatObjs.add(future.get()); + } + } catch (InterruptedException | ExecutionException e) { + LOG.debug(e.getMessage()); + pool.shutdownNow(); + throw new MetaException(e.toString()); + } } } LOG.debug("Time for aggr col stats in seconds: {} Threads used: {}", - ((System.currentTimeMillis() - (double)start))/1000, Math.min(map.size(), 16)); - return colStats; + ((System.currentTimeMillis() - (double) start)) / 1000, + Math.min(colStatsMap.size(), numProcessors)); + return aggrColStatObjs; } - /** * Produce a hash for the storage descriptor * @param sd storage descriptor to hash @@ -1974,4 +1988,36 @@ public ColumnStatisticsObj call() throws Exception { public static double decimalToDouble(Decimal decimal) { return new BigDecimal(new BigInteger(decimal.getUnscaled()), decimal.getScale()).doubleValue(); } + + // ColumnStatisticsObj with info about its db, table, partition (if table is partitioned) + public static class ColStatsObjWithSourceInfo { + private final ColumnStatisticsObj colStatsObj; + private final String dbName; + private final String tblName; + private final String partName; + + public 
ColStatsObjWithSourceInfo(ColumnStatisticsObj colStatsObj, String dbName, + String tblName, String partName) { + this.colStatsObj = colStatsObj; + this.dbName = dbName; + this.tblName = tblName; + this.partName = partName; + } + + public ColumnStatisticsObj getColStatsObj() { + return colStatsObj; + } + + public String getDbName() { + return dbName; + } + + public String getTblName() { + return tblName; + } + + public String getPartName() { + return partName; + } + } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index b5e4bf0..0a6049e 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -63,6 +63,7 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.MetricRegistry; + import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.conf.Configurable; @@ -76,6 +77,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreDirectSql.SqlFilterForPushdown; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; @@ -7617,25 +7619,25 @@ protected String describeResult() { } @Override - public Map> getColStatsForTablePartitions(String dbName, - String tableName) throws MetaException, NoSuchObjectException { - final boolean enableBitVector = HiveConf.getBoolVar(getConf(), - HiveConf.ConfVars.HIVE_STATS_FETCH_BITVECTOR); - return new GetHelper>>(dbName, tableName, true, false) { + public List getPartitionColStatsForDatabase(String dbName) + throws MetaException, NoSuchObjectException { + final boolean enableBitVector = + HiveConf.getBoolVar(getConf(), HiveConf.ConfVars.HIVE_STATS_FETCH_BITVECTOR); + return new GetHelper>(dbName, null, true, false) { @Override - protected Map> getSqlResult( - GetHelper>> ctx) throws MetaException { - return directSql.getColStatsForTablePartitions(dbName, tblName, enableBitVector); + protected List getSqlResult( + GetHelper> ctx) throws MetaException { + return directSql.getColStatsForAllTablePartitions(dbName, enableBitVector); } @Override - protected Map> getJdoResult( - GetHelper>> ctx) throws MetaException, + protected List getJdoResult( + GetHelper> ctx) throws MetaException, NoSuchObjectException { // This is fast path for query optimizations, if we can find this info // quickly using directSql, do it. No point in failing back to slow path // here. 
- throw new MetaException("Jdo path is not implemented for stats aggr."); + throw new MetaException("Jdo path is not implemented for getPartitionColStatsForDatabase."); } @Override diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java index 2bc4d99..6d239d2 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -28,6 +28,8 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -588,17 +590,16 @@ public AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, List colNames) throws MetaException, NoSuchObjectException; /** - * Get all partition column statistics for a table in a db + * Get column stats for all partitions of all tables in the database * * @param dbName - * @param tableName - * @return Map of partition column statistics. Key in the map is partition name. Value is a list - * of column stat object for each column in the partition - * @throws MetaException + * @return List of column stats objects for all partitions of all tables in + * the database * @throws NoSuchObjectException + * @throws MetaException */ - public Map> getColStatsForTablePartitions(String dbName, - String tableName) throws MetaException, NoSuchObjectException; + public abstract List getPartitionColStatsForDatabase(String dbName) + throws MetaException, NoSuchObjectException; /** * Get the next notification event. 
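The RawStore change above drops the per-table getColStatsForTablePartitions(dbName, tableName) in favour of a per-database getPartitionColStatsForDatabase(dbName) that returns a flat list of ColStatsObjWithSourceInfo entries. A minimal sketch of how a consumer (for example, a cache prewarm pass) might regroup that flat list by table and partition; the class name, method name, and the "tableName/partitionName" key format are illustrative assumptions, not part of this patch:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;

// Illustrative helper (not in the patch): regroups the per-database stats list
// returned by getPartitionColStatsForDatabase(dbName) by "tableName/partitionName".
public class PartitionColStatsGrouper {
  public static Map<String, List<ColumnStatisticsObj>> groupByTableAndPartition(
      List<ColStatsObjWithSourceInfo> colStatsForDB) {
    Map<String, List<ColumnStatisticsObj>> grouped =
        new HashMap<String, List<ColumnStatisticsObj>>();
    for (ColStatsObjWithSourceInfo stat : colStatsForDB) {
      // getTblName()/getPartName()/getColStatsObj() come from the
      // ColStatsObjWithSourceInfo class added to MetaStoreUtils above.
      String key = stat.getTblName() + "/" + stat.getPartName();
      List<ColumnStatisticsObj> statsForPart = grouped.get(key);
      if (statsForPart == null) {
        statsForPart = new ArrayList<ColumnStatisticsObj>();
        grouped.put(key, statsForPart);
      }
      statsForPart.add(stat.getColStatsObj());
    }
    return grouped;
  }
}
```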
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java index 280655d..7c68922 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java @@ -34,6 +34,14 @@ public class CacheUtils { private static final String delimit = "\u0001"; + public static String buildKey(String dbName) { + return dbName; + } + + public static String buildKeyWithDelimit(String dbName) { + return buildKey(dbName) + delimit; + } + public static String buildKey(String dbName, String tableName) { return dbName + delimit + tableName; } @@ -88,6 +96,10 @@ public static String buildKey(String dbName, String tableName, String colName) { return result; } + public static Object[] splitAggrColStats(String key) { + return key.split(delimit); + } + public static Table assemble(TableWrapper wrapper) { Table t = wrapper.getTable().deepCopy(); if (wrapper.getSdHash()!=null) { diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index 7939bfe..1544529 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -35,9 +35,11 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Deadline; import org.apache.hadoop.hive.metastore.FileMetadataHandler; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.ObjectStore; import org.apache.hadoop.hive.metastore.PartFilterExprUtil; import org.apache.hadoop.hive.metastore.PartitionExpressionProxy; @@ -84,6 +86,9 @@ import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.cache.SharedCache.StatsType; +import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator; +import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregatorFactory; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hive.common.util.HiveStringUtils; import org.apache.thrift.TException; @@ -97,7 +102,6 @@ // TODO constraintCache // TODO need sd nested copy? // TODO String intern -// TODO restructure HBaseStore // TODO monitor event queue // TODO initial load slow? 
// TODO size estimation @@ -116,6 +120,9 @@ private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock( true); private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false); + private static ReentrantReadWriteLock partitionAggrColStatsCacheLock = new ReentrantReadWriteLock( + true); + private static AtomicBoolean isPartitionAggrColStatsCacheDirty = new AtomicBoolean(false); RawStore rawStore = null; Configuration conf; private PartitionExpressionProxy expressionProxy = null; @@ -217,7 +224,9 @@ public void setConf(Configuration conf) { if (firstTime) { try { LOG.info("Prewarming CachedStore"); + long start = System.currentTimeMillis(); prewarm(); + LOG.info("Time taken to prewarm: " + (System.currentTimeMillis()-start)/1000); LOG.info("CachedStore initialized"); // Start the cache update master-worker threads startCacheUpdateService(); @@ -230,31 +239,32 @@ public void setConf(Configuration conf) { @VisibleForTesting void prewarm() throws Exception { - // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy + // Prevents throwing exceptions in our raw store calls since we're not using + // RawStoreProxy Deadline.registerIfNot(1000000); List dbNames = rawStore.getAllDatabases(); for (String dbName : dbNames) { + // Cache partition column stats + Deadline.startTimer("getColStatsForDatabase"); + List colStatsForDB = + rawStore.getPartitionColStatsForDatabase(dbName); + Deadline.stopTimer(); + if (colStatsForDB != null) { + SharedCache.addPartitionColStatsToCache(colStatsForDB); + } Database db = rawStore.getDatabase(dbName); - SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(dbName), db); + dbName = HiveStringUtils.normalizeIdentifier(dbName); + SharedCache.addDatabaseToCache(dbName, db); List tblNames = rawStore.getAllTables(dbName); for (String tblName : tblNames) { Table table = rawStore.getTable(dbName, tblName); - SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), table); + tblName = HiveStringUtils.normalizeIdentifier(tblName); + SharedCache.addTableToCache(dbName, tblName, table); Deadline.startTimer("getPartitions"); List partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); Deadline.stopTimer(); for (Partition partition : partitions) { - SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), partition); - } - // Cache partition column stats - Deadline.startTimer("getColStatsForTablePartitions"); - Map> colStatsPerPartition = - rawStore.getColStatsForTablePartitions(dbName, tblName); - Deadline.stopTimer(); - if (colStatsPerPartition != null) { - SharedCache.addPartitionColStatsToCache(dbName, tblName, colStatsPerPartition); + SharedCache.addPartitionToCache(dbName, tblName, partition); } // Cache table column stats List colNames = MetaStoreUtils.getColumnNamesForTable(table); @@ -263,8 +273,36 @@ void prewarm() throws Exception { rawStore.getTableColumnStatistics(dbName, tblName, colNames); Deadline.stopTimer(); if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) { - SharedCache.addTableColStatsToCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); + SharedCache.addTableColStatsToCache(dbName, tblName, tableColStats.getStatsObj()); + } + // Cache aggregate stats for all partitions of a table and for all but + // default 
partition + List partNames = listPartitionNames(dbName, tblName, (short) -1); + if ((partNames != null) && (partNames.size() > 0)) { + MergedColumnStatsForPartitions mergedColStatsAllParts = + mergeColStatsForPartitions(dbName, tblName, partNames, colNames); + AggrStats aggrStatsAllPartitions = + new AggrStats(mergedColStatsAllParts.getColStats(), + mergedColStatsAllParts.getPartsFound()); + // Remove default partition from partition names and get aggregate + // stats again + List partKeys = table.getPartitionKeys(); + String defaultPartitionValue = HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME); + List partCols = new ArrayList(); + List partVals = new ArrayList(); + for (FieldSchema fs : partKeys) { + partCols.add(fs.getName()); + partVals.add(defaultPartitionValue); + } + String defaultPartitionName = FileUtils.makePartName(partCols, partVals); + partNames.remove(defaultPartitionName); + MergedColumnStatsForPartitions mergedColStatsAllButDefaultParts = mergeColStatsForPartitions( + dbName, tblName, partNames, colNames); + AggrStats aggrStatsAllButDefaultPartition = new AggrStats( + mergedColStatsAllButDefaultParts.getColStats(), mergedColStatsAllButDefaultParts + .getPartsFound()); + SharedCache.addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions, + aggrStatsAllButDefaultPartition); } } } @@ -288,7 +326,8 @@ public Thread newThread(Runnable r) { HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, TimeUnit.MILLISECONDS); } - LOG.info("CachedStore: starting cache update service (run every " + cacheRefreshPeriod + "ms"); + LOG.info("CachedStore: starting cache update service (runs every " + cacheRefreshPeriod + + "ms)"); cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), cacheRefreshPeriod, cacheRefreshPeriod, TimeUnit.MILLISECONDS); } @@ -343,6 +382,7 @@ public void run() { // Update the database in cache updateDatabases(rawStore, dbNames); for (String dbName : dbNames) { + updateDatabasePartitionColStats(rawStore, dbName); // Update the tables in cache updateTables(rawStore, dbName); List tblNames = cachedStore.getAllTables(dbName); @@ -351,8 +391,8 @@ public void run() { updateTablePartitions(rawStore, dbName, tblName); // Update the table column stats for a table in cache updateTableColStats(rawStore, dbName, tblName); - // Update the partitions column stats for a table in cache - updateTablePartitionColStats(rawStore, dbName, tblName); + // Update aggregate column stats cache + updateAggregateStatsCache(rawStore, dbName, tblName); } } } @@ -400,6 +440,34 @@ private void updateDatabases(RawStore rawStore, List dbNames) { } } + private void updateDatabasePartitionColStats(RawStore rawStore, String dbName) { + try { + Deadline.startTimer("getColStatsForDatabasePartitions"); + List colStatsForDB = + rawStore.getPartitionColStatsForDatabase(dbName); + Deadline.stopTimer(); + if (colStatsForDB != null) { + if (partitionColStatsCacheLock.writeLock().tryLock()) { + // Skip background updates if we detect change + if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping partition column stats cache update; the partition column stats " + + "list we have is dirty."); + return; + } + SharedCache.refreshPartitionColStats(HiveStringUtils.normalizeIdentifier(dbName), + colStatsForDB); + } + } + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Updating CachedStore: unable to read partitions column stats of database: " + + dbName, e); + } finally { + if 
(partitionColStatsCacheLock.isWriteLockedByCurrentThread()) { + partitionColStatsCacheLock.writeLock().unlock(); + } + } + } + // Update the cached table objects private void updateTables(RawStore rawStore, String dbName) { List tables = new ArrayList
(); @@ -482,31 +550,55 @@ private void updateTableColStats(RawStore rawStore, String dbName, String tblNam } } - // Update the cached partition col stats for a table - private void updateTablePartitionColStats(RawStore rawStore, String dbName, String tblName) { + // Update cached aggregate stats for all partitions of a table and for all + // but default partition + private void updateAggregateStatsCache(RawStore rawStore, String dbName, String tblName) { try { - Deadline.startTimer("getColStatsForTablePartitions"); - Map> colStatsPerPartition = - rawStore.getColStatsForTablePartitions(dbName, tblName); - Deadline.stopTimer(); - if (colStatsPerPartition != null) { - if (partitionColStatsCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping partition column stats cache update; the partition column stats " - + "list we have is dirty."); - return; + Table table = rawStore.getTable(dbName, tblName); + List partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); + List colNames = MetaStoreUtils.getColumnNamesForTable(table); + if ((partNames != null) && (partNames.size() > 0)) { + Deadline.startTimer("getAggregareStatsForAllPartitions"); + AggrStats aggrStatsAllPartitions = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + // Remove default partition from partition names and get aggregate + // stats again + List partKeys = table.getPartitionKeys(); + String defaultPartitionValue = + HiveConf.getVar(cachedStore.conf, ConfVars.DEFAULTPARTITIONNAME); + List partCols = new ArrayList(); + List partVals = new ArrayList(); + for (FieldSchema fs : partKeys) { + partCols.add(fs.getName()); + partVals.add(defaultPartitionValue); + } + String defaultPartitionName = FileUtils.makePartName(partCols, partVals); + partNames.remove(defaultPartitionName); + Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault"); + AggrStats aggrStatsAllButDefaultPartition = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + if ((aggrStatsAllPartitions != null) && (aggrStatsAllButDefaultPartition != null)) { + if (partitionAggrColStatsCacheLock.writeLock().tryLock()) { + // Skip background updates if we detect change + if (isPartitionAggrColStatsCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping aggregate column stats cache update; the aggregate column stats we " + + "have is dirty."); + return; + } + SharedCache.refreshAggregateStatsCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions, + aggrStatsAllButDefaultPartition); } - SharedCache.refreshPartitionColStats(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), colStatsPerPartition); } } } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read partitions column stats of table: " - + tblName, e); + LOG.info( + "Updating CachedStore: unable to read aggregate column stats of table: " + tblName, e); } finally { - if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) { - partitionColStatsCacheLock.writeLock().unlock(); + if (partitionAggrColStatsCacheLock.isWriteLockedByCurrentThread()) { + partitionAggrColStatsCacheLock.writeLock().unlock(); } } } @@ -664,17 +756,18 @@ public void createTable(Table tbl) throws InvalidObjectException, MetaException } @Override - public boolean 
dropTable(String dbName, String tableName) throws MetaException, + public boolean dropTable(String dbName, String tblName) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { - boolean succ = rawStore.dropTable(dbName, tableName); + boolean succ = rawStore.dropTable(dbName, tblName); if (succ) { + dbName = HiveStringUtils.normalizeIdentifier(dbName); + tblName = HiveStringUtils.normalizeIdentifier(tblName); // Remove table try { // Wait if background table cache update is happening tableCacheLock.readLock().lock(); isTableCacheDirty.set(true); - SharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tableName)); + SharedCache.removeTableFromCache(dbName, tblName); } finally { tableCacheLock.readLock().unlock(); } @@ -683,8 +776,7 @@ public boolean dropTable(String dbName, String tableName) throws MetaException, // Wait if background table col stats cache update is happening tableColStatsCacheLock.readLock().lock(); isTableColStatsCacheDirty.set(true); - SharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tableName)); + SharedCache.removeTableColStatsFromCache(dbName, tblName); } finally { tableColStatsCacheLock.readLock().unlock(); } @@ -696,8 +788,9 @@ public boolean dropTable(String dbName, String tableName) throws MetaException, public Table getTable(String dbName, String tableName) throws MetaException { Table tbl = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName)); + // TODO Manage privileges if (tbl != null) { - tbl.unsetPrivileges(); + tbl.setPrivileges(new PrincipalPrivilegeSet()); tbl.setRewriteEnabled(tbl.isRewriteEnabled()); } return tbl; @@ -707,15 +800,25 @@ public Table getTable(String dbName, String tableName) throws MetaException { public boolean addPartition(Partition part) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartition(part); if (succ) { + String dbName = HiveStringUtils.normalizeIdentifier(part.getDbName()); + String tblName = HiveStringUtils.normalizeIdentifier(part.getTableName()); try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()), - HiveStringUtils.normalizeIdentifier(part.getTableName()), part); + SharedCache.addPartitionToCache(dbName, tblName, part); } finally { partitionCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + SharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } return succ; } @@ -725,17 +828,27 @@ public boolean addPartitions(String dbName, String tblName, List part throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(dbName, tblName, parts); if (succ) { + dbName = HiveStringUtils.normalizeIdentifier(dbName); + tblName = HiveStringUtils.normalizeIdentifier(tblName); try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); for (Partition part : parts) { - SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), - 
HiveStringUtils.normalizeIdentifier(tblName), part); + SharedCache.addPartitionToCache(dbName, tblName, part); } } finally { partitionCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + SharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } return succ; } @@ -745,6 +858,8 @@ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy p boolean ifNotExists) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(dbName, tblName, partitionSpec, ifNotExists); if (succ) { + dbName = HiveStringUtils.normalizeIdentifier(dbName); + tblName = HiveStringUtils.normalizeIdentifier(tblName); try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); @@ -752,12 +867,20 @@ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy p PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); while (iterator.hasNext()) { Partition part = iterator.next(); - SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), part); + SharedCache.addPartitionToCache(dbName, tblName, part); } } finally { partitionCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + SharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } return succ; } @@ -765,11 +888,11 @@ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy p @Override public Partition getPartition(String dbName, String tableName, List part_vals) throws MetaException, NoSuchObjectException { - Partition part = - SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tableName), part_vals); + Partition part = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), part_vals); + // TODO Manage privileges if (part != null) { - part.unsetPrivileges(); + part.setPrivileges(new PrincipalPrivilegeSet()); } else { throw new NoSuchObjectException("partition values=" + part_vals.toString()); } @@ -784,17 +907,18 @@ public boolean doesPartitionExist(String dbName, String tableName, } @Override - public boolean dropPartition(String dbName, String tableName, List part_vals) + public boolean dropPartition(String dbName, String tblName, List part_vals) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { - boolean succ = rawStore.dropPartition(dbName, tableName, part_vals); + boolean succ = rawStore.dropPartition(dbName, tblName, part_vals); if (succ) { + dbName = HiveStringUtils.normalizeIdentifier(dbName); + tblName = HiveStringUtils.normalizeIdentifier(tblName); // Remove partition try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tableName), 
part_vals); + SharedCache.removePartitionFromCache(dbName, tblName, part_vals); } finally { partitionCacheLock.readLock().unlock(); } @@ -803,11 +927,19 @@ public boolean dropPartition(String dbName, String tableName, List part_ // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tableName), part_vals); + SharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals); } finally { partitionColStatsCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + SharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } return succ; } @@ -815,11 +947,12 @@ public boolean dropPartition(String dbName, String tableName, List part_ @Override public List getPartitions(String dbName, String tableName, int max) throws MetaException, NoSuchObjectException { - List parts = SharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tableName), max); + List parts = SharedCache.listCachedPartitions(HiveStringUtils + .normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tableName), max); + // TODO Manage privileges if (parts != null) { for (Partition part : parts) { - part.unsetPrivileges(); + part.setPrivileges(new PrincipalPrivilegeSet()); } } return parts; @@ -830,13 +963,14 @@ public void alterTable(String dbName, String tblName, Table newTable) throws InvalidObjectException, MetaException { rawStore.alterTable(dbName, tblName, newTable); validateTableType(newTable); + dbName = HiveStringUtils.normalizeIdentifier(dbName); + tblName = HiveStringUtils.normalizeIdentifier(tblName); // Update table cache try { // Wait if background cache update is happening tableCacheLock.readLock().lock(); isTableCacheDirty.set(true); - SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), newTable); + SharedCache.alterTableInCache(dbName, tblName, newTable); } finally { tableCacheLock.readLock().unlock(); } @@ -846,11 +980,19 @@ public void alterTable(String dbName, String tblName, Table newTable) // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - SharedCache.alterTableInPartitionCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), newTable); + SharedCache.alterTableInPartitionCache(dbName, tblName, newTable); } finally { partitionCacheLock.readLock().unlock(); } + // Update aggregate partition col stats keys wherever applicable + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + SharedCache.alterTableInAggrPartitionColStatsCache(dbName, tblName, newTable); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } @Override @@ -886,12 +1028,19 @@ public void alterTable(String dbName, String tblName, Table newTable) } @Override - public List
<Table> getTableObjectsByName(String dbName, - List<String> tblNames) throws MetaException, UnknownDBException { + public List<Table> getTableObjectsByName(String dbName, List<String> tblNames) + throws MetaException, UnknownDBException { List<Table> tables = new ArrayList<Table>
(); + Database db = SharedCache.getDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName)); + if (db == null) { + throw new UnknownDBException("Could not find database " + dbName); + } for (String tblName : tblNames) { - tables.add(SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName))); + Table tbl = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName)); + if ((tbl != null) && (!tables.contains(tbl))) { + tables.add(tbl); + } } return tables; } @@ -906,18 +1055,10 @@ public void alterTable(String dbName, String tblName, Table newTable) } @Override + // TODO: implement this in CachedStore public List listTableNamesByFilter(String dbName, String filter, - short max_tables) throws MetaException, UnknownDBException { - List tableNames = new ArrayList(); - int count = 0; - for (Table table : SharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), filter) - && (max_tables == -1 || count < max_tables)) { - tableNames.add(table.getTableName()); - count++; - } - } - return tableNames; + short maxTables) throws MetaException, UnknownDBException { + return rawStore.listTableNamesByFilter(dbName, filter, maxTables); } @Override @@ -944,23 +1085,24 @@ public PartitionValuesResponse listPartitionValues(String db_name, String tbl_na } @Override - public List listPartitionNamesByFilter(String db_name, - String tbl_name, String filter, short max_parts) throws MetaException { - // TODO Translate filter -> expr - return null; + // TODO: implement this in CachedStore + public List listPartitionNamesByFilter(String dbName, + String tblName, String filter, short maxParts) throws MetaException { + return rawStore.listPartitionNamesByFilter(dbName, tblName, filter, maxParts); } @Override public void alterPartition(String dbName, String tblName, List partVals, Partition newPart) throws InvalidObjectException, MetaException { rawStore.alterPartition(dbName, tblName, partVals, newPart); + dbName = HiveStringUtils.normalizeIdentifier(dbName); + tblName = HiveStringUtils.normalizeIdentifier(tblName); // Update partition cache try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + SharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); } finally { partitionCacheLock.readLock().unlock(); } @@ -969,17 +1111,27 @@ public void alterPartition(String dbName, String tblName, List partVals, // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + SharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart); } finally { partitionColStatsCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + SharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } @Override public 
void alterPartitions(String dbName, String tblName, List> partValsList, List newParts) throws InvalidObjectException, MetaException { rawStore.alterPartitions(dbName, tblName, partValsList, newParts); + dbName = HiveStringUtils.normalizeIdentifier(dbName); + tblName = HiveStringUtils.normalizeIdentifier(tblName); // Update partition cache try { // Wait if background cache update is happening @@ -988,8 +1140,7 @@ public void alterPartitions(String dbName, String tblName, List> pa for (int i = 0; i < partValsList.size(); i++) { List partVals = partValsList.get(i); Partition newPart = newParts.get(i); - SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + SharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); } } finally { partitionCacheLock.readLock().unlock(); @@ -1002,12 +1153,20 @@ public void alterPartitions(String dbName, String tblName, List> pa for (int i = 0; i < partValsList.size(); i++) { List partVals = partValsList.get(i); Partition newPart = newParts.get(i); - SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + SharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart); } } finally { partitionColStatsCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + SharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } @Override @@ -1064,8 +1223,8 @@ private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr, public List getPartitionsByFilter(String dbName, String tblName, String filter, short maxParts) throws MetaException, NoSuchObjectException { - // TODO Auto-generated method stub - return null; + // TODO Implement using CachedStore + return rawStore.getPartitionsByFilter(dbName, tblName, filter, maxParts); } @Override @@ -1079,7 +1238,8 @@ public boolean getPartitionsByExpr(String dbName, String tblName, byte[] expr, for (String partName : partNames) { Partition part = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), partNameToVals(partName)); - part.unsetPrivileges(); + // TODO Manage privileges + part.setPrivileges(new PrincipalPrivilegeSet()); result.add(part); } return hasUnknownPartitions; @@ -1449,6 +1609,9 @@ public ColumnStatistics getTableColumnStatistics(String dbName, String tableName } } + /** + * If colName is null, all column stats associated with the table are deleted + */ @Override public boolean deleteTableColumnStatistics(String dbName, String tableName, String colName) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { @@ -1473,38 +1636,44 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List statsObjs = colStats.getStatsObj(); - Partition part = getPartition(dbName, tableName, partVals); + Partition part = getPartition(dbName, tblName, partVals); List colNames = new ArrayList<>(); for (ColumnStatisticsObj statsObj : statsObjs) { colNames.add(statsObj.getColName()); } StatsSetupConst.setColumnStatsState(part.getParameters(), colNames); - + dbName = HiveStringUtils.normalizeIdentifier(dbName); + 
tblName = HiveStringUtils.normalizeIdentifier(tblName); // Update partition try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tableName), partVals, part); + SharedCache.alterPartitionInCache(dbName, tblName, partVals, part); } finally { partitionCacheLock.readLock().unlock(); } - // Update partition column stats try { // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - SharedCache.updatePartitionColStatsInCache( - HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()), - HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals, + SharedCache.updatePartitionColStatsInCache(dbName, tblName, partVals, colStats.getStatsObj()); } finally { partitionColStatsCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + SharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } return succ; } @@ -1518,68 +1687,135 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List partVals, String colName) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = - rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName); + rawStore.deletePartitionColumnStatistics(dbName, tblName, partName, partVals, colName); + dbName = HiveStringUtils.normalizeIdentifier(dbName); + tblName = HiveStringUtils.normalizeIdentifier(tblName); if (succ) { try { // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tableName), partVals, colName); + SharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName); } finally { partitionColStatsCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + SharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } return succ; } @Override - public AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, - List colNames) throws MetaException, NoSuchObjectException { - List colStats = mergeColStatsForPartitions( - HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), - partNames, colNames); - return new AggrStats(colStats, partNames.size()); - - } + public AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, + List colNames) throws MetaException, NoSuchObjectException { + List colStats; + List allPartNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); + dbName = HiveStringUtils.normalizeIdentifier(dbName); + tblName = HiveStringUtils.normalizeIdentifier(tblName); + if (partNames.size() == allPartNames.size()) { + 
colStats = SharedCache.getAggrStatsFromCache(dbName, tblName, colNames, StatsType.ALL); + if (colStats != null) { + return new AggrStats(colStats, partNames.size()); + } + } else if (partNames.size() == (allPartNames.size() - 1)) { + String defaultPartitionName = HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME); + if (!partNames.contains(defaultPartitionName)) { + colStats = + SharedCache.getAggrStatsFromCache(dbName, tblName, colNames, StatsType.ALLBUTDEFAULT); + if (colStats != null) { + return new AggrStats(colStats, partNames.size()); + } + } + } + LOG.debug("Didn't find aggr stats in cache. Merging them. tblName= {}, parts= {}, cols= {}", + tblName, partNames, colNames); + MergedColumnStatsForPartitions mergedColStats = mergeColStatsForPartitions(dbName, tblName, + partNames, colNames); + return new AggrStats(mergedColStats.getColStats(), mergedColStats.getPartsFound()); + } - private List mergeColStatsForPartitions(String dbName, String tblName, + private MergedColumnStatsForPartitions mergeColStatsForPartitions(String dbName, String tblName, List partNames, List colNames) throws MetaException { - final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(), - HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION); - final double ndvTuner = HiveConf.getFloatVar(getConf(), - HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER); - Map> map = new HashMap<>(); - + final boolean useDensityFunctionForNDVEstimation = + HiveConf.getBoolVar(getConf(), HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION); + final double ndvTuner = + HiveConf.getFloatVar(getConf(), HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER); + Map> colStatsMap = + new HashMap>(); + boolean areAllPartsFound = true; + long partsFound = 0; for (String colName : colNames) { - List colStats = new ArrayList<>(); + long partsFoundForColumn = 0; + ColumnStatsAggregator colStatsAggregator = null; + List colStatsWithPartInfoList = + new ArrayList(); for (String partName : partNames) { - String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), - colName); - List colStat = new ArrayList<>(); - ColumnStatisticsObj colStatsForPart = SharedCache - .getCachedPartitionColStats(colStatsCacheKey); + String colStatsCacheKey = + CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName); + ColumnStatisticsObj colStatsForPart = + SharedCache.getCachedPartitionColStats(colStatsCacheKey); if (colStatsForPart != null) { - colStat.add(colStatsForPart); - ColumnStatisticsDesc csDesc = new ColumnStatisticsDesc(false, dbName, tblName); - csDesc.setPartName(partName); - colStats.add(new ColumnStatistics(csDesc, colStat)); + ColStatsObjWithSourceInfo colStatsWithPartInfo = + new ColStatsObjWithSourceInfo(colStatsForPart, dbName, tblName, partName); + colStatsWithPartInfoList.add(colStatsWithPartInfo); + if (colStatsAggregator == null) { + colStatsAggregator = + ColumnStatsAggregatorFactory.getColumnStatsAggregator(colStatsForPart + .getStatsData().getSetField(), useDensityFunctionForNDVEstimation, ndvTuner); + } + partsFoundForColumn++; } else { - LOG.debug("Stats not found in CachedStore for: dbName={} tblName={} partName={} colName={}", - dbName, tblName,partName, colName); + LOG.debug( + "Stats not found in CachedStore for: dbName={} tblName={} partName={} colName={}", + dbName, tblName, partName, colName); } } - map.put(colName, colStats); + if (colStatsWithPartInfoList.size() > 0) { + colStatsMap.put(colStatsAggregator, colStatsWithPartInfoList); + } + if 
(partsFoundForColumn == partNames.size()) { + partsFound++; + } + } + if (colStatsMap.size() < 1) { + LOG.debug("No stats data found for: dbName={} tblName= {} partNames= {} colNames= ", dbName, + tblName, partNames, colNames); + return new MergedColumnStatsForPartitions(new ArrayList(), 0); } // Note that enableBitVector does not apply here because ColumnStatisticsObj - // itself will tell whether - // bitvector is null or not and aggr logic can automatically apply. - return MetaStoreUtils.aggrPartitionStats(map, dbName, tblName, partNames, colNames, - useDensityFunctionForNDVEstimation, ndvTuner); + // itself will tell whether bitvector is null or not and aggr logic can + // automatically apply. + return new MergedColumnStatsForPartitions(MetaStoreUtils.aggrPartitionStats(colStatsMap, + partNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner), partsFound); + } + + class MergedColumnStatsForPartitions { + List colStats = new ArrayList(); + long partsFound; + + MergedColumnStatsForPartitions(List colStats, long partsFound) { + this.colStats = colStats; + this.partsFound = partsFound; + } + + List getColStats() { + return colStats; + } + + long getPartsFound() { + return partsFound; + } } @@ -1649,6 +1885,8 @@ public void setMetaStoreSchemaVersion(String version, String comment) public void dropPartitions(String dbName, String tblName, List partNames) throws MetaException, NoSuchObjectException { rawStore.dropPartitions(dbName, tblName, partNames); + dbName = HiveStringUtils.normalizeIdentifier(dbName); + tblName = HiveStringUtils.normalizeIdentifier(tblName); // Remove partitions try { // Wait if background cache update is happening @@ -1656,8 +1894,7 @@ public void dropPartitions(String dbName, String tblName, List partNames isPartitionCacheDirty.set(true); for (String partName : partNames) { List vals = partNameToVals(partName); - SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), vals); + SharedCache.removePartitionFromCache(dbName, tblName, vals); } } finally { partitionCacheLock.readLock().unlock(); @@ -1669,12 +1906,20 @@ public void dropPartitions(String dbName, String tblName, List partNames isPartitionColStatsCacheDirty.set(true); for (String partName : partNames) { List part_vals = partNameToVals(partName); - SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), part_vals); + SharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals); } } finally { partitionColStatsCacheLock.readLock().unlock(); } + // Remove aggregate partition col stats for this table + try { + // Wait if background cache update is happening + partitionAggrColStatsCacheLock.readLock().lock(); + isPartitionAggrColStatsCacheDirty.set(true); + SharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); + } finally { + partitionAggrColStatsCacheLock.readLock().unlock(); + } } @Override @@ -1935,9 +2180,9 @@ public void dropConstraint(String dbName, String tableName, } @Override - public Map> getColStatsForTablePartitions(String dbName, - String tableName) throws MetaException, NoSuchObjectException { - return rawStore.getColStatsForTablePartitions(dbName, tableName); + public List getPartitionColStatsForDatabase(String dbName) + throws MetaException, NoSuchObjectException { + return rawStore.getPartitionColStatsForDatabase(dbName); } public RawStore getRawStore() { diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index 80b17e0..e4ddcb3 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -28,8 +28,10 @@ import java.util.TreeMap; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.StatObjectConverter; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -58,8 +60,24 @@ new TreeMap(); private static Map sdCache = new HashMap(); + private static Map> aggrColStatsCache = + new HashMap>(); private static MessageDigest md; + static enum StatsType { + ALL(0), ALLBUTDEFAULT(1); + + private final int position; + + private StatsType(int position) { + this.position = position; + } + + public int getPosition() { + return position; + } + } + static final private Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName()); static { @@ -155,7 +173,11 @@ public static synchronized void removeTableColStatsFromCache(String dbName, Stri public static synchronized void removeTableColStatsFromCache(String dbName, String tblName, String colName) { - tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName)); + if (colName == null) { + removeTableColStatsFromCache(dbName, tblName); + } else { + tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName)); + } } public static synchronized void updateTableColStatsInCache(String dbName, String tableName, @@ -210,7 +232,8 @@ public static synchronized void alterTableInTableColStatsCache(String dbName, St ColumnStatisticsObj colStatObj = entry.getValue(); if (key.toLowerCase().startsWith(oldPartialTableStatsKey.toLowerCase())) { String[] decomposedKey = CacheUtils.splitTableColStats(key); - String newKey = CacheUtils.buildKey(decomposedKey[0], decomposedKey[1], decomposedKey[2]); + String newKey = + CacheUtils.buildKey(decomposedKey[0], newTable.getTableName(), decomposedKey[2]); newTableColStats.put(newKey, colStatObj); iterator.remove(); } @@ -236,8 +259,9 @@ public static synchronized void alterTableInPartitionColStatsCache(String dbName ColumnStatisticsObj colStatObj = entry.getValue(); if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) { Object[] decomposedKey = CacheUtils.splitPartitionColStats(key); + // New key has the new table name String newKey = - CacheUtils.buildKey((String) decomposedKey[0], (String) decomposedKey[1], + CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(), (List) decomposedKey[2], (String) decomposedKey[3]); newPartitionColStats.put(newKey, colStatObj); iterator.remove(); @@ -248,6 +272,32 @@ public static synchronized void alterTableInPartitionColStatsCache(String dbName } } + public static synchronized void alterTableInAggrPartitionColStatsCache(String dbName, + String tblName, Table newTable) { + if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { + Map> newAggrColStatsCache = + new HashMap>(); + String oldPartialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator>> iterator = + 
aggrColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry> entry = iterator.next(); + String key = entry.getKey(); + List value = entry.getValue(); + if (key.toLowerCase().startsWith(oldPartialKey.toLowerCase())) { + Object[] decomposedKey = CacheUtils.splitAggrColStats(key); + // New key has the new table name + String newKey = + CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(), + (String) decomposedKey[2]); + newAggrColStatsCache.put(newKey, value); + iterator.remove(); + } + } + aggrColStatsCache.putAll(newAggrColStatsCache); + } + } + public static synchronized int getCachedTableCount() { return tableCache.size(); } @@ -323,6 +373,20 @@ public static synchronized Partition removePartitionFromCache(String dbName, Str return wrapper.getPartition(); } + // Remove cached column stats for all partitions of all tables in a db + public static synchronized void removePartitionColStatsFromCache(String dbName) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName); + Iterator> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + } + // Remove cached column stats for all partitions of a table public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName) { String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); @@ -429,28 +493,29 @@ public static synchronized ColumnStatisticsObj getCachedPartitionColStats(String return partitionColStatsCache.get(key)!=null?partitionColStatsCache.get(key).deepCopy():null; } - public static synchronized void addPartitionColStatsToCache(String dbName, String tableName, - Map> colStatsPerPartition) { - for (Map.Entry> entry : colStatsPerPartition.entrySet()) { - String partName = entry.getKey(); + public static synchronized void addPartitionColStatsToCache( + List colStatsForDB) { + for (ColStatsObjWithSourceInfo colStatWithSourceInfo : colStatsForDB) { + List partVals; try { - List partVals = Warehouse.getPartValuesFromPartName(partName); - for (ColumnStatisticsObj colStatObj : entry.getValue()) { - String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName()); - partitionColStatsCache.put(key, colStatObj); - } + partVals = Warehouse.getPartValuesFromPartName(colStatWithSourceInfo.getPartName()); + ColumnStatisticsObj colStatObj = colStatWithSourceInfo.getColStatsObj(); + String key = + CacheUtils.buildKey(colStatWithSourceInfo.getDbName(), + colStatWithSourceInfo.getTblName(), partVals, colStatObj.getColName()); + partitionColStatsCache.put(key, colStatObj); } catch (MetaException e) { - LOG.info("Unable to add partition: " + partName + " to SharedCache", e); + LOG.info("Unable to add partition stats for: {} to SharedCache", + colStatWithSourceInfo.getPartName(), e); } } } - public static synchronized void refreshPartitionColStats(String dbName, String tableName, - Map> newColStatsPerPartition) { - LOG.debug("CachedStore: updating cached partition column stats objects for database: " + dbName - + " and table: " + tableName); - removePartitionColStatsFromCache(dbName, tableName); - addPartitionColStatsToCache(dbName, tableName, newColStatsPerPartition); + public static synchronized void refreshPartitionColStats(String dbName, + List colStatsForDB) { + LOG.debug("CachedStore: updating cached partition column stats objects for database: " + dbName); + 
removePartitionColStatsFromCache(dbName); + addPartitionColStatsToCache(colStatsForDB); } public static synchronized void addTableColStatsToCache(String dbName, String tableName, @@ -471,6 +536,70 @@ public static synchronized void refreshTableColStats(String dbName, String table addTableColStatsToCache(dbName, tableName, colStatsForTable); } + public static synchronized void addAggregateStatsToCache(String dbName, String tblName, + AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) { + if (aggrStatsAllPartitions != null) { + for (ColumnStatisticsObj colStatObj : aggrStatsAllPartitions.getColStats()) { + String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName()); + List value = new ArrayList(); + value.add(StatsType.ALL.getPosition(), colStatObj); + aggrColStatsCache.put(key, value); + } + } + if (aggrStatsAllButDefaultPartition != null) { + for (ColumnStatisticsObj colStatObj : aggrStatsAllButDefaultPartition.getColStats()) { + String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName()); + List value = aggrColStatsCache.get(key); + if ((value != null) && (value.size() > 0)) { + value.add(StatsType.ALLBUTDEFAULT.getPosition(), colStatObj); + } + } + } + } + + public static synchronized List getAggrStatsFromCache(String dbName, + String tblName, List colNames, StatsType statsType) { + List colStats = new ArrayList(); + for (String colName : colNames) { + String key = CacheUtils.buildKey(dbName, tblName, colName); + List colStatList = aggrColStatsCache.get(key); + // If unable to find stats for a column, return null so we can build stats + if (colStatList == null) { + return null; + } + ColumnStatisticsObj colStatObj = colStatList.get(statsType.getPosition()); + // If unable to find stats for this StatsType, return null so we can build + // stats + if (colStatObj == null) { + return null; + } + colStats.add(colStatObj); + } + return colStats; + } + + public static synchronized void removeAggrPartitionColStatsFromCache(String dbName, String tblName) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator>> iterator = + aggrColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry> entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + } + + public static synchronized void refreshAggregateStatsCache(String dbName, String tblName, + AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) { + LOG.debug("CachedStore: updating aggregate stats cache for database: {}, table: {}", dbName, + tblName); + removeAggrPartitionColStatsFromCache(dbName, tblName); + addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions, + aggrStatsAllButDefaultPartition); + } + public static void increSd(StorageDescriptor sd, byte[] sdHash) { ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash); if (sdCache.containsKey(byteArray)) { diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java index e6c836b..3d845ba 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java @@ -21,28 +21,26 @@ import java.util.List; +import 
org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.MetaException; + public class BinaryColumnStatsAggregator extends ColumnStatsAggregator { @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithSourceInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - BinaryColumnStatsData aggregateData = null; String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + String colName = null; + BinaryColumnStatsData aggregateData = null; + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso .getStatsData().getSetField()); @@ -61,4 +59,4 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, statsObj.setStatsData(columnStatisticsData); return statsObj; } -} +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java index a34bc9f..4afd0e8 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java @@ -21,8 +21,8 @@ import java.util.List; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -30,22 +30,21 @@ public class BooleanColumnStatsAggregator extends ColumnStatsAggregator { @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithSourceInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - BooleanColumnStatsData aggregateData = null; String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + String colName = null; + BooleanColumnStatsData aggregateData = null; + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); 
- statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); + colType = cso.getColType(); + statsObj = + ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso.getStatsData() + .getSetField()); } BooleanColumnStatsData newData = cso.getStatsData().getBooleanStats(); if (aggregateData == null) { @@ -61,5 +60,4 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, statsObj.setStatsData(columnStatisticsData); return statsObj; } - } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java index a52e5e5..81e5c35 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java @@ -21,13 +21,15 @@ import java.util.List; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.MetaException; public abstract class ColumnStatsAggregator { public boolean useDensityFunctionForNDVEstimation; public double ndvTuner; - public abstract ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException; + + public abstract ColumnStatisticsObj aggregate( + List colStatsWithSourceInfo, List partNames, + boolean areAllPartsFound) throws MetaException; } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java index ee95396..cc36336 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Date; @@ -44,27 +44,24 @@ private static final Logger LOG = LoggerFactory.getLogger(DateColumnStatsAggregator.class); @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithSourceInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - + boolean doAllPartitionContainStats = partNames.size() == colStatsWithSourceInfo.size(); + String colType = null; + String colName = null; // check if all the ColumnStatisticsObjs contain stats and all the ndv are // bitvectors - boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); NumDistinctValueEstimator ndvEstimator = null; - String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - 
throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); + statsObj = + ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso.getStatsData() + .getSetField()); + LOG.trace("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); } DateColumnStatsDataInspector dateColumnStats = (DateColumnStatsDataInspector) cso.getStatsData().getDateStats(); @@ -87,24 +84,24 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } } if (ndvEstimator != null) { - ndvEstimator = NumDistinctValueEstimatorFactory - .getEmptyNumDistinctValueEstimator(ndvEstimator); + ndvEstimator = + NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); } LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { + if (doAllPartitionContainStats || colStatsWithSourceInfo.size() < 2) { DateColumnStatsDataInspector aggregateData = null; long lowerBound = 0; long higherBound = 0; double densityAvgSum = 0.0; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); DateColumnStatsDataInspector newData = (DateColumnStatsDataInspector) cso.getStatsData().getDateStats(); lowerBound = Math.max(lowerBound, newData.getNumDVs()); higherBound += newData.getNumDVs(); - densityAvgSum += (diff(newData.getHighValue(), newData.getLowValue())) - / newData.getNumDVs(); + densityAvgSum += + (diff(newData.getHighValue(), newData.getLowValue())) / newData.getNumDVs(); if (ndvEstimator != null) { ndvEstimator.mergeEstimators(newData.getNdvEstimator()); } @@ -112,8 +109,7 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, aggregateData = newData.deepCopy(); } else { aggregateData.setLowValue(min(aggregateData.getLowValue(), newData.getLowValue())); - aggregateData - .setHighValue(max(aggregateData.getHighValue(), newData.getHighValue())); + aggregateData.setHighValue(max(aggregateData.getHighValue(), newData.getHighValue())); aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); } @@ -129,7 +125,8 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, // We have estimation, lowerbound and higherbound. We use estimation // if it is between lowerbound and higherbound. 
double densityAvg = densityAvgSum / partNames.size(); - estimation = (long) (diff(aggregateData.getHighValue(), aggregateData.getLowValue()) / densityAvg); + estimation = + (long) (diff(aggregateData.getHighValue(), aggregateData.getLowValue()) / densityAvg); if (estimation < lowerBound) { estimation = lowerBound; } else if (estimation > higherBound) { @@ -150,19 +147,21 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, indexMap.put(partNames.get(index), index); } Map adjustedIndexMap = new HashMap(); - Map adjustedStatsMap = new HashMap(); + Map adjustedStatsMap = + new HashMap(); // while we scan the css, we also get the densityAvg, lowerbound and // higerbound when useDensityFunctionForNDVEstimation is true. double densityAvgSum = 0.0; if (ndvEstimator == null) { // if not every partition uses bitvector for ndv, we just fall back to // the traditional extrapolation methods. - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); DateColumnStatsData newData = cso.getStatsData().getDateStats(); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += diff(newData.getHighValue(), newData.getLowValue()) / newData.getNumDVs(); + densityAvgSum += + diff(newData.getHighValue(), newData.getLowValue()) / newData.getNumDVs(); } adjustedIndexMap.put(partName, (double) indexMap.get(partName)); adjustedStatsMap.put(partName, cso.getStatsData()); @@ -175,9 +174,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, int length = 0; int curIndex = -1; DateColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); DateColumnStatsDataInspector newData = (DateColumnStatsDataInspector) cso.getStatsData().getDateStats(); // newData.isSetBitVectors() should be true for sure because we @@ -192,14 +191,16 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, csd.setDateStats(aggregateData); adjustedStatsMap.put(pseudoPartName.toString(), csd); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += diff(aggregateData.getHighValue(), aggregateData.getLowValue()) - / aggregateData.getNumDVs(); + densityAvgSum += + diff(aggregateData.getHighValue(), aggregateData.getLowValue()) + / aggregateData.getNumDVs(); } // reset everything pseudoPartName = new StringBuilder(); pseudoIndexSum = 0; length = 0; - ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); + ndvEstimator = + NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); } aggregateData = null; } @@ -225,16 +226,19 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, csd.setDateStats(aggregateData); adjustedStatsMap.put(pseudoPartName.toString(), csd); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += diff(aggregateData.getHighValue(), aggregateData.getLowValue()) - / aggregateData.getNumDVs(); + densityAvgSum += + diff(aggregateData.getHighValue(), aggregateData.getLowValue()) + / aggregateData.getNumDVs(); } } } - extrapolate(columnStatisticsData, 
partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); + extrapolate(columnStatisticsData, partNames.size(), colStatsWithSourceInfo.size(), + adjustedIndexMap, adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); } - LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName, - columnStatisticsData.getDateStats().getNumDVs(),partNames.size(), css.size()); + LOG.debug( + "Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", + colName, columnStatisticsData.getDateStats().getNumDVs(), partNames.size(), + colStatsWithSourceInfo.size()); statsObj.setStatsData(columnStatisticsData); return statsObj; } @@ -261,8 +265,8 @@ public void extrapolate(ColumnStatisticsData extrapolateData, int numParts, for (Map.Entry entry : adjustedStatsMap.entrySet()) { extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDateStats()); } - List> list = new LinkedList>( - extractedAdjustedStatsMap.entrySet()); + List> list = + new LinkedList>(extractedAdjustedStatsMap.entrySet()); // get the lowValue Collections.sort(list, new Comparator>() { @Override @@ -359,4 +363,4 @@ public int compare(Map.Entry o1, extrapolateDateData.setNumDVs(ndv); extrapolateData.setDateStats(extrapolateDateData); } -} +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java index 284c12c..76d142a 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java @@ -29,8 +29,8 @@ import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.StatObjectConverter; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; @@ -45,27 +45,24 @@ private static final Logger LOG = LoggerFactory.getLogger(DecimalColumnStatsAggregator.class); @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithSourceInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - + boolean doAllPartitionContainStats = partNames.size() == colStatsWithSourceInfo.size(); + String colType = null; + String colName = null; // check if all the ColumnStatisticsObjs contain stats and all the ndv are // bitvectors - boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); NumDistinctValueEstimator ndvEstimator = null; - String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - 
ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); + statsObj = + ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso.getStatsData() + .getSetField()); + LOG.trace("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); } DecimalColumnStatsDataInspector decimalColumnStatsData = (DecimalColumnStatsDataInspector) cso.getStatsData().getDecimalStats(); @@ -88,24 +85,25 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } } if (ndvEstimator != null) { - ndvEstimator = NumDistinctValueEstimatorFactory - .getEmptyNumDistinctValueEstimator(ndvEstimator); + ndvEstimator = + NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); } LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { + if (doAllPartitionContainStats || colStatsWithSourceInfo.size() < 2) { DecimalColumnStatsDataInspector aggregateData = null; long lowerBound = 0; long higherBound = 0; double densityAvgSum = 0.0; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); DecimalColumnStatsDataInspector newData = (DecimalColumnStatsDataInspector) cso.getStatsData().getDecimalStats(); lowerBound = Math.max(lowerBound, newData.getNumDVs()); higherBound += newData.getNumDVs(); - densityAvgSum += (MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils - .decimalToDouble(newData.getLowValue())) / newData.getNumDVs(); + densityAvgSum += + (MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils + .decimalToDouble(newData.getLowValue())) / newData.getNumDVs(); if (ndvEstimator != null) { ndvEstimator.mergeEstimators(newData.getNdvEstimator()); } @@ -139,8 +137,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, // We have estimation, lowerbound and higherbound. We use estimation // if it is between lowerbound and higherbound. double densityAvg = densityAvgSum / partNames.size(); - estimation = (long) ((MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils - .decimalToDouble(aggregateData.getLowValue())) / densityAvg); + estimation = + (long) ((MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils + .decimalToDouble(aggregateData.getLowValue())) / densityAvg); if (estimation < lowerBound) { estimation = lowerBound; } else if (estimation > higherBound) { @@ -160,20 +159,22 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, indexMap.put(partNames.get(index), index); } Map adjustedIndexMap = new HashMap(); - Map adjustedStatsMap = new HashMap(); + Map adjustedStatsMap = + new HashMap(); // while we scan the css, we also get the densityAvg, lowerbound and // higerbound when useDensityFunctionForNDVEstimation is true. double densityAvgSum = 0.0; if (ndvEstimator == null) { // if not every partition uses bitvector for ndv, we just fall back to // the traditional extrapolation methods. 
- for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats(); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils - .decimalToDouble(newData.getLowValue())) / newData.getNumDVs(); + densityAvgSum += + (MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils + .decimalToDouble(newData.getLowValue())) / newData.getNumDVs(); } adjustedIndexMap.put(partName, (double) indexMap.get(partName)); adjustedStatsMap.put(partName, cso.getStatsData()); @@ -186,9 +187,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, int length = 0; int curIndex = -1; DecimalColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); DecimalColumnStatsDataInspector newData = (DecimalColumnStatsDataInspector) cso.getStatsData().getDecimalStats(); // newData.isSetBitVectors() should be true for sure because we @@ -203,14 +204,16 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, csd.setDecimalStats(aggregateData); adjustedStatsMap.put(pseudoPartName.toString(), csd); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils - .decimalToDouble(aggregateData.getLowValue())) / aggregateData.getNumDVs(); + densityAvgSum += + (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils + .decimalToDouble(aggregateData.getLowValue())) / aggregateData.getNumDVs(); } // reset everything pseudoPartName = new StringBuilder(); pseudoIndexSum = 0; length = 0; - ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); + ndvEstimator = + NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); } aggregateData = null; } @@ -246,16 +249,19 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, csd.setDecimalStats(aggregateData); adjustedStatsMap.put(pseudoPartName.toString(), csd); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils - .decimalToDouble(aggregateData.getLowValue())) / aggregateData.getNumDVs(); + densityAvgSum += + (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils + .decimalToDouble(aggregateData.getLowValue())) / aggregateData.getNumDVs(); } } } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); + extrapolate(columnStatisticsData, partNames.size(), colStatsWithSourceInfo.size(), + adjustedIndexMap, adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); } - LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName, - columnStatisticsData.getDecimalStats().getNumDVs(),partNames.size(), css.size()); + LOG.debug( + "Ndv estimatation for {} is {} # of 
partitions requested: {} # of partitions found: {}", + colName, columnStatisticsData.getDecimalStats().getNumDVs(), partNames.size(), + colStatsWithSourceInfo.size()); statsObj.setStatsData(columnStatisticsData); return statsObj; } @@ -270,8 +276,9 @@ public void extrapolate(ColumnStatisticsData extrapolateData, int numParts, for (Map.Entry entry : adjustedStatsMap.entrySet()) { extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDecimalStats()); } - List> list = new LinkedList>( - extractedAdjustedStatsMap.entrySet()); + List> list = + new LinkedList>( + extractedAdjustedStatsMap.entrySet()); // get the lowValue Collections.sort(list, new Comparator>() { @Override @@ -372,4 +379,4 @@ public int compare(Map.Entry o1, extrapolateDecimalData.setNumDVs(ndv); extrapolateData.setDecimalStats(extrapolateDecimalData); } -} +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java index bb4a725..0ea5789 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; @@ -43,27 +43,24 @@ private static final Logger LOG = LoggerFactory.getLogger(LongColumnStatsAggregator.class); @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithSourceInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - + boolean doAllPartitionContainStats = partNames.size() == colStatsWithSourceInfo.size(); + String colType = null; + String colName = null; // check if all the ColumnStatisticsObjs contain stats and all the ndv are // bitvectors - boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); NumDistinctValueEstimator ndvEstimator = null; - String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); + statsObj = + ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso.getStatsData() + .getSetField()); + LOG.trace("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); } DoubleColumnStatsDataInspector 
doubleColumnStatsData = (DoubleColumnStatsDataInspector) cso.getStatsData().getDoubleStats(); @@ -86,18 +83,18 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } } if (ndvEstimator != null) { - ndvEstimator = NumDistinctValueEstimatorFactory - .getEmptyNumDistinctValueEstimator(ndvEstimator); + ndvEstimator = + NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); } LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { + if (doAllPartitionContainStats || colStatsWithSourceInfo.size() < 2) { DoubleColumnStatsDataInspector aggregateData = null; long lowerBound = 0; long higherBound = 0; double densityAvgSum = 0.0; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); DoubleColumnStatsDataInspector newData = (DoubleColumnStatsDataInspector) cso.getStatsData().getDoubleStats(); lowerBound = Math.max(lowerBound, newData.getNumDVs()); @@ -127,7 +124,8 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, // We have estimation, lowerbound and higherbound. We use estimation // if it is between lowerbound and higherbound. double densityAvg = densityAvgSum / partNames.size(); - estimation = (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg); + estimation = + (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg); if (estimation < lowerBound) { estimation = lowerBound; } else if (estimation > higherBound) { @@ -147,16 +145,17 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, indexMap.put(partNames.get(index), index); } Map adjustedIndexMap = new HashMap(); - Map adjustedStatsMap = new HashMap(); + Map adjustedStatsMap = + new HashMap(); // while we scan the css, we also get the densityAvg, lowerbound and // higerbound when useDensityFunctionForNDVEstimation is true. double densityAvgSum = 0.0; if (ndvEstimator == null) { // if not every partition uses bitvector for ndv, we just fall back to // the traditional extrapolation methods. 
- for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats(); if (useDensityFunctionForNDVEstimation) { densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs(); @@ -172,9 +171,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, int length = 0; int curIndex = -1; DoubleColumnStatsData aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); DoubleColumnStatsDataInspector newData = (DoubleColumnStatsDataInspector) cso.getStatsData().getDoubleStats(); // newData.isSetBitVectors() should be true for sure because we @@ -189,13 +188,16 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, csd.setDoubleStats(aggregateData); adjustedStatsMap.put(pseudoPartName.toString(), csd); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs(); + densityAvgSum += + (aggregateData.getHighValue() - aggregateData.getLowValue()) + / aggregateData.getNumDVs(); } // reset everything pseudoPartName = new StringBuilder(); pseudoIndexSum = 0; length = 0; - ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); + ndvEstimator = + NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); } aggregateData = null; } @@ -222,15 +224,19 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, csd.setDoubleStats(aggregateData); adjustedStatsMap.put(pseudoPartName.toString(), csd); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs(); + densityAvgSum += + (aggregateData.getHighValue() - aggregateData.getLowValue()) + / aggregateData.getNumDVs(); } } } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); + extrapolate(columnStatisticsData, partNames.size(), colStatsWithSourceInfo.size(), + adjustedIndexMap, adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); } - LOG.debug("Ndv estimatation for {} is {}. # of partitions requested: {}. # of partitions found: {}", colName, - columnStatisticsData.getDoubleStats().getNumDVs(),partNames.size(), css.size()); + LOG.debug( + "Ndv estimatation for {} is {}. # of partitions requested: {}. 
# of partitions found: {}", + colName, columnStatisticsData.getDoubleStats().getNumDVs(), partNames.size(), + colStatsWithSourceInfo.size()); statsObj.setStatsData(columnStatisticsData); return statsObj; } @@ -245,8 +251,9 @@ public void extrapolate(ColumnStatisticsData extrapolateData, int numParts, for (Map.Entry entry : adjustedStatsMap.entrySet()) { extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDoubleStats()); } - List> list = new LinkedList>( - extractedAdjustedStatsMap.entrySet()); + List> list = + new LinkedList>( + extractedAdjustedStatsMap.entrySet()); // get the lowValue Collections.sort(list, new Comparator>() { @Override @@ -346,4 +353,4 @@ public int compare(Map.Entry o1, extrapolateData.setDoubleStats(extrapolateDoubleData); } -} +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java index 5b1145e..498331e 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; @@ -43,27 +43,24 @@ private static final Logger LOG = LoggerFactory.getLogger(LongColumnStatsAggregator.class); @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithSourceInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - + boolean doAllPartitionContainStats = partNames.size() == colStatsWithSourceInfo.size(); + String colType = null; + String colName = null; // check if all the ColumnStatisticsObjs contain stats and all the ndv are // bitvectors - boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); NumDistinctValueEstimator ndvEstimator = null; - String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); + statsObj = + ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso.getStatsData() + .getSetField()); + LOG.trace("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); } LongColumnStatsDataInspector longColumnStatsData = (LongColumnStatsDataInspector) cso.getStatsData().getLongStats(); @@ 
-86,18 +83,18 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } } if (ndvEstimator != null) { - ndvEstimator = NumDistinctValueEstimatorFactory - .getEmptyNumDistinctValueEstimator(ndvEstimator); + ndvEstimator = + NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); } LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { + if (doAllPartitionContainStats || colStatsWithSourceInfo.size() < 2) { LongColumnStatsDataInspector aggregateData = null; long lowerBound = 0; long higherBound = 0; double densityAvgSum = 0.0; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); LongColumnStatsDataInspector newData = (LongColumnStatsDataInspector) cso.getStatsData().getLongStats(); lowerBound = Math.max(lowerBound, newData.getNumDVs()); @@ -127,7 +124,8 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, // We have estimation, lowerbound and higherbound. We use estimation // if it is between lowerbound and higherbound. double densityAvg = densityAvgSum / partNames.size(); - estimation = (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg); + estimation = + (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg); if (estimation < lowerBound) { estimation = lowerBound; } else if (estimation > higherBound) { @@ -148,16 +146,17 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, indexMap.put(partNames.get(index), index); } Map adjustedIndexMap = new HashMap(); - Map adjustedStatsMap = new HashMap(); + Map adjustedStatsMap = + new HashMap(); // while we scan the css, we also get the densityAvg, lowerbound and // higerbound when useDensityFunctionForNDVEstimation is true. double densityAvgSum = 0.0; if (ndvEstimator == null) { // if not every partition uses bitvector for ndv, we just fall back to // the traditional extrapolation methods. 
- for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); LongColumnStatsData newData = cso.getStatsData().getLongStats(); if (useDensityFunctionForNDVEstimation) { densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs(); @@ -173,9 +172,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, int length = 0; int curIndex = -1; LongColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); LongColumnStatsDataInspector newData = (LongColumnStatsDataInspector) cso.getStatsData().getLongStats(); // newData.isSetBitVectors() should be true for sure because we @@ -190,13 +189,16 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, csd.setLongStats(aggregateData); adjustedStatsMap.put(pseudoPartName.toString(), csd); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs(); + densityAvgSum += + (aggregateData.getHighValue() - aggregateData.getLowValue()) + / aggregateData.getNumDVs(); } // reset everything pseudoPartName = new StringBuilder(); pseudoIndexSum = 0; length = 0; - ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); + ndvEstimator = + NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); } aggregateData = null; } @@ -223,15 +225,19 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, csd.setLongStats(aggregateData); adjustedStatsMap.put(pseudoPartName.toString(), csd); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs(); + densityAvgSum += + (aggregateData.getHighValue() - aggregateData.getLowValue()) + / aggregateData.getNumDVs(); } } } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); + extrapolate(columnStatisticsData, partNames.size(), colStatsWithSourceInfo.size(), + adjustedIndexMap, adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); } - LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName, - columnStatisticsData.getLongStats().getNumDVs(),partNames.size(), css.size()); + LOG.debug( + "Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", + colName, columnStatisticsData.getLongStats().getNumDVs(), partNames.size(), + colStatsWithSourceInfo.size()); statsObj.setStatsData(columnStatisticsData); return statsObj; } @@ -246,8 +252,8 @@ public void extrapolate(ColumnStatisticsData extrapolateData, int numParts, for (Map.Entry entry : adjustedStatsMap.entrySet()) { extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getLongStats()); } - List> list = new LinkedList>( - extractedAdjustedStatsMap.entrySet()); + List> list = + new LinkedList>(extractedAdjustedStatsMap.entrySet()); // get the lowValue 
Collections.sort(list, new Comparator>() { @Override @@ -345,4 +351,4 @@ public int compare(Map.Entry o1, extrapolateData.setLongStats(extrapolateLongData); } -} +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java index 1b29f92..a7cce91 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -43,28 +43,24 @@ private static final Logger LOG = LoggerFactory.getLogger(LongColumnStatsAggregator.class); @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithSourceInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - + boolean doAllPartitionContainStats = partNames.size() == colStatsWithSourceInfo.size(); + String colType = null; + String colName = null; // check if all the ColumnStatisticsObjs contain stats and all the ndv are - // bitvectors. Only when both of the conditions are true, we merge bit - // vectors. Otherwise, just use the maximum function. 
- boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); + // bitvectors NumDistinctValueEstimator ndvEstimator = null; - String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); + statsObj = + ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso.getStatsData() + .getSetField()); + LOG.trace("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); } StringColumnStatsDataInspector stringColumnStatsData = (StringColumnStatsDataInspector) cso.getStatsData().getStringStats(); @@ -87,15 +83,15 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } } if (ndvEstimator != null) { - ndvEstimator = NumDistinctValueEstimatorFactory - .getEmptyNumDistinctValueEstimator(ndvEstimator); + ndvEstimator = + NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); } LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { + if (doAllPartitionContainStats || colStatsWithSourceInfo.size() < 2) { StringColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); StringColumnStatsDataInspector newData = (StringColumnStatsDataInspector) cso.getStatsData().getStringStats(); if (ndvEstimator != null) { @@ -130,13 +126,14 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, indexMap.put(partNames.get(index), index); } Map adjustedIndexMap = new HashMap(); - Map adjustedStatsMap = new HashMap(); + Map adjustedStatsMap = + new HashMap(); if (ndvEstimator == null) { // if not every partition uses bitvector for ndv, we just fall back to // the traditional extrapolation methods. 
- for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); adjustedIndexMap.put(partName, (double) indexMap.get(partName)); adjustedStatsMap.put(partName, cso.getStatsData()); } @@ -148,9 +145,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, int length = 0; int curIndex = -1; StringColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); StringColumnStatsDataInspector newData = (StringColumnStatsDataInspector) cso.getStatsData().getStringStats(); // newData.isSetBitVectors() should be true for sure because we @@ -168,8 +165,8 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, pseudoPartName = new StringBuilder(); pseudoIndexSum = 0; length = 0; - ndvEstimator = NumDistinctValueEstimatorFactory - .getEmptyNumDistinctValueEstimator(ndvEstimator); + ndvEstimator = + NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); } aggregateData = null; } @@ -198,11 +195,13 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, adjustedStatsMap.put(pseudoPartName.toString(), csd); } } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, -1); + extrapolate(columnStatisticsData, partNames.size(), colStatsWithSourceInfo.size(), + adjustedIndexMap, adjustedStatsMap, -1); } - LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName, - columnStatisticsData.getStringStats().getNumDVs(),partNames.size(), css.size()); + LOG.debug( + "Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", + colName, columnStatisticsData.getStringStats().getNumDVs(), partNames.size(), + colStatsWithSourceInfo.size()); statsObj.setStatsData(columnStatisticsData); return statsObj; } @@ -217,8 +216,9 @@ public void extrapolate(ColumnStatisticsData extrapolateData, int numParts, for (Map.Entry entry : adjustedStatsMap.entrySet()) { extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getStringStats()); } - List> list = new LinkedList>( - extractedAdjustedStatsMap.entrySet()); + List> list = + new LinkedList>( + extractedAdjustedStatsMap.entrySet()); // get the avgLen Collections.sort(list, new Comparator>() { @Override @@ -279,7 +279,7 @@ public int compare(Map.Entry o1, @Override public int compare(Map.Entry o1, Map.Entry o2) { - return Long.compare(o1.getValue().getNumDVs(), o2.getValue().getNumDVs()); + return Long.compare(o1.getValue().getNumDVs(), o2.getValue().getNumDVs()); } }); minInd = adjustedIndexMap.get(list.get(0).getKey()); @@ -302,4 +302,4 @@ public int compare(Map.Entry o1, extrapolateData.setStringStats(extrapolateStringData); } -} +} \ No newline at end of file diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java index a75dbb0..8628575 100644 --- 
a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java @@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -923,14 +925,14 @@ public void dropConstraint(String dbName, String tableName, } @Override - public Map> getColStatsForTablePartitions(String dbName, - String tableName) throws MetaException, NoSuchObjectException { - // TODO Auto-generated method stub - return null; + public String getMetastoreDbUuid() throws MetaException { + throw new MetaException("Get metastore uuid is not implemented"); } @Override - public String getMetastoreDbUuid() throws MetaException { - throw new MetaException("Get metastore uuid is not implemented"); + public List getPartitionColStatsForDatabase(String dbName) + throws MetaException, NoSuchObjectException { + // TODO Auto-generated method stub + return null; } } diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java index bbb4bf1..e7a350f 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java @@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -939,14 +941,14 @@ public void dropConstraint(String dbName, String tableName, } @Override - public Map> getColStatsForTablePartitions(String dbName, - String tableName) throws MetaException, NoSuchObjectException { - // TODO Auto-generated method stub - return null; + public String getMetastoreDbUuid() throws MetaException { + throw new MetaException("Get metastore uuid is not implemented"); } @Override - public String getMetastoreDbUuid() throws MetaException { - throw new MetaException("Get metastore uuid is not implemented"); + public List getPartitionColStatsForDatabase(String dbName) + throws MetaException, NoSuchObjectException { + // TODO Auto-generated method stub + return null; } }
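For context, the long, double and decimal aggregators changed above all follow the same shape after this patch: aggregate() now receives a list of ColStatsObjWithSourceInfo (plus the requested partition names and an areAllPartsFound flag), reads colName and colType from the first ColumnStatisticsObj, and, when useDensityFunctionForNDVEstimation is enabled, derives the merged NDV from an average per-partition density clamped between provable bounds. The sketch below condenses that arithmetic into a self-contained example; it is a hedged illustration, not the Hive code — PartStats and estimateNdv are hypothetical stand-ins for ColStatsObjWithSourceInfo and the per-type aggregate() implementations, it assumes the branch where every requested partition reported stats, and it assumes positive value ranges and NDV counts.

```java
import java.util.List;

public class NdvDensityEstimationSketch {

  // Hypothetical stand-in for one partition's column stats
  // (low/high value plus the per-partition distinct-value count).
  record PartStats(double lowValue, double highValue, long numDVs) {}

  /**
   * Estimate the table-level NDV from per-partition stats, mirroring the
   * density-based path in the aggregators above.
   */
  static long estimateNdv(List<PartStats> parts) {
    double densityAvgSum = 0.0;
    long lowerBound = 0;   // max per-partition NDV: the merged NDV cannot be smaller
    long higherBound = 0;  // sum of per-partition NDVs: the merged NDV cannot be larger
    double low = Double.MAX_VALUE;
    double high = -Double.MAX_VALUE;

    for (PartStats p : parts) {
      lowerBound = Math.max(lowerBound, p.numDVs());
      higherBound += p.numDVs();
      low = Math.min(low, p.lowValue());
      high = Math.max(high, p.highValue());
      // per-partition density = value range / distinct values
      densityAvgSum += (p.highValue() - p.lowValue()) / p.numDVs();
    }

    // project the average density onto the global value range ...
    double densityAvg = densityAvgSum / parts.size();
    long estimation = (long) ((high - low) / densityAvg);

    // ... and only keep the estimate if it falls inside the provable bounds
    if (estimation < lowerBound) {
      return lowerBound;
    } else if (estimation > higherBound) {
      return higherBound;
    }
    return estimation;
  }

  public static void main(String[] args) {
    List<PartStats> parts = List.of(
        new PartStats(0, 100, 50),
        new PartStats(50, 200, 80));
    System.out.println("estimated NDV = " + estimateNdv(parts));
  }
}
```

The clamping is the design point worth noting: the density estimate is trusted only when it lands between the largest single-partition NDV and the sum of all per-partition NDVs, since any merged NDV outside that interval is impossible by construction.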