diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 8404e3ecd2..963b22706c 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -759,7 +759,7 @@ public void onUpdateTableColumnStat(UpdateTableColumnStatEvent updateTableColumn .buildUpdateTableColumnStatMessage(updateTableColumnStatEvent.getColStats(), updateTableColumnStatEvent.getTableObj(), updateTableColumnStatEvent.getTableParameters(), - updateTableColumnStatEvent.getValidWriteIds(), updateTableColumnStatEvent.getWriteId()); + updateTableColumnStatEvent.getWriteId()); NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_TABLE_COLUMN_STAT.toString(), msgEncoder.getSerializer().serialize(msg)); ColumnStatisticsDesc statDesc = updateTableColumnStatEvent.getColStats().getStatsDesc(); @@ -789,7 +789,7 @@ public void onUpdatePartitionColumnStat(UpdatePartitionColumnStatEvent updatePar updatePartColStatEvent.getPartVals(), updatePartColStatEvent.getPartParameters(), updatePartColStatEvent.getTableObj(), - updatePartColStatEvent.getValidWriteIds(), updatePartColStatEvent.getWriteId()); + updatePartColStatEvent.getWriteId()); NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_PARTITION_COLUMN_STAT.toString(), msgEncoder.getSerializer().serialize(msg)); ColumnStatisticsDesc statDesc = updatePartColStatEvent.getPartColStats().getStatsDesc(); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java index 83f12a5fd9..cdfc60c994 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java @@ -4,6 +4,8 @@ import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.*; import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; @@ -12,7 +14,10 @@ import org.apache.hadoop.hive.metastore.client.builder.TableBuilder; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.utils.FileUtils; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; import org.apache.hadoop.util.StringUtils; import org.apache.hive.hcatalog.listener.DbNotificationListener; import org.junit.Assert; @@ -29,6 +34,7 @@ private SharedCache sharedCache; private Configuration conf; private HiveMetaStore.HMSHandler hmsHandler; + private String[] colType = new String[] {"double", "string"}; @Before public void setUp() throws Exception { @@ -39,11 +45,14 @@ public void setUp() throws Exception { MetastoreConf.setVar(conf, ConfVars.TRANSACTIONAL_EVENT_LISTENERS, DbNotificationListener.class.getName()); MetastoreConf.setVar(conf, 
ConfVars.RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.cache.CachedStore"); MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT, true); + MetastoreConf.setBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED, true); + MetastoreConf.setBoolVar(conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED, false); MetaStoreTestUtils.setConfForStandloneMode(conf); hmsHandler = new HiveMetaStore.HMSHandler("testCachedStore", conf, true); - rawStore = hmsHandler.getMS(); + rawStore = new ObjectStore(); + rawStore.setConf(hmsHandler.getConf()); sharedCache = CachedStore.getSharedCache(); // Stop the CachedStore cache update service. We'll start it explicitly to control the test @@ -69,10 +78,14 @@ private Table createTestTblParam(String dbName, String tblName, String tblOwner, String serdeLocation = "file:/tmp"; Map serdeParams = new HashMap<>(); SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<>()); - StorageDescriptor sd = new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, + StorageDescriptor sd = new StorageDescriptor(cols, serdeLocation, + null, null, false, 3, serdeInfo, null, null, serdeParams); + sd.setInputFormat(OrcInputFormat.class.getName()); + sd.setOutputFormat(OrcOutputFormat.class.getName()); sd.setStoredAsSubDirectories(false); - Table tbl = new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null, + Table tbl = new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, + null, null, TableType.MANAGED_TABLE.toString()); tbl.setCatName(DEFAULT_CATALOG_NAME); return tbl; @@ -186,6 +199,10 @@ public void testTableOpsForUpdateUsingEvents() throws Exception { long lastEventId = -1; RawStore rawStore = hmsHandler.getMS(); + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(rawStore); + // Add a db via rawStore String dbName = "test_table_ops"; String dbOwner = "user1"; @@ -193,10 +210,6 @@ public void testTableOpsForUpdateUsingEvents() throws Exception { hmsHandler.create_database(db); db = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); - // Prewarm CachedStore - CachedStore.setCachePrewarmedState(false); - CachedStore.prewarm(rawStore); - // Add a table via rawStore String tblName = "tbl"; String tblOwner = "user1"; @@ -263,6 +276,10 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception { long lastEventId = -1; RawStore rawStore = hmsHandler.getMS(); + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(rawStore); + // Add a db via rawStore String dbName = "test_partition_ops"; String dbOwner = "user1"; @@ -285,21 +302,19 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception { hmsHandler.create_table(tbl); tbl = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName); - // Prewarm CachedStore - CachedStore.setCachePrewarmedState(false); - CachedStore.prewarm(rawStore); - final String ptnColVal1 = "aaa"; Map partParams = new HashMap(); Partition ptn1 = - new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, tbl.getSd(), partParams); + new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, + 0, tbl.getSd(), partParams); ptn1.setCatName(DEFAULT_CATALOG_NAME); hmsHandler.add_partition(ptn1); ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1)); final String ptnColVal2 = "bbb"; Partition ptn2 = - new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, tbl.getSd(), partParams); + new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 
0, + 0, tbl.getSd(), partParams); ptn2.setCatName(DEFAULT_CATALOG_NAME); hmsHandler.add_partition(ptn2); ptn2 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2)); @@ -307,17 +322,21 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception { // Read database, table, partition via CachedStore Database dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase()); Assert.assertEquals(db, dbRead); - Table tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase(), tblName.toLowerCase()); + Table tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), + dbName.toLowerCase(), tblName.toLowerCase()); compareTables(tbl, tblRead); - Partition ptn1Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal1)); + Partition ptn1Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), + dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal1)); comparePartitions(ptn1, ptn1Read); - Partition ptn2Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal2)); + Partition ptn2Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), + dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal2)); comparePartitions(ptn2, ptn2Read); // Add a new partition via rawStore final String ptnColVal3 = "ccc"; Partition ptn3 = - new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, tbl.getSd(), partParams); + new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, + 0, tbl.getSd(), partParams); ptn3.setCatName(DEFAULT_CATALOG_NAME); hmsHandler.add_partition(ptn3); ptn3 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3)); @@ -326,7 +345,8 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception { ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1)); final String ptnColVal1Alt = "aaa"; Partition ptn1Atl = - new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, tbl.getSd(), partParams); + new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, + 0, tbl.getSd(), partParams); ptn1Atl.setCatName(DEFAULT_CATALOG_NAME); hmsHandler.alter_partitions(dbName, tblName, Arrays.asList(ptn1Atl)); ptn1Atl = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1Alt)); @@ -336,7 +356,8 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception { hmsHandler.drop_partition(dbName, tblName, Arrays.asList(ptnColVal2), false); // Read the newly added partition via CachedStore - Partition ptnRead = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3)); + Partition ptnRead = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, + tblName, Arrays.asList(ptnColVal3)); comparePartitions(ptn3, ptnRead); // Read the altered partition via CachedStore @@ -362,49 +383,116 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception { sharedCache.getSdCache().clear(); } - @Test - public void testTableColumnStatistics() throws Throwable { - String dbName = "column_stats_test_db"; - String tblName = "tbl"; - String typeName = "person"; - String tblOwner = "testowner"; - int lastAccessed = 6796; - String dbOwner = "user1"; + private void 
updateTableColStats(String dbName, String tblName, String[] colName, + double highValue, double avgColLen, boolean isTxnTable) throws Throwable { + long writeId = -1; + String validWriteIds = null; + if (isTxnTable) { + writeId = allocateWriteIds(allocateTxns(1), dbName, tblName).get(0).getWriteId(); + validWriteIds = getValidWriteIds(dbName, tblName); + } - // Add a db via rawStore - Database db = createTestDb(dbName, dbOwner); - hmsHandler.create_database(db); - db = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); + ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(); + statsDesc.setDbName(dbName); + statsDesc.setTableName(tblName); + statsDesc.setIsTblLevel(true); + statsDesc.setPartName(null); - Map tableParams = new HashMap<>(); - tableParams.put("test_param_1", "hi"); - tableParams.put("test_param_2", "50"); + ColumnStatistics colStats = new ColumnStatistics(); + colStats.setStatsDesc(statsDesc); + colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, avgColLen)); - // Add a table via rawStore - List cols = new ArrayList(); - cols.add(new FieldSchema("income", "int", "integer column")); - cols.add(new FieldSchema("name", "string", "string column")); + SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats)); + setTblColStat.setWriteId(writeId); + setTblColStat.setValidWriteIdList(validWriteIds); - List ptnCols = new ArrayList(); - ptnCols.add(new FieldSchema("ds", "string", "string partition column")); - ptnCols.add(new FieldSchema("hr", "int", "integer partition column")); + // write stats objs persistently + hmsHandler.update_table_column_statistics_req(setTblColStat); + validateTablePara(dbName, tblName); + + ColumnStatistics colStatsCache = sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME, + dbName, tblName, Lists.newArrayList(colName[0]), validWriteIds, true); + Assert.assertEquals(colStatsCache.getStatsObj().get(0).getColName(), colName[0]); + verifyStatDouble(colStatsCache.getStatsObj().get(0), colName[0], highValue); + + colStatsCache = sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME, + dbName, tblName, Lists.newArrayList(colName[1]), validWriteIds, true); + Assert.assertEquals(colStatsCache.getStatsObj().get(0).getColName(), colName[1]); + verifyStatString(colStatsCache.getStatsObj().get(0), colName[1], avgColLen); + } - Table tbl = createTestTblParam(dbName, tblName, tblOwner, cols, null, tableParams); - hmsHandler.create_table(tbl); + private void updatePartColStats(String dbName, String tblName, boolean isTxnTable, String[] colName, + String partName, double highValue, double avgColLen) throws Throwable { + long writeId = -1; + String validWriteIds = null; + List txnIds = null; - // Prewarm CachedStore - CachedStore.setCachePrewarmedState(false); - CachedStore.prewarm(rawStore); + if (isTxnTable) { + txnIds = allocateTxns(1); + writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId(); + validWriteIds = getValidWriteIds(dbName, tblName); + } - // Create a ColumnStatistics Obj - String[] colName = new String[]{"income", "name"}; + ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(); + statsDesc.setDbName(dbName); + statsDesc.setTableName(tblName); + statsDesc.setIsTblLevel(false); + statsDesc.setPartName(partName); + + ColumnStatistics colStats = new ColumnStatistics(); + colStats.setStatsDesc(statsDesc); + colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, avgColLen)); + + SetPartitionsStatsRequest setTblColStat = new 
SetPartitionsStatsRequest(Collections.singletonList(colStats)); + setTblColStat.setWriteId(writeId); + setTblColStat.setValidWriteIdList(validWriteIds); + + // write stats objs persistently + hmsHandler.update_partition_column_statistics_req(setTblColStat); + + if (isTxnTable) { + CommitTxnRequest rqst = new CommitTxnRequest(txnIds.get(0)); + hmsHandler.commit_txn(rqst); + writeId = allocateWriteIds(allocateTxns(1), dbName, tblName).get(0).getWriteId(); + validWriteIds = getValidWriteIds(dbName, tblName); + } + + Deadline.startTimer("getPartitionColumnStatistics"); + List statRowStore = rawStore.getPartitionColumnStatistics(DEFAULT_CATALOG_NAME, dbName, tblName, + Collections.singletonList(partName), Collections.singletonList(colName[1]), validWriteIds); + Deadline.stopTimer(); + verifyStatString(statRowStore.get(0).getStatsObj().get(0), colName[1], avgColLen); + if (isTxnTable) { + Assert.assertEquals(statRowStore.get(0).isIsStatsCompliant(), true); + } else { + Assert.assertEquals(statRowStore.get(0).isIsStatsCompliant(), false); + } + + List statSharedCache = sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME, + dbName, tblName, Collections.singletonList(partName), Collections.singletonList(colName[1]), + validWriteIds, true); + verifyStatString(statSharedCache.get(0).getStatsObj().get(0), colName[1], avgColLen); + if (isTxnTable) { + Assert.assertEquals(statSharedCache.get(0).isIsStatsCompliant(), true); + } else { + Assert.assertEquals(statSharedCache.get(0).isIsStatsCompliant(), false); + } + + SharedCache.ColumStatsWithWriteId statPartCache = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, + dbName, tblName, CachedStore.partNameToVals(partName), colName[0], validWriteIds); + verifyStatDouble(statPartCache.getColumnStatisticsObj(), colName[0], highValue); + + statPartCache = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, + CachedStore.partNameToVals(partName), colName[1], validWriteIds); + verifyStatString(statPartCache.getColumnStatisticsObj(), colName[1], avgColLen); + } + + private List getStatsObjects(String dbName, String tblName, String[] colName, + double highValue, double avgColLen) throws Throwable { double lowValue = 50000.21; - double highValue = 1200000.4525; long numNulls = 3; long numDVs = 22; - double avgColLen = 50.30; long maxColLen = 102; - String[] colType = new String[] {"double", "string"}; boolean isTblLevel = true; String partName = null; List statsObjs = new ArrayList<>(); @@ -445,42 +533,96 @@ public void testTableColumnStatistics() throws Throwable { statsObj.setStatsData(statsData); statsObjs.add(statsObj); + return statsObjs; + } - ColumnStatistics colStats = new ColumnStatistics(); - colStats.setStatsDesc(statsDesc); - colStats.setStatsObj(statsObjs); + private void verifyStatDouble(ColumnStatisticsObj colStats, String colName, double highValue) { + double lowValue = 50000.21; + long numNulls = 3; + long numDVs = 22; + Assert.assertEquals(colStats.getColName(), colName); + Assert.assertEquals(colStats.getStatsData().getDoubleStats().getLowValue(), lowValue, 0.01); + Assert.assertEquals(colStats.getStatsData().getDoubleStats().getHighValue(), highValue, 0.01); + Assert.assertEquals(colStats.getStatsData().getDoubleStats().getNumNulls(), numNulls); + Assert.assertEquals(colStats.getStatsData().getDoubleStats().getNumDVs(), numDVs); + } - // write stats objs persistently - hmsHandler.update_table_column_statistics(colStats); - - ColumnStatisticsObj colStatsCache = 
sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME, - dbName, tblName, Lists.newArrayList(colName[0])).get(0); - Assert.assertEquals(colStatsCache.getColName(), colName[0]); - Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getLowValue(), lowValue, 0.01); - Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getHighValue(), highValue, 0.01); - Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getNumNulls(), numNulls); - Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getNumDVs(), numDVs); - - // test delete column stats; if no col name is passed all column stats associated with the - // table is deleted - boolean status = hmsHandler.delete_table_column_statistics(dbName, tblName, null); - Assert.assertEquals(status, true); + private void verifyStatString(ColumnStatisticsObj colStats, String colName, double avgColLen) { + long numNulls = 3; + long numDVs = 22; + long maxColLen = 102; + Assert.assertEquals(colStats.getColName(), colName); + Assert.assertEquals(colStats.getStatsData().getStringStats().getMaxColLen(), maxColLen); + Assert.assertEquals(colStats.getStatsData().getStringStats().getAvgColLen(), avgColLen, 0.01); + Assert.assertEquals(colStats.getStatsData().getStringStats().getNumNulls(), numNulls); + Assert.assertEquals(colStats.getStatsData().getStringStats().getNumDVs(), numDVs); + } - Assert.assertEquals(sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME, - dbName, tblName, Lists.newArrayList(colName[0])).isEmpty(), true); + private void verifyStat(List colStats, String[] colName, double highValue, double avgColLen) { + //verifyStatDouble(colStats.get(0), colName[0], highValue); + verifyStatString(colStats.get(0), colName[1], avgColLen); + } - tblName = "tbl_part"; - cols = new ArrayList<>(); - cols.add(new FieldSchema(colName[0], "int", null)); + private void setUpBeforeTest(String dbName, String tblName, String[] colName, boolean isTxnTable) throws Throwable { + String dbOwner = "user1"; + + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(rawStore); + + // Add a db via rawStore + Database db = createTestDb(dbName, dbOwner); + hmsHandler.create_database(db); + if (tblName != null) { + createTestTable(dbName, tblName, colName, isTxnTable); + } + } + + private void createTestTable(String dbName, String tblName, String[] colName, boolean isTxnTable) throws Throwable { + // Add a table via rawStore + List cols = new ArrayList(); + cols.add(new FieldSchema(colName[0], "int", "integer column")); + cols.add(new FieldSchema(colName[1], "string", "string column")); + + Map tableParams = new HashMap<>(); + tableParams.put("test_param_1", "hi"); + tableParams.put("test_param_2", "50"); + if (isTxnTable) { + tableParams.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true"); + } + + String tblOwner = "testowner"; + + List ptnCols = new ArrayList(); + ptnCols.add(new FieldSchema("ds", "string", "string partition column")); + ptnCols.add(new FieldSchema("hr", "int", "integer partition column")); + + Table tbl = createTestTblParam(dbName, tblName, tblOwner, cols, null, tableParams); + hmsHandler.create_table(tbl); + } + + private void createTableWithPart(String dbName, String tblName, String[] colName, boolean isTxnTbl) throws Throwable { + List cols = new ArrayList<>(); + cols.add(new FieldSchema(colName[0], colType[0], null)); List partCols = new ArrayList<>(); - partCols.add(new FieldSchema("col", "int", null)); + partCols.add(new FieldSchema(colName[0], 
colType[0], null));
+    Map tableParams = new HashMap<>();
+    tableParams.put("test_param_1", "hi");
+    tableParams.put("test_param_2", "50");
+    if (isTxnTbl) {
+      tableParams.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+      StatsSetupConst.setBasicStatsState(tableParams, StatsSetupConst.TRUE);
+    }
     StorageDescriptor sd =
-        new StorageDescriptor(cols, null, "input", "output", false,
+        new StorageDescriptor(cols, null, "orc",
+            "orc", false,
             0, new SerDeInfo("serde", "seriallib", new HashMap<>()),
-            null, null, null);
+            null, null, tableParams);
+    sd.setInputFormat(OrcInputFormat.class.getName());
+    sd.setOutputFormat(OrcOutputFormat.class.getName());
-    tbl = new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(),
-        null, null, TableType.MANAGED_TABLE.toString());
+    Table tbl = new Table(tblName, dbName, null, 0, 0, 0, sd,
+        partCols, tableParams, null, null, TableType.MANAGED_TABLE.toString());
     tbl.setCatName(DEFAULT_CATALOG_NAME);
     hmsHandler.create_table(tbl);
@@ -489,47 +631,405 @@ public void testTableColumnStatistics() throws Throwable {
     partVals1.add("1");
     List partVals2 = new ArrayList<>();
     partVals2.add("2");
+    Map partParams = new HashMap<>();
+    StatsSetupConst.setBasicStatsState(partParams, StatsSetupConst.TRUE);
+    EnvironmentContext environmentContext = new EnvironmentContext();
+    environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
     Partition ptn1 =
-        new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>());
+        new Partition(partVals1, dbName, tblName, 0, 0, sd, partParams);
     ptn1.setCatName(DEFAULT_CATALOG_NAME);
-    hmsHandler.add_partition(ptn1);
+    hmsHandler.add_partition_with_environment_context(ptn1, environmentContext);
     Partition ptn2 =
-        new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>());
+        new Partition(partVals2, dbName, tblName, 0, 0, sd, partParams);
     ptn2.setCatName(DEFAULT_CATALOG_NAME);
-    hmsHandler.add_partition(ptn2);
+    hmsHandler.add_partition_with_environment_context(ptn2, environmentContext);
+  }
+
+  private List allocateTxns(int numTxns) throws Throwable {
+    OpenTxnRequest openTxnRequest = new OpenTxnRequest(numTxns, "user", "host");
+    return hmsHandler.open_txns(openTxnRequest).getTxn_ids();
+  }
+
+  private List allocateWriteIds(List txnIds, String dbName, String tblName) throws Throwable {
+    AllocateTableWriteIdsRequest allocateTableWriteIdsRequest = new AllocateTableWriteIdsRequest(dbName, tblName);
+    allocateTableWriteIdsRequest.setTxnIds(txnIds);
+    return hmsHandler.allocate_table_write_ids(allocateTableWriteIdsRequest).getTxnToWriteIds();
+  }
+
+  private String getValidWriteIds(String dbName, String tblName) throws Throwable {
+    GetValidWriteIdsRequest validWriteIdsRequest = new GetValidWriteIdsRequest(
+        Collections.singletonList(TableName.getDbTable(dbName, tblName)));
+    GetValidWriteIdsResponse validWriteIdsResponse = hmsHandler.get_valid_write_ids(validWriteIdsRequest);
+    return TxnCommonUtils.createValidReaderWriteIdList(validWriteIdsResponse.
+ getTblValidWriteIds().get(0)).writeToString(); + } + + private void validateTablePara(String dbName, String tblName) throws Throwable { + Table tblRead = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName); + Table tblRead1 = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName); + Assert.assertEquals(tblRead.getParameters(), tblRead1.getParameters()); + } + private void validatePartPara(String dbName, String tblName, String partName) throws Throwable { + //Partition part1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, partName); + //Partition part2 = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, partName); + //Assert.assertEquals(part1.getParameters(), part2.getParameters()); + } + + private void deleteColStats(String dbName, String tblName, String[] colName) throws Throwable { + boolean status = hmsHandler.delete_table_column_statistics(dbName, tblName, null); + Assert.assertEquals(status, true); + Assert.assertEquals(sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, + Lists.newArrayList(colName[0]), null, true).getStatsObj().isEmpty(), true); + Assert.assertEquals(sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, + Lists.newArrayList(colName[1]), null, true).getStatsObj().isEmpty(), true); + validateTablePara(dbName, tblName); + } + + private void deletePartColStats(String dbName, String tblName, String[] colName, + String partName) throws Throwable { + boolean status = hmsHandler.delete_partition_column_statistics(dbName, tblName, partName, colName[1]); + Assert.assertEquals(status, true); + + SharedCache.ColumStatsWithWriteId colStats = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, + tblName, CachedStore.partNameToVals(partName), colName[1], null); + Assert.assertEquals(colStats.getColumnStatisticsObj(), null); + validateTablePara(dbName, tblName); + } + + private void testTableColStatInternal(String dbName, String tblName, boolean isTxnTable) throws Throwable { + String[] colName = new String[]{"income", "name"}; + double highValue = 1200000.4525; + double avgColLen = 50.30; + + setUpBeforeTest(dbName, tblName, colName, isTxnTable); + updateTableColStats(dbName, tblName, colName, highValue, avgColLen, isTxnTable); + if (!isTxnTable) { + deleteColStats(dbName, tblName, colName); + } + + tblName = "tbl_part"; + createTableWithPart(dbName, tblName, colName, isTxnTable); List partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1); - partName = partitions.get(0); - isTblLevel = false; + String partName = partitions.get(0); + updatePartColStats(dbName, tblName, isTxnTable, colName, partName, highValue, avgColLen); + if (!isTxnTable) { + deletePartColStats(dbName, tblName, colName, partName); + } + } + + @Test + public void testTableColumnStatistics() throws Throwable { + String dbName = "column_stats_test_db"; + String tblName = "tbl"; + testTableColStatInternal(dbName, tblName, false); + } + + @Test + public void testTableColumnStatisticsTxnTable() throws Throwable { + String dbName = "column_stats_test_db_txn"; + String tblName = "tbl_txn"; + testTableColStatInternal(dbName, tblName, true); + } + + @Test + public void testTableColumnStatisticsTxnTableMulti() throws Throwable { + String dbName = "column_stats_test_db_txn_multi"; + String tblName = "tbl_part"; + String[] colName = new String[]{"income", "name"}; + double highValue = 1200000.4525; + double avgColLen = 50.30; + + setUpBeforeTest(dbName, null, colName, true); + 
createTableWithPart(dbName, tblName, colName, true); + List partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1); + String partName = partitions.get(0); + updatePartColStats(dbName, tblName, true, colName, partName, highValue, avgColLen); + updatePartColStats(dbName, tblName, true, colName, partName, 1200000.4521, avgColLen); + updatePartColStats(dbName, tblName, true, colName, partName, highValue, 34.78); + } + + @Test + public void testTableColumnStatisticsTxnTableMultiAbort() throws Throwable { + String dbName = "column_stats_test_db_txn_multi_abort"; + String tblName = "tbl_part"; + String[] colName = new String[]{"income", "name"}; + double highValue = 1200000.4525; + double avgColLen = 50.30; + + setUpBeforeTest(dbName, null, colName, true); + createTableWithPart(dbName, tblName, colName, true); + List partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1); + String partName = partitions.get(0); + + List txnIds = allocateTxns(1); + long writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId(); + String validWriteIds = getValidWriteIds(dbName, tblName); // create a new columnstatistics desc to represent partition level column stats - statsDesc = new ColumnStatisticsDesc(); + ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(); statsDesc.setDbName(dbName); statsDesc.setTableName(tblName); statsDesc.setPartName(partName); - statsDesc.setIsTblLevel(isTblLevel); + statsDesc.setIsTblLevel(false); + + ColumnStatistics colStats = new ColumnStatistics(); + colStats.setStatsDesc(statsDesc); + colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, avgColLen)); + + SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats)); + setTblColStat.setWriteId(writeId); + setTblColStat.setValidWriteIdList(validWriteIds); + + // write stats objs persistently + hmsHandler.update_partition_column_statistics_req(setTblColStat); + + // abort the txn and verify that the stats got is not compliant. 
+ AbortTxnRequest rqst = new AbortTxnRequest(txnIds.get(0)); + hmsHandler.abort_txn(rqst); + + allocateWriteIds(allocateTxns(1), dbName, tblName); + validWriteIds = getValidWriteIds(dbName, tblName); + + Deadline.startTimer("getPartitionColumnStatistics"); + List statRawStore = rawStore.getPartitionColumnStatistics(DEFAULT_CATALOG_NAME, dbName, tblName, + Collections.singletonList(partName), Collections.singletonList(colName[1]), validWriteIds); + Deadline.stopTimer(); + + verifyStat(statRawStore.get(0).getStatsObj(), colName, highValue, avgColLen); + Assert.assertEquals(statRawStore.get(0).isIsStatsCompliant(), false); + + List statsListFromCache = sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME, + dbName, tblName, Collections.singletonList(partName), Collections.singletonList(colName[1]), + validWriteIds, true); + verifyStat(statsListFromCache.get(0).getStatsObj(), colName, highValue, avgColLen); + Assert.assertEquals(statsListFromCache.get(0).isIsStatsCompliant(), false); + + SharedCache.ColumStatsWithWriteId columStatsWithWriteId = + sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, + CachedStore.partNameToVals(partName), colName[1], validWriteIds); + Assert.assertEquals(columStatsWithWriteId, null); + validatePartPara(dbName, tblName, partName); + } + + @Test + public void testTableColumnStatisticsTxnTableOpenTxn() throws Throwable { + String dbName = "column_stats_test_db_txn_multi_open"; + String tblName = "tbl_part"; + String[] colName = new String[]{"income", "name"}; + double highValue = 1200000.4121; + double avgColLen = 23.30; + + setUpBeforeTest(dbName, null, colName, true); + createTableWithPart(dbName, tblName, colName, true); + List partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1); + String partName = partitions.get(0); + + // update part col stats successfully. + updatePartColStats(dbName, tblName, true, colName, partName, 1.2, 12.2); + + List txnIds = allocateTxns(1); + long writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId(); + String validWriteIds = getValidWriteIds(dbName, tblName); + + // create a new columnstatistics desc to represent partition level column stats + ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(); + statsDesc.setDbName(dbName); + statsDesc.setTableName(tblName); + statsDesc.setPartName(partName); + statsDesc.setIsTblLevel(false); + + ColumnStatistics colStats = new ColumnStatistics(); + colStats.setStatsDesc(statsDesc); + colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, avgColLen)); + + SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats)); + setTblColStat.setWriteId(writeId); + setTblColStat.setValidWriteIdList(validWriteIds); + + // write stats objs persistently + hmsHandler.update_partition_column_statistics_req(setTblColStat); + + // keep the txn open and verify that the stats got is not compliant. 
+ + allocateWriteIds(allocateTxns(1), dbName, tblName); + validWriteIds = getValidWriteIds(dbName, tblName); + + Deadline.startTimer("getPartitionColumnStatistics"); + List statRawStore = rawStore.getPartitionColumnStatistics(DEFAULT_CATALOG_NAME, dbName, tblName, + Collections.singletonList(partName), Collections.singletonList(colName[1]), validWriteIds); + Deadline.stopTimer(); + + verifyStat(statRawStore.get(0).getStatsObj(), colName, highValue, avgColLen); + Assert.assertEquals(statRawStore.get(0).isIsStatsCompliant(), false); + + List statsListFromCache = sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME, + dbName, tblName, Collections.singletonList(partName), Collections.singletonList(colName[1]), + validWriteIds, true); + verifyStat(statsListFromCache.get(0).getStatsObj(), colName, highValue, avgColLen); + Assert.assertEquals(statsListFromCache.get(0).isIsStatsCompliant(), false); + + SharedCache.ColumStatsWithWriteId columStatsWithWriteId = + sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, + tblName, CachedStore.partNameToVals(partName), colName[1], validWriteIds); + Assert.assertEquals(columStatsWithWriteId, null); + validatePartPara(dbName, tblName, partName); + } + + private void verifyAggrStat(String dbName, String tblName, String[] colName, List partitions, + boolean isTxnTbl, double highValue) throws Throwable { + List txnIds = allocateTxns(1); + allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId(); + String validWriteIds = getValidWriteIds(dbName, tblName); + + Deadline.startTimer("getPartitionSpecsByFilterAndProjection"); + AggrStats aggrStats = rawStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, dbName, tblName, partitions, + Collections.singletonList(colName[0]), validWriteIds); + Deadline.stopTimer(); + Assert.assertEquals(aggrStats.getPartsFound(), 2); + Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getDoubleStats().getHighValue(), highValue, 0.01); + //Assert.assertEquals(aggrStats.isIsStatsCompliant(), true); + + // This will update the cache for non txn table. + PartitionsStatsRequest request = new PartitionsStatsRequest(dbName, tblName, + Collections.singletonList(colName[0]), partitions); + request.setCatName(DEFAULT_CATALOG_NAME); + request.setValidWriteIdList(validWriteIds); + AggrStats aggrStatsCached = hmsHandler.get_aggr_stats_for(request); + Assert.assertEquals(aggrStatsCached, aggrStats); + //Assert.assertEquals(aggrStatsCached.isIsStatsCompliant(), true); + + List stats = sharedCache.getAggrStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, + Collections.singletonList(colName[0]), SharedCache.StatsType.ALL); + Assert.assertEquals(stats.get(0).getStatsData().getDoubleStats().getHighValue(), highValue, 0.01); + } + + @Test + public void testAggrStat() throws Throwable { + String dbName = "aggr_stats_test"; + String tblName = "tbl_part"; + String[] colName = new String[]{"income", "name"}; + + setUpBeforeTest(dbName, null, colName, false); + createTableWithPart(dbName, tblName, colName, false); + List partitions = hmsHandler.get_partition_names(dbName, tblName, (short) -1); + String partName = partitions.get(0); + + // update part col stats successfully. 
+ updatePartColStats(dbName, tblName, false, colName, partitions.get(0), 2, 12); + updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 4, 10); + verifyAggrStat(dbName, tblName, colName, partitions, false, 4); + + updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 3, 10); + verifyAggrStat(dbName, tblName, colName, partitions, false, 3); + } + + @Test + public void testAggrStatTxnTable() throws Throwable { + String dbName = "aggr_stats_test_db_txn"; + String tblName = "tbl_part"; + String[] colName = new String[]{"income", "name"}; - colStats = new ColumnStatistics(); + setUpBeforeTest(dbName, null, colName, true); + createTableWithPart(dbName, tblName, colName, true); + List partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1); + String partName = partitions.get(0); + + // update part col stats successfully. + updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12); + updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10); + verifyAggrStat(dbName, tblName, colName, partitions, true, 4); + + updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 3, 10); + verifyAggrStat(dbName, tblName, colName, partitions, true, 3); + + List txnIds = allocateTxns(1); + long writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId(); + String validWriteIds = getValidWriteIds(dbName, tblName); + + // create a new columnstatistics desc to represent partition level column stats + ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(); + statsDesc.setDbName(dbName); + statsDesc.setTableName(tblName); + statsDesc.setPartName(partName); + statsDesc.setIsTblLevel(false); + + ColumnStatistics colStats = new ColumnStatistics(); + colStats.setStatsDesc(statsDesc); + colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, 5, 20)); + + SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats)); + setTblColStat.setWriteId(writeId); + setTblColStat.setValidWriteIdList(validWriteIds); + hmsHandler.update_partition_column_statistics_req(setTblColStat); + + Deadline.startTimer("getPartitionSpecsByFilterAndProjection"); + AggrStats aggrStats = rawStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, dbName, tblName, partitions, + Collections.singletonList(colName[0]), validWriteIds); + Deadline.stopTimer(); + Assert.assertEquals(aggrStats, null); + + // keep the txn open and verify that the stats got is not compliant. + PartitionsStatsRequest request = new PartitionsStatsRequest(dbName, tblName, + Collections.singletonList(colName[0]), partitions); + request.setCatName(DEFAULT_CATALOG_NAME); + request.setValidWriteIdList(validWriteIds); + AggrStats aggrStatsCached = hmsHandler.get_aggr_stats_for(request); + Assert.assertEquals(aggrStatsCached, null); + } + + @Test + public void testAggrStatAbortTxn() throws Throwable { + String dbName = "aggr_stats_test_db_txn_abort"; + String tblName = "tbl_part"; + String[] colName = new String[]{"income", "name"}; + + setUpBeforeTest(dbName, null, colName, true); + createTableWithPart(dbName, tblName, colName, true); + List partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1); + String partName = partitions.get(0); + + // update part col stats successfully. 
+    updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12);
+    updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10);
+    verifyAggrStat(dbName, tblName, colName, partitions, true, 4);
+
+    List txnIds = allocateTxns(4);
+    long writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
+    String validWriteIds = getValidWriteIds(dbName, tblName);
+
+    // create a new columnstatistics desc to represent partition level column stats
+    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+    statsDesc.setDbName(dbName);
+    statsDesc.setTableName(tblName);
+    statsDesc.setPartName(partName);
+    statsDesc.setIsTblLevel(false);
+
+    ColumnStatistics colStats = new ColumnStatistics();
     colStats.setStatsDesc(statsDesc);
-    colStats.setStatsObj(statsObjs);
-
-    hmsHandler.update_partition_column_statistics(colStats);
-    ColumnStatisticsObj colStats2 = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
-        CachedStore.partNameToVals(partName), colName[1]);
-    // compare stats obj to ensure what we get is what we wrote
-    Assert.assertEquals(colStats.getStatsDesc().getPartName(), partName);
-    Assert.assertEquals(colStats2.getColName(), colName[1]);
-    Assert.assertEquals(colStats2.getStatsData().getStringStats().getMaxColLen(), maxColLen);
-    Assert.assertEquals(colStats2.getStatsData().getStringStats().getAvgColLen(), avgColLen, 0.01);
-    Assert.assertEquals(colStats2.getStatsData().getStringStats().getNumNulls(), numNulls);
-    Assert.assertEquals(colStats2.getStatsData().getStringStats().getNumDVs(), numDVs);
-
-    // test stats deletion at partition level
-    hmsHandler.delete_partition_column_statistics(dbName, tblName, partName, colName[1]);
-
-    colStats2 = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
-        CachedStore.partNameToVals(partName), colName[1]);
-    Assert.assertEquals(colStats2, null);
+    colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, 5, 20));
+
+    SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats));
+    setTblColStat.setWriteId(writeId);
+    setTblColStat.setValidWriteIdList(validWriteIds);
+    hmsHandler.update_partition_column_statistics_req(setTblColStat);
+
+    AbortTxnRequest abortTxnRequest = new AbortTxnRequest(txnIds.get(0));
+    hmsHandler.abort_txn(abortTxnRequest);
+
+    Deadline.startTimer("getPartitionSpecsByFilterAndProjection");
+    AggrStats aggrStats = rawStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, dbName, tblName, partitions,
+        Collections.singletonList(colName[0]), validWriteIds);
+    Deadline.stopTimer();
+    Assert.assertEquals(aggrStats, null);
+
+    // the txn that wrote the stats has been aborted, so verify that compliant stats are not returned from the cache either.
+ PartitionsStatsRequest request = new PartitionsStatsRequest(dbName, tblName, + Collections.singletonList(colName[0]), partitions); + request.setCatName(DEFAULT_CATALOG_NAME); + request.setValidWriteIdList(validWriteIds); + AggrStats aggrStatsCached = hmsHandler.get_aggr_stats_for(request); + Assert.assertEquals(aggrStatsCached, null); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java index c028e12be0..7c1944fcca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java @@ -448,7 +448,7 @@ private String buildPartColStr(Table table) { } // TODO: we should probably skip updating if writeId is from an active txn boolean isTxnValid = (writeIdString == null) || ObjectStore.isCurrentStatsValidForTheQuery( - conf, params, statsWriteId , writeIdString, false); + params, statsWriteId, writeIdString, false); return getExistingStatsToUpdate(existingStats, params, isTxnValid); } @@ -473,7 +473,7 @@ private String buildPartColStr(Table table) { } // TODO: we should probably skip updating if writeId is from an active txn if (writeIdString != null && !ObjectStore.isCurrentStatsValidForTheQuery( - conf, params, statsWriteId, writeIdString, false)) { + params, statsWriteId, writeIdString, false)) { return allCols; } List colsToUpdate = new ArrayList<>(); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index b43fb5e6f0..4472f99736 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -3684,7 +3684,7 @@ private boolean validatePartition(final Partition part, final String catName, try { boolean madeDir = createLocationForAddedPartition(table, partition); addedParts.put(new PartValEqWrapperLite(partition), madeDir); - initializeAddedPartition(table, partition, madeDir); + initializeAddedPartition(table, partition, madeDir, null); } catch (MetaException e) { throw new IOException(e.getMessage(), e); } @@ -3973,15 +3973,18 @@ private boolean canUpdateStats(Table tbl) { return true; } - private void initializeAddedPartition( - final Table tbl, final Partition part, boolean madeDir) throws MetaException { - initializeAddedPartition(tbl, new PartitionSpecProxy.SimplePartitionWrapperIterator(part), madeDir); + private void initializeAddedPartition(final Table tbl, final Partition part, boolean madeDir, + EnvironmentContext environmentContext) throws MetaException { + initializeAddedPartition(tbl, + new PartitionSpecProxy.SimplePartitionWrapperIterator(part), madeDir, environmentContext); } private void initializeAddedPartition( - final Table tbl, final PartitionSpecProxy.PartitionIterator part, boolean madeDir) throws MetaException { + final Table tbl, final PartitionSpecProxy.PartitionIterator part, boolean madeDir, + EnvironmentContext environmentContext) throws MetaException { if (canUpdateStats(tbl)) { - MetaStoreServerUtils.updatePartitionStatsFast(part, tbl, wh, madeDir, false, null, true); + MetaStoreServerUtils.updatePartitionStatsFast(part, tbl, wh, madeDir, + false, environmentContext, true); } // set create time @@ -4046,7 +4049,7 @@ private Partition 
add_partition_core(final RawStore ms, assert shouldAdd; // start would throw if it already existed here boolean madeDir = createLocationForAddedPartition(tbl, part); try { - initializeAddedPartition(tbl, part, madeDir); + initializeAddedPartition(tbl, part, madeDir, envContext); initializePartitionParameters(tbl, part); success = ms.addPartition(part); } finally { @@ -5989,13 +5992,13 @@ private boolean updateTableColumnStatsInternal(ColumnStatistics colStats, if (transactionalListeners != null && !transactionalListeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.UPDATE_TABLE_COLUMN_STAT, - new UpdateTableColumnStatEvent(colStats, tableObj, parameters, validWriteIds, + new UpdateTableColumnStatEvent(colStats, tableObj, parameters, writeId, this)); } if (!listeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(listeners, EventType.UPDATE_TABLE_COLUMN_STAT, - new UpdateTableColumnStatEvent(colStats, tableObj, parameters, validWriteIds, + new UpdateTableColumnStatEvent(colStats, tableObj, parameters, writeId,this)); } } @@ -6055,13 +6058,13 @@ private boolean updatePartitonColStatsInternal(Table tbl, ColumnStatistics colSt if (transactionalListeners != null && !transactionalListeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.UPDATE_PARTITION_COLUMN_STAT, - new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, tbl, validWriteIds, + new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, tbl, writeId, this)); } if (!listeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(listeners, EventType.UPDATE_PARTITION_COLUMN_STAT, - new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, tbl, validWriteIds, + new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, tbl, writeId, this)); } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 0485fe9eeb..c0bae3b2bb 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -8629,7 +8629,7 @@ private void writeMPartitionColumnStatistics(Table table, Partition partition, // Make sure we set the flag to invalid regardless of the current value. StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the partition " - + statsDesc.getDbName() + "." + statsDesc.getTableName() + "." + statsDesc.getPartName()); + + statsDesc.getDbName() + "." + statsDesc.getTableName() + "." 
+ statsDesc.getPartName()); } mPartition.setWriteId(writeId); } @@ -12479,7 +12479,7 @@ public int deleteRuntimeStats(int maxRetainSecs) throws MetaException { */ private boolean isCurrentStatsValidForTheQuery(MTable tbl, String queryValidWriteIdList, boolean isCompleteStatsWriter) throws MetaException { - return isCurrentStatsValidForTheQuery(conf, tbl.getParameters(), tbl.getWriteId(), + return isCurrentStatsValidForTheQuery(tbl.getParameters(), tbl.getWriteId(), queryValidWriteIdList, isCompleteStatsWriter); } @@ -12499,19 +12499,19 @@ private boolean isCurrentStatsValidForTheQuery(MTable tbl, String queryValidWrit private boolean isCurrentStatsValidForTheQuery(MPartition part, String queryValidWriteIdList, boolean isCompleteStatsWriter) throws MetaException { - return isCurrentStatsValidForTheQuery(conf, part.getParameters(), part.getWriteId(), + return isCurrentStatsValidForTheQuery(part.getParameters(), part.getWriteId(), queryValidWriteIdList, isCompleteStatsWriter); } private boolean isCurrentStatsValidForTheQuery(Partition part, long partWriteId, String queryValidWriteIdList, boolean isCompleteStatsWriter) throws MetaException { - return isCurrentStatsValidForTheQuery(conf, part.getParameters(), partWriteId, + return isCurrentStatsValidForTheQuery(part.getParameters(), partWriteId, queryValidWriteIdList, isCompleteStatsWriter); } // TODO: move to somewhere else - public static boolean isCurrentStatsValidForTheQuery(Configuration conf, + public static boolean isCurrentStatsValidForTheQuery( Map statsParams, long statsWriteId, String queryValidWriteIdList, boolean isCompleteStatsWriter) throws MetaException { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index 7ad4bd240b..182d5cca9c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.HiveAlterHandler; +import org.apache.hadoop.hive.metastore.HiveMetaException; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.cache.SharedCache.StatsType; import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator; @@ -205,7 +206,7 @@ public static SharedCache getSharedCache() { return sharedCache; } - static private ColumnStatistics updateStatsForPart(RawStore rawStore, Table before, String catalogName, + static private ColumnStatistics updateStatsForAlterPart(RawStore rawStore, Table before, String catalogName, String dbName, String tableName, Partition part) throws Exception { ColumnStatistics colStats; List deletedCols = new ArrayList<>(); @@ -215,32 +216,31 @@ static private ColumnStatistics updateStatsForPart(RawStore rawStore, Table befo sharedCache.removePartitionColStatsFromCache(catalogName, dbName, tableName, part.getValues(), column); } if (colStats != null) { - sharedCache.updatePartitionColStatsInCache(catalogName, dbName, tableName, part.getValues(), colStats.getStatsObj()); + sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, tableName, part.getWriteId(), + part.getValues(), part.getParameters(), colStats.getStatsObj()); } return colStats; } - static 
private void updateStatsForTable(RawStore rawStore, Table before, Table after, String catalogName, + static private void updateStatsForAlterTable(RawStore rawStore, Table tblBefore, Table tblAfter, String catalogName, String dbName, String tableName) throws Exception { ColumnStatistics colStats = null; List deletedCols = new ArrayList<>(); - if (before.isSetPartitionKeys()) { + if (tblBefore.isSetPartitionKeys()) { List parts = sharedCache.listCachedPartitions(catalogName, dbName, tableName, -1); for (Partition part : parts) { - colStats = updateStatsForPart(rawStore, before, catalogName, dbName, tableName, part); + colStats = updateStatsForAlterPart(rawStore, tblBefore, catalogName, dbName, tableName, part); } } - boolean needUpdateAggrStat = false; - List statisticsObjs = HiveAlterHandler.alterTableUpdateTableColumnStats(rawStore, before, - after,null, null, rawStore.getConf(), deletedCols); + List statisticsObjs = HiveAlterHandler.alterTableUpdateTableColumnStats(rawStore, tblBefore, + tblAfter,null, null, rawStore.getConf(), deletedCols); if (colStats != null) { - sharedCache.updateTableColStatsInCache(catalogName, dbName, tableName, statisticsObjs); - needUpdateAggrStat = true; + sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, tblAfter.getWriteId(), + statisticsObjs, tblAfter.getParameters()); } for (String column : deletedCols) { sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, column); - needUpdateAggrStat = true; } } @@ -309,10 +309,8 @@ public static long updateUsingNotificationEvents(RawStore rawStore, long lastEve sharedCache.alterPartitionInCache(catalogName, dbName, tableName, alterPartitionMessage.getPtnObjBefore().getValues(), alterPartitionMessage.getPtnObjAfter()); //TODO : Use the stat object stored in the alter table message to update the stats in cache. - if (updateStatsForPart(rawStore, alterPartitionMessage.getTableObj(), - catalogName, dbName, tableName, alterPartitionMessage.getPtnObjAfter()) != null) { - CacheUpdateMasterWork.updateTableAggregatePartitionColStats(rawStore, catalogName, dbName, tableName); - } + updateStatsForAlterPart(rawStore, alterPartitionMessage.getTableObj(), + catalogName, dbName, tableName, alterPartitionMessage.getPtnObjAfter()); break; case MessageBuilder.DROP_PARTITION_EVENT: DropPartitionMessage dropPartitionMessage = deserializer.getDropPartitionMessage(message); @@ -329,7 +327,7 @@ public static long updateUsingNotificationEvents(RawStore rawStore, long lastEve AlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(message); sharedCache.alterTableInCache(catalogName, dbName, tableName, alterTableMessage.getTableObjAfter()); //TODO : Use the stat object stored in the alter table message to update the stats in cache. 
- updateStatsForTable(rawStore, alterTableMessage.getTableObjBefore(), alterTableMessage.getTableObjAfter(), + updateStatsForAlterTable(rawStore, alterTableMessage.getTableObjBefore(), alterTableMessage.getTableObjAfter(), catalogName, dbName, tableName); break; case MessageBuilder.DROP_TABLE_EVENT: @@ -371,8 +369,8 @@ public static long updateUsingNotificationEvents(RawStore rawStore, long lastEve break; case MessageBuilder.UPDATE_TBL_COL_STAT_EVENT: UpdateTableColumnStatMessage msg = deserializer.getUpdateTableColumnStatMessage(message); - updateTableColumnsStatsInternal(rawStore.getConf(), msg.getColumnStatistics(), msg.getParameters(), - msg.getValidWriteIds(), msg.getWriteId()); + sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, msg.getWriteId(), + msg.getColumnStatistics().getStatsObj(), msg.getParameters()); break; case MessageBuilder.DELETE_TBL_COL_STAT_EVENT: DeleteTableColumnStatMessage msgDel = deserializer.getDeleteTableColumnStatMessage(message); @@ -380,7 +378,8 @@ public static long updateUsingNotificationEvents(RawStore rawStore, long lastEve break; case MessageBuilder.UPDATE_PART_COL_STAT_EVENT: UpdatePartitionColumnStatMessage msgPartUpdate = deserializer.getUpdatePartitionColumnStatMessage(message); - sharedCache.updatePartitionColStatsInCache(catalogName, dbName, tableName, msgPartUpdate.getPartVals(), + sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, tableName, msgPartUpdate.getWriteId(), + msgPartUpdate.getPartVals(), msgPartUpdate.getParameters(), msgPartUpdate.getColumnStatistics().getStatsObj()); break; case MessageBuilder.DELETE_PART_COL_STAT_EVENT: @@ -909,7 +908,7 @@ private static void updateTableAggregatePartitionColStats(RawStore rawStore, Str sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions, - aggrStatsAllButDefaultPartition); + aggrStatsAllButDefaultPartition, null); } } catch (MetaException | NoSuchObjectException e) { LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName, @@ -948,7 +947,12 @@ public boolean commitTransaction() { // the event related to the current transactions are updated in the cache and thus we can support strong // consistency in case there is only one metastore. if (canUseEvents) { - triggerUpdateUsingEvent(rawStore); + try { + triggerUpdateUsingEvent(rawStore); + } catch (Exception e) { + //TODO : Not sure how to handle it as the commit is already done in the object store. + LOG.error("Failed to update cache", e); + } } return true; } @@ -2000,8 +2004,7 @@ private String getPartNameMatcher(Table table, List partSpecs) throws Me Map params, long statsWriteId, String validWriteIds) throws MetaException { if (!TxnUtils.isTransactionalTable(tableParams)) return params; // Not a txn table. if (areTxnStatsSupported && ((validWriteIds == null) - || ObjectStore.isCurrentStatsValidForTheQuery( - conf, params, statsWriteId, validWriteIds, false))) { + || ObjectStore.isCurrentStatsValidForTheQuery(params, statsWriteId, validWriteIds, false))) { // Valid stats are supported for txn tables, and either no verification was requested by the // caller, or the verification has succeeded. return params; @@ -2014,14 +2017,14 @@ private String getPartNameMatcher(Table table, List partSpecs) throws Me // Note: ideally this should be above both CachedStore and ObjectStore. 
- private ColumnStatistics adjustColStatForGet(Map tableParams, - Map params, ColumnStatistics colStat, long statsWriteId, - String validWriteIds) throws MetaException { + public static ColumnStatistics adjustColStatForGet(Map tableParams, + ColumnStatistics colStat, long statsWriteId, + String validWriteIds, boolean areTxnStatsSupported) throws MetaException { colStat.setIsStatsCompliant(true); if (!TxnUtils.isTransactionalTable(tableParams)) return colStat; // Not a txn table. if (areTxnStatsSupported && ((validWriteIds == null) || ObjectStore.isCurrentStatsValidForTheQuery( - conf, params, statsWriteId, validWriteIds, false))) { + tableParams, statsWriteId, validWriteIds, false))) { // Valid stats are supported for txn tables, and either no verification was requested by the // caller, or the verification has succeeded. return colStat; @@ -2058,7 +2061,7 @@ private static void updateTableColumnsStatsInternal(Configuration conf, ColumnSt if (errorMsg != null) { throw new MetaException(errorMsg); } - if (!ObjectStore.isCurrentStatsValidForTheQuery(conf, newParams, table.getWriteId(), + if (!ObjectStore.isCurrentStatsValidForTheQuery(newParams, table.getWriteId(), validWriteIds, true)) { // Make sure we set the flag to invalid regardless of the current value. StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); @@ -2111,11 +2114,14 @@ public ColumnStatistics getTableColumnStatistics( return rawStore.getTableColumnStatistics( catName, dbName, tblName, colNames, validWriteIds); } - ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName); - List colStatObjs = - sharedCache.getTableColStatsFromCache(catName, dbName, tblName, colNames); - return adjustColStatForGet(table.getParameters(), table.getParameters(), - new ColumnStatistics(csd, colStatObjs), table.getWriteId(), validWriteIds); + ColumnStatistics columnStatistics = + sharedCache.getTableColStatsFromCache(catName, dbName, tblName, colNames, validWriteIds, areTxnStatsSupported); + if (columnStatistics == null) { + LOG.info("Stats of table {}.{} for columns {} are not present in cache. " + + "Getting them from raw store", dbName, tblName, colNames); + return rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames, validWriteIds); + } + return columnStatistics; } @Override @@ -2170,10 +2176,17 @@ public boolean deleteTableColumnStatistics(String catName, String dbName, String String catName, String dbName, String tblName, List partNames, List colNames, String writeIdList) throws MetaException, NoSuchObjectException { - // TODO: why have updatePartitionColumnStatistics cache if this is a bypass? - // Note: when implemented, this needs to call adjustColStatForGet, like other get methods. - return rawStore.getPartitionColumnStatistics( - catName, dbName, tblName, partNames, colNames, writeIdList); + + // If writeIdList is not null, that means stats are requested within a txn context. So set stats compliant to false, + // if areTxnStatsSupported is false or the write id which has updated the stats is not compatible with writeIdList. + // This is done within table lock as the number of partitions may be more than one and we need a consistent view + // for all the partitions.
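A rough sketch of the compliance decision described in the comment above, using names from this patch; the per-partition logic actually lives in SharedCache.getPartColStatsList further down, and the partition/columnStatistics variables here are assumed for illustration:

    // Sketch only: cached partition stats are marked non-compliant unless txn stats are
    // supported and the write id that produced them is visible to the caller's snapshot.
    boolean compliant = areTxnStatsSupported
        && ObjectStore.isCurrentStatsValidForTheQuery(
            partition.getParameters(), partition.getWriteId(), writeIdList, false);
    columnStatistics.setIsStatsCompliant(compliant);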
+ List columnStatistics = sharedCache.getPartitionColStatsListFromCache(catName, dbName, tblName, + partNames, colNames, writeIdList, areTxnStatsSupported); + if (columnStatistics == null) { + return rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames, writeIdList); + } + return columnStatistics; } @Override @@ -2212,8 +2225,8 @@ public AggrStats get_aggr_stats_for(String catName, String dbName, String tblNam tblName = StringUtils.normalizeIdentifier(tblName); // TODO: we currently cannot do transactional checks for stats here // (incl. due to lack of sync w.r.t. the below rawStore call). - //TODO : need to calculate aggregate locally in cached store - if (!shouldCacheTable(catName, dbName, tblName) || writeIdList != null || canUseEvents) { + // In case the cache is updated using events, aggregate is calculated locally and thus can be read from cache. + if (!shouldCacheTable(catName, dbName, tblName) || (writeIdList != null && !canUseEvents)) { return rawStore.get_aggr_stats_for( catName, dbName, tblName, partNames, colNames, writeIdList); } @@ -2225,45 +2238,68 @@ public AggrStats get_aggr_stats_for(String catName, String dbName, String tblNam } List allPartNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1); + StatsType type = StatsType.PARTIAL; if (partNames.size() == allPartNames.size()) { colStats = sharedCache.getAggrStatsFromCache(catName, dbName, tblName, colNames, StatsType.ALL); if (colStats != null) { return new AggrStats(colStats, partNames.size()); } + type = StatsType.ALL; } else if (partNames.size() == (allPartNames.size() - 1)) { String defaultPartitionName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); if (!partNames.contains(defaultPartitionName)) { - colStats = - sharedCache.getAggrStatsFromCache(catName, dbName, tblName, colNames, StatsType.ALLBUTDEFAULT); + colStats = sharedCache.getAggrStatsFromCache(catName, dbName, tblName, colNames, StatsType.ALLBUTDEFAULT); if (colStats != null) { return new AggrStats(colStats, partNames.size()); } + type = StatsType.ALLBUTDEFAULT; } } + LOG.debug("Didn't find aggr stats in cache. Merging them. tblName= {}, parts= {}, cols= {}", tblName, partNames, colNames); - MergedColumnStatsForPartitions mergedColStats = - mergeColStatsForPartitions(catName, dbName, tblName, partNames, colNames, sharedCache); + MergedColumnStatsForPartitions mergedColStats = mergeColStatsForPartitions(catName, dbName, tblName, + partNames, colNames, sharedCache, type, writeIdList); + if (mergedColStats == null) { + LOG.info("Aggregate stats of partition " + TableName.getQualified(catName, dbName, tblName) + "." + + partNames + " for columns " + colNames + " is not present in cache. 
Getting it from raw store"); + return rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames, writeIdList); + } return new AggrStats(mergedColStats.getColStats(), mergedColStats.getPartsFound()); } private MergedColumnStatsForPartitions mergeColStatsForPartitions( String catName, String dbName, String tblName, List partNames, List colNames, - SharedCache sharedCache) throws MetaException { + SharedCache sharedCache, StatsType type, String writeIdList) throws MetaException { final boolean useDensityFunctionForNDVEstimation = MetastoreConf.getBoolVar(getConf(), ConfVars.STATS_NDV_DENSITY_FUNCTION); final double ndvTuner = MetastoreConf.getDoubleVar(getConf(), ConfVars.STATS_NDV_TUNER); Map> colStatsMap = new HashMap<>(); - boolean areAllPartsFound = true; - long partsFound = 0; + long partsFound = partNames.size(); + Map, Long> partNameToWriteId = writeIdList != null ? new HashMap<>() : null; for (String colName : colNames) { long partsFoundForColumn = 0; ColumnStatsAggregator colStatsAggregator = null; List colStatsWithPartInfoList = new ArrayList<>(); for (String partName : partNames) { - ColumnStatisticsObj colStatsForPart = - sharedCache.getPartitionColStatsFromCache(catName, dbName, tblName, partNameToVals(partName), colName); - if (colStatsForPart != null) { + List partValue = partNameToVals(partName); + // There are three possible result from getPartitionColStatsFromCache. + // 1. The partition has valid stats and thus colStatsWriteId returned is valid non-null value + // 2. Partition stat is missing from cache and thus colStatsWriteId returned is non-null but colstat + // info in it is null. In this case we just ignore the partition from aggregate calculation to keep + // the behavior same as object store. + // 3. Partition is missing or its stat is updated by live(not yet committed) or aborted txn. In this case, + // colStatsWriteId is null. Thus null is returned to keep the behavior same as object store. + SharedCache.ColumStatsWithWriteId colStatsWriteId = sharedCache.getPartitionColStatsFromCache(catName, dbName, + tblName, partValue, colName, writeIdList); + if (colStatsWriteId == null) { + return null; + } + if (colStatsWriteId.getColumnStatisticsObj() != null) { + ColumnStatisticsObj colStatsForPart = colStatsWriteId.getColumnStatisticsObj(); + if (partNameToWriteId != null) { + partNameToWriteId.put(partValue, colStatsWriteId.getWriteId()); + } ColStatsObjWithSourceInfo colStatsWithPartInfo = new ColStatsObjWithSourceInfo(colStatsForPart, catName, dbName, tblName, partName); colStatsWithPartInfoList.add(colStatsWithPartInfo); @@ -2282,7 +2318,9 @@ private MergedColumnStatsForPartitions mergeColStatsForPartitions( if (colStatsWithPartInfoList.size() > 0) { colStatsMap.put(colStatsAggregator, colStatsWithPartInfoList); } - if (partsFoundForColumn == partNames.size()) { + // set partsFound to the min(partsFoundForColumn) for all columns. partsFound is the number of partitions, for + // which stats for all columns are present in the cache. + if (partsFoundForColumn < partsFound) { partsFound = partsFoundForColumn; } if (colStatsMap.size() < 1) { @@ -2293,8 +2331,23 @@ private MergedColumnStatsForPartitions mergeColStatsForPartitions( } // Note that enableBitVector does not apply here because ColumnStatisticsObj // itself will tell whether bitvector is null or not and aggr logic can automatically apply. 
- return new MergedColumnStatsForPartitions(MetaStoreServerUtils.aggrPartitionStats(colStatsMap, - partNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner), partsFound); + List colAggrStats = MetaStoreServerUtils.aggrPartitionStats(colStatsMap, + partNames, partsFound == partNames.size(), useDensityFunctionForNDVEstimation, ndvTuner); + + if (canUseEvents) { + if (type == StatsType.ALL) { + sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), new AggrStats(colAggrStats, partsFound), + null, partNameToWriteId); + } else if (type == StatsType.ALLBUTDEFAULT) { + sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), null, + new AggrStats(colAggrStats, partsFound), partNameToWriteId); + } + } + return new MergedColumnStatsForPartitions(colAggrStats, partsFound); } class MergedColumnStatsForPartitions { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index ce9e383f70..1c230227eb 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -33,13 +33,18 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.TreeMap; -import org.apache.hadoop.hive.metastore.StatObjectConverter; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.HiveMetaException; +import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.metastore.StatObjectConverter; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.Catalog; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -47,6 +52,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableMeta; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator; @@ -56,6 +62,7 @@ import com.google.common.annotations.VisibleForTesting; +import static org.apache.hadoop.hive.metastore.cache.CachedStore.partNameToVals; import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier; public class SharedCache { @@ -85,7 +92,7 @@ private static HashMap, ObjectEstimator> sizeEstimators = null; enum StatsType { - ALL(0), ALLBUTDEFAULT(1); + ALL(0), ALLBUTDEFAULT(1), PARTIAL(2); private final int position; @@ -249,6 +256,7 @@ public Partition getPartition(List 
partVals, SharedCache sharedCache) { tableLock.readLock().lock(); PartitionWrapper wrapper = partitionCache.get(CacheUtils.buildPartitionCacheKey(partVals)); if (wrapper == null) { + LOG.debug("Partition: " + partVals + " is not present in the cache."); return null; } part = CacheUtils.assemble(wrapper, sharedCache); @@ -342,6 +350,26 @@ public void alterPartition(List partVals, Partition newPart, SharedCache } } + public void alterPartitionAndStats(List partVals, SharedCache sharedCache, long writeId, + Map parameters, List colStatsObjs) { + try { + tableLock.writeLock().lock(); + PartitionWrapper partitionWrapper = partitionCache.get(CacheUtils.buildPartitionCacheKey(partVals)); + if (partitionWrapper == null) { + LOG.info("Partition " + partVals + " is missing from cache. Cannot update the partition stats in cache."); + return; + } + Partition newPart = partitionWrapper.getPartition(); + newPart.setParameters(parameters); + newPart.setWriteId(writeId); + removePartition(partVals, sharedCache); + cachePartition(newPart, sharedCache); + updatePartitionColStats(partVals, colStatsObjs); + } finally { + tableLock.writeLock().unlock(); + } + } + public void alterPartitions(List> partValsList, List newParts, SharedCache sharedCache) { try { @@ -445,7 +473,9 @@ public void refreshTableColStats(List colStatsForTable) { } } - public List getCachedTableColStats(List colNames) { + public ColumnStatistics getCachedTableColStats(ColumnStatisticsDesc csd, List colNames, + String validWriteIds, boolean areTxnStatsSupported) + throws MetaException { List colStatObjs = new ArrayList(); try { tableLock.readLock().lock(); @@ -455,10 +485,11 @@ public void refreshTableColStats(List colStatsForTable) { colStatObjs.add(colStatObj); } } + return CachedStore.adjustColStatForGet(getTable().getParameters(), new ColumnStatistics(csd, colStatObjs), + getTable().getWriteId(), validWriteIds, areTxnStatsSupported); } finally { tableLock.readLock().unlock(); } - return colStatObjs; } public void removeTableColStats(String colName) { @@ -485,16 +516,88 @@ public void removeAllTableColStats() { } } - public ColumnStatisticsObj getPartitionColStats(List partVal, String colName) { + public ColumStatsWithWriteId getPartitionColStats(List partVal, String colName, String writeIdList) { try { tableLock.readLock().lock(); - return partitionColStatsCache - .get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName)); + ColumnStatisticsObj statisticsObj = + partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName)); + if (statisticsObj == null || writeIdList == null) { + return new ColumStatsWithWriteId(-1, statisticsObj); + } + PartitionWrapper wrapper = partitionCache.get(CacheUtils.buildPartitionCacheKey(partVal)); + if (wrapper == null) { + LOG.info("Partition: " + partVal + " is not present in the cache. Cannot update stats in cache."); + return null; + } + long writeId = wrapper.getPartition().getWriteId(); + ValidWriteIdList list4TheQuery = new ValidReaderWriteIdList(writeIdList); + // Just check if the write ID is valid. If it's valid (i.e. we are allowed to see it), + // that means it cannot possibly be a concurrent write. If it's not valid (we are not + // allowed to see it), that means it's either concurrent or aborted, same thing for us. 
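As a concrete illustration of the check that follows (same API as the surrounding code; the snapshot contents are hypothetical): assume the caller's ValidWriteIdList has a high-watermark of 5 and write id 4 is still open.

    // Sketch mirroring list4TheQuery above; values chosen only to show the semantics.
    ValidWriteIdList snapshot = new ValidReaderWriteIdList(writeIdList);
    // snapshot.isWriteIdValid(3) -> true  : committed and visible, cached stats can be served
    // snapshot.isWriteIdValid(4) -> false : still open, i.e. a concurrent writer, return null
    // snapshot.isWriteIdValid(6) -> false : beyond the snapshot's high-watermark, return null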
+ if (!list4TheQuery.isWriteIdValid(writeId)) { + LOG.debug("Write id list " + writeIdList + " is not compatible with write id " + writeId); + return null; + } + return new ColumStatsWithWriteId(writeId, statisticsObj); } finally { tableLock.readLock().unlock(); } } + public List getPartColStatsList(List partNames, List colNames, + String writeIdList, boolean txnStatSupported) throws MetaException { + List colStatObjs = new ArrayList<>(); + try { + tableLock.readLock().lock(); + Table tbl = getTable(); + for (String partName : partNames) { + ColumnStatisticsDesc csd = new ColumnStatisticsDesc(false, + tbl.getDbName(), tbl.getTableName()); + csd.setCatName(tbl.getCatName()); + csd.setPartName(partName); + csd.setLastAnalyzed(0); //TODO : Need to get last analysed. This is not being used by anybody now. + List statObject = new ArrayList<>(); + List partVal = Warehouse.getPartValuesFromPartName(partName); + for (String colName : colNames) { + ColumnStatisticsObj statisticsObj = + partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName)); + if (statisticsObj != null) { + statObject.add(statisticsObj); + } else { + LOG.info("Stats not available in cachedStore for col " + colName + " in partition " + partVal); + return null; + } + } + ColumnStatistics columnStatistics = new ColumnStatistics(csd, statObject); + if (writeIdList != null && TxnUtils.isTransactionalTable(getParameters())) { + columnStatistics.setIsStatsCompliant(true); + if (!txnStatSupported) { + columnStatistics.setIsStatsCompliant(false); + } else { + PartitionWrapper wrapper = + partitionCache.get(CacheUtils.buildPartitionCacheKey(partVal)); + if (wrapper == null) { + columnStatistics.setIsStatsCompliant(false); + } else { + Partition partition = wrapper.getPartition(); + if (!ObjectStore.isCurrentStatsValidForTheQuery(partition.getParameters(), + partition.getWriteId(), writeIdList, false)) { + LOG.debug("The current cached store transactional partition column statistics for {}.{}.{} " + + "(write ID {}) are not valid for current query ({})", tbl.getDbName(), + tbl.getTableName(), partName, partition.getWriteId(), writeIdList); + columnStatistics.setIsStatsCompliant(false); + } + } + } + } + colStatObjs.add(columnStatistics); + } + } finally { + tableLock.readLock().unlock(); + } + return colStatObjs; + } + public boolean updatePartitionColStats(List partVal, List colStatsObjs) { try { @@ -661,11 +764,30 @@ public void cacheAggrPartitionColStats(AggrStats aggrStatsAllPartitions, } public void refreshAggrPartitionColStats(AggrStats aggrStatsAllPartitions, - AggrStats aggrStatsAllButDefaultPartition) { + AggrStats aggrStatsAllButDefaultPartition, SharedCache sharedCache, Map, Long> partNameToWriteId) { Map> newAggrColStatsCache = new HashMap>(); try { tableLock.writeLock().lock(); + if (partNameToWriteId != null) { + for (Entry, Long> partValuesWriteIdSet : partNameToWriteId.entrySet()) { + List partValues = partValuesWriteIdSet.getKey(); + Partition partition = getPartition(partValues, sharedCache); + if (partition == null) { + LOG.info("Could not refresh the aggregate stat as partition " + partValues + " does not exist"); + return; + } + + // for txn tables, if the write id is modified means the partition is updated post fetching of stats. So + // skip updating the aggregate stats in the cache. 
+ long writeId = partition.getWriteId(); + if (writeId != partValuesWriteIdSet.getValue()) { + LOG.info("Could not refresh the aggregate stat as partition " + partValues + " has write id " + + writeId + " instead of " + partValuesWriteIdSet.getValue()); + return; + } + } + } if (aggrStatsAllPartitions != null) { for (ColumnStatisticsObj statObj : aggrStatsAllPartitions.getColStats()) { if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) { @@ -794,6 +916,23 @@ public int getRefCount() { } } + public static class ColumStatsWithWriteId { + private long writeId; + private ColumnStatisticsObj columnStatisticsObj; + public ColumStatsWithWriteId(long writeId, ColumnStatisticsObj columnStatisticsObj) { + this.writeId = writeId; + this.columnStatisticsObj = columnStatisticsObj; + } + + public long getWriteId() { + return writeId; + } + + public ColumnStatisticsObj getColumnStatisticsObj() { + return columnStatisticsObj; + } + } + public void populateCatalogsInCache(Collection catalogs) { for (Catalog cat : catalogs) { Catalog catCopy = cat.deepCopy(); @@ -1205,6 +1344,30 @@ public void alterTableInCache(String catName, String dbName, String tblName, Tab } } + public void alterTableAndStatsInCache(String catName, String dbName, String tblName, long writeId, + List colStatsObjs, Map newParams) { + try { + cacheLock.writeLock().lock(); + TableWrapper tblWrapper = + tableCache.remove(CacheUtils.buildTableKey(catName, dbName, tblName)); + if (tblWrapper == null) { + LOG.info("Table " + tblName + " is missing from cache. Cannot update table stats in cache"); + return; + } + Table newTable = tblWrapper.getTable(); + newTable.setWriteId(writeId); + newTable.setParameters(newParams); + //tblWrapper.updateTableObj(newTable, this); + String newDbName = StringUtils.normalizeIdentifier(newTable.getDbName()); + String newTblName = StringUtils.normalizeIdentifier(newTable.getTableName()); + tableCache.put(CacheUtils.buildTableKey(catName, newDbName, newTblName), tblWrapper); + tblWrapper.updateTableColStats(colStatsObjs); + isTableCacheDirty.set(true); + } finally { + cacheLock.writeLock().unlock(); + } + } + public List listCachedTables(String catName, String dbName) { List
tables = new ArrayList<>(); try { @@ -1299,19 +1462,20 @@ public void refreshTablesInCache(String catName, String dbName, List
tabl } } - public List getTableColStatsFromCache(String catName, String dbName, - String tblName, List colNames) { - List colStatObjs = new ArrayList<>(); + public ColumnStatistics getTableColStatsFromCache(String catName, String dbName, + String tblName, List colNames, String validWriteIds, boolean areTxnStatsSupported) throws MetaException { try { cacheLock.readLock().lock(); TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); - if (tblWrapper != null) { - colStatObjs = tblWrapper.getCachedTableColStats(colNames); + if (tblWrapper == null) { + LOG.info("Table " + tblName + " is missing from cache."); + return null; } + ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName); + return tblWrapper.getCachedTableColStats(csd, colNames, validWriteIds, areTxnStatsSupported); } finally { cacheLock.readLock().unlock(); } - return colStatObjs; } public void removeTableColStatsFromCache(String catName, String dbName, String tblName, @@ -1321,6 +1485,8 @@ public void removeTableColStatsFromCache(String catName, String dbName, String t TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.removeTableColStats(colName); + } else { + LOG.info("Table " + tblName + " is missing from cache."); } } finally { cacheLock.readLock().unlock(); @@ -1333,6 +1499,8 @@ public void removeAllTableColStatsFromCache(String catName, String dbName, Strin TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.removeAllTableColStats(); + } else { + LOG.info("Table " + tblName + " is missing from cache."); } } finally { cacheLock.readLock().unlock(); @@ -1347,6 +1515,8 @@ public void updateTableColStatsInCache(String catName, String dbName, String tab tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName)); if (tblWrapper != null) { tblWrapper.updateTableColStats(colStatsForTable); + } else { + LOG.info("Table " + tableName + " is missing from cache."); } } finally { cacheLock.readLock().unlock(); @@ -1361,6 +1531,8 @@ public void refreshTableColStatsInCache(String catName, String dbName, String ta tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName)); if (tblWrapper != null) { tblWrapper.refreshTableColStats(colStatsForTable); + } else { + LOG.info("Table " + tableName + " is missing from cache."); } } finally { cacheLock.readLock().unlock(); @@ -1513,6 +1685,20 @@ public void alterPartitionInCache(String catName, String dbName, String tblName, } } + public void alterPartitionAndStatsInCache(String catName, String dbName, String tblName, long writeId, + List partVals, Map parameters, + List colStatsObjs) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + if (tblWrapper != null) { + tblWrapper.alterPartitionAndStats(partVals, this, writeId, parameters, colStatsObjs); + } + } finally { + cacheLock.readLock().unlock(); + } + } + public void alterPartitionsInCache(String catName, String dbName, String tblName, List> partValsList, List newParts) { try { @@ -1578,14 +1764,14 @@ public void updatePartitionColStatsInCache(String catName, String dbName, String } } - public ColumnStatisticsObj getPartitionColStatsFromCache(String catName, String dbName, - String tblName, List partVal, String colName) { - ColumnStatisticsObj colStatObj = null; + public ColumStatsWithWriteId getPartitionColStatsFromCache(String 
catName, String dbName, + String tblName, List partVal, String colName, String writeIdList) { + ColumStatsWithWriteId colStatObj = null; try { cacheLock.readLock().lock(); TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { - colStatObj = tblWrapper.getPartitionColStats(partVal, colName); + colStatObj = tblWrapper.getPartitionColStats(partVal, colName, writeIdList); } } finally { cacheLock.readLock().unlock(); @@ -1593,6 +1779,24 @@ public ColumnStatisticsObj getPartitionColStatsFromCache(String catName, String return colStatObj; } + public List getPartitionColStatsListFromCache(String catName, String dbName, String tblName, + List partNames, List colNames, + String writeIdList, boolean txnStatSupported) { + List colStatObjs = null; + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + if (tblWrapper != null) { + colStatObjs = tblWrapper.getPartColStatsList(partNames, colNames, writeIdList, txnStatSupported); + } + } catch (MetaException e) { + LOG.warn("Failed to get partition column statistics"); + } finally { + cacheLock.readLock().unlock(); + } + return colStatObjs; + } + public void refreshPartitionColStatsInCache(String catName, String dbName, String tblName, List partitionColStats) { try { @@ -1635,13 +1839,14 @@ public void addAggregateStatsToCache(String catName, String dbName, String tblNa } public void refreshAggregateStatsInCache(String catName, String dbName, String tblName, - AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) { + AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition, + Map, Long> partNameToWriteId) { try { cacheLock.readLock().lock(); TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.refreshAggrPartitionColStats(aggrStatsAllPartitions, - aggrStatsAllButDefaultPartition); + aggrStatsAllButDefaultPartition, this, partNameToWriteId); } } finally { cacheLock.readLock().unlock(); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java index 094f799460..ba61a08173 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java @@ -35,7 +35,6 @@ @InterfaceStability.Stable public class UpdatePartitionColumnStatEvent extends ListenerEvent { private ColumnStatistics partColStats; - private String validWriteIds; private long writeId; private Map parameters; private List partVals; @@ -45,16 +44,14 @@ * @param statsObj Columns statistics Info. * @param partVals partition names * @param parameters table parameters to be updated after stats are updated. - * @param validWriteIds valid write id list for the query. + * @param tableObj table object * @param writeId writeId for the query. 
* @param handler handler that is firing the event */ public UpdatePartitionColumnStatEvent(ColumnStatistics statsObj, List partVals, Map parameters, - Table tableObj, String validWriteIds, long writeId, - IHMSHandler handler) { + Table tableObj, long writeId, IHMSHandler handler) { super(true, handler); this.partColStats = statsObj; - this.validWriteIds = validWriteIds; this.writeId = writeId; this.parameters = parameters; this.partVals = partVals; @@ -71,7 +68,6 @@ public UpdatePartitionColumnStatEvent(ColumnStatistics statsObj, List pa super(true, handler); this.partColStats = statsObj; this.partVals = partVals; - this.validWriteIds = null; this.writeId = 0; this.parameters = null; this.tableObj = tableObj; @@ -81,10 +77,6 @@ public ColumnStatistics getPartColStats() { return partColStats; } - public String getValidWriteIds() { - return validWriteIds; - } - public long getWriteId() { return writeId; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java index 3f988bbdc2..71300abf4e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java @@ -35,24 +35,22 @@ @InterfaceStability.Stable public class UpdateTableColumnStatEvent extends ListenerEvent { private ColumnStatistics colStats; - private String validWriteIds; private long writeId; private Map parameters; private Table tableObj; /** * @param colStats Columns statistics Info. + * @param tableObj table object * @param parameters table parameters to be updated after stats are updated. - * @param validWriteIds valid write id list for the query. - * @param colStats writeId for the query. + * @param writeId writeId for the query. 
* @param handler handler that is firing the event */ public UpdateTableColumnStatEvent(ColumnStatistics colStats, Table tableObj, - Map parameters, String validWriteIds, + Map parameters, long writeId, IHMSHandler handler) { super(true, handler); this.colStats = colStats; - this.validWriteIds = validWriteIds; this.writeId = writeId; this.parameters = parameters; this.tableObj = tableObj; @@ -65,7 +63,6 @@ public UpdateTableColumnStatEvent(ColumnStatistics colStats, Table tableObj, public UpdateTableColumnStatEvent(ColumnStatistics colStats, IHMSHandler handler) { super(true, handler); this.colStats = colStats; - this.validWriteIds = null; this.writeId = 0; this.parameters = null; this.tableObj = null; @@ -75,10 +72,6 @@ public ColumnStatistics getColStats() { return colStats; } - public String getValidWriteIds() { - return validWriteIds; - } - public long getWriteId() { return writeId; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java index 10c6b44bb7..15c4769e33 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java @@ -289,9 +289,9 @@ public AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, public JSONUpdateTableColumnStatMessage buildUpdateTableColumnStatMessage(ColumnStatistics colStats, Table tableObj, Map parameters, - String validWriteIds, long writeId) { + long writeId) { return new JSONUpdateTableColumnStatMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), - colStats, tableObj, parameters, validWriteIds, writeId); + colStats, tableObj, parameters, writeId); } public JSONDeleteTableColumnStatMessage buildDeleteTableColumnStatMessage(String dbName, String colName) { @@ -300,9 +300,9 @@ public JSONDeleteTableColumnStatMessage buildDeleteTableColumnStatMessage(String public JSONUpdatePartitionColumnStatMessage buildUpdatePartitionColumnStatMessage(ColumnStatistics colStats, List partVals, Map parameters, - Table tableObj, String validWriteIds, long writeId) { + Table tableObj, long writeId) { return new JSONUpdatePartitionColumnStatMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), colStats, partVals, - parameters, tableObj, validWriteIds, writeId); + parameters, tableObj, writeId); } public JSONDeletePartitionColumnStatMessage buildDeletePartitionColumnStatMessage(String dbName, String colName, diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java index 7eb6c078c0..e92a0dc9a3 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java @@ -34,8 +34,6 @@ protected UpdatePartitionColumnStatMessage() { public abstract ColumnStatistics getColumnStatistics(); - public abstract String getValidWriteIds(); - public abstract Long getWriteId(); public abstract Map getParameters(); diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java index 7919b0e638..e3f049c48c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java @@ -33,8 +33,6 @@ protected UpdateTableColumnStatMessage() { public abstract ColumnStatistics getColumnStatistics(); - public abstract String getValidWriteIds(); - public abstract Long getWriteId(); public abstract Map getParameters(); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java index 1b35df5f6c..fd7fe00419 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java @@ -38,7 +38,7 @@ private Long writeId, timestamp; @JsonProperty - private String validWriteIds, server, servicePrincipal, database; + private String server, servicePrincipal, database; @JsonProperty private String colStatsJson; @@ -61,12 +61,11 @@ public JSONUpdatePartitionColumnStatMessage() { public JSONUpdatePartitionColumnStatMessage(String server, String servicePrincipal, Long timestamp, ColumnStatistics colStats, List partVals, Map parameters, - Table tableObj, String validWriteIds, long writeId) { + Table tableObj, long writeId) { this.timestamp = timestamp; this.server = server; this.servicePrincipal = servicePrincipal; this.writeId = writeId; - this.validWriteIds = validWriteIds; this.database = colStats.getStatsDesc().getDbName(); this.partVals = partVals; try { @@ -107,11 +106,6 @@ public ColumnStatistics getColumnStatistics() { } } - @Override - public String getValidWriteIds() { - return validWriteIds; - } - @Override public Long getWriteId() { return writeId; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java index c932b7cf61..275d204957 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java @@ -36,7 +36,7 @@ private Long writeId, timestamp; @JsonProperty - private String validWriteIds, server, servicePrincipal, database; + private String server, servicePrincipal, database; @JsonProperty private String colStatsJson; @@ -55,12 +55,11 @@ public JSONUpdateTableColumnStatMessage() { public JSONUpdateTableColumnStatMessage(String server, String servicePrincipal, Long timestamp, ColumnStatistics colStats, Table tableObj, Map parameters, - String validWriteIds, long writeId) { + long writeId) { 
this.timestamp = timestamp; this.server = server; this.servicePrincipal = servicePrincipal; this.writeId = writeId; - this.validWriteIds = validWriteIds; this.database = colStats.getStatsDesc().getDbName(); try { this.colStatsJson = MessageBuilder.createTableColumnStatJson(colStats); @@ -105,11 +104,6 @@ public Table getTableObject() throws Exception { return (Table) MessageBuilder.getTObj(tableObjJson, Table.class); } - @Override - public String getValidWriteIds() { - return validWriteIds; - } - @Override public Long getWriteId() { return writeId; diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql index 2f102a253e..02ff4aed8f 100644 --- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql +++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql @@ -177,6 +177,8 @@ CREATE TABLE "APP"."NOTIFICATION_LOG" ( "MESSAGE_FORMAT" VARCHAR(16) ); +CREATE UNIQUE INDEX "APP"."NOTIFICATION_LOG_EVENT_ID" ON "APP"."NOTIFICATION_LOG" ("EVENT_ID"); + CREATE TABLE "APP"."NOTIFICATION_SEQUENCE" ("NNI_ID" BIGINT NOT NULL, "NEXT_EVENT_ID" BIGINT NOT NULL); CREATE TABLE "APP"."KEY_CONSTRAINTS" ("CHILD_CD_ID" BIGINT, "CHILD_INTEGER_IDX" INTEGER, "CHILD_TBL_ID" BIGINT, "PARENT_CD_ID" BIGINT , "PARENT_INTEGER_IDX" INTEGER, "PARENT_TBL_ID" BIGINT NOT NULL, "POSITION" BIGINT NOT NULL, "CONSTRAINT_NAME" VARCHAR(400) NOT NULL, "CONSTRAINT_TYPE" SMALLINT NOT NULL, "UPDATE_RULE" SMALLINT, "DELETE_RULE" SMALLINT, "ENABLE_VALIDATE_RELY" SMALLINT NOT NULL, "DEFAULT_VALUE" VARCHAR(400)); diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql index 8ad56fc033..1a1e34a5c8 100644 --- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql +++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql @@ -9,6 +9,9 @@ UPDATE "APP"."WM_RESOURCEPLAN" SET NS = 'default' WHERE NS IS NULL; DROP INDEX "APP"."UNIQUE_WM_RESOURCEPLAN"; CREATE UNIQUE INDEX "APP"."UNIQUE_WM_RESOURCEPLAN" ON "APP"."WM_RESOURCEPLAN" ("NS", "NAME"); +-- HIVE-21063 +CREATE UNIQUE INDEX "APP"."NOTIFICATION_LOG_EVENT_ID" ON "APP"."NOTIFICATION_LOG" ("EVENT_ID"); + -- This needs to be the last thing done. Insert any changes above this line. 
UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql index 383d3bc8c3..4f58343b93 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql @@ -629,6 +629,8 @@ CREATE TABLE NOTIFICATION_LOG ALTER TABLE NOTIFICATION_LOG ADD CONSTRAINT NOTIFICATION_LOG_PK PRIMARY KEY (NL_ID); +CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG (EVENT_ID); + CREATE TABLE NOTIFICATION_SEQUENCE ( NNI_ID bigint NOT NULL, diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql index edde08db9e..e0d143afce 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql @@ -10,6 +10,9 @@ UPDATE WM_RESOURCEPLAN SET NS = 'default' WHERE NS IS NULL; DROP INDEX UNIQUE_WM_RESOURCEPLAN ON WM_RESOURCEPLAN; CREATE UNIQUE INDEX UNIQUE_WM_RESOURCEPLAN ON WM_RESOURCEPLAN ("NS", "NAME"); +-- HIVE-21063 +CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG (EVENT_ID); + -- These lines need to be last. Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE; diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql index 5466537c45..8db11d3f81 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql @@ -871,6 +871,8 @@ CREATE TABLE IF NOT EXISTS `NOTIFICATION_LOG` PRIMARY KEY (`NL_ID`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; +CREATE UNIQUE INDEX `NOTIFICATION_LOG_EVENT_ID` ON NOTIFICATION_LOG (`EVENT_ID`) USING BTREE; + CREATE TABLE IF NOT EXISTS `NOTIFICATION_SEQUENCE` ( `NNI_ID` BIGINT(20) NOT NULL, diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql index 701acb0098..47c38316a0 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql @@ -11,6 +11,9 @@ UPDATE `WM_RESOURCEPLAN` SET `NS` = 'default' WHERE `NS` IS NULL; ALTER TABLE `WM_RESOURCEPLAN` DROP KEY `UNIQUE_WM_RESOURCEPLAN`; ALTER TABLE `WM_RESOURCEPLAN` ADD UNIQUE KEY `UNIQUE_WM_RESOURCEPLAN` (`NAME`, `NS`); +-- HIVE-21063 +CREATE UNIQUE INDEX `NOTIFICATION_LOG_EVENT_ID` ON NOTIFICATION_LOG (`EVENT_ID`) USING BTREE; + -- These lines need to be last. Insert any changes above. 
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS ' '; diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql index 2a9c38f186..8af9a76f64 100644 --- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql +++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql @@ -624,6 +624,8 @@ CREATE TABLE NOTIFICATION_LOG ALTER TABLE NOTIFICATION_LOG ADD CONSTRAINT NOTIFICATION_LOG_PK PRIMARY KEY (NL_ID); +CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG(EVENT_ID); + CREATE TABLE NOTIFICATION_SEQUENCE ( NNI_ID NUMBER NOT NULL, diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql index b9f6331325..231376be31 100644 --- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql +++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql @@ -9,6 +9,9 @@ UPDATE WM_RESOURCEPLAN SET NS = 'default' WHERE NS IS NULL; DROP INDEX UNIQUE_WM_RESOURCEPLAN; CREATE UNIQUE INDEX UNIQUE_WM_RESOURCEPLAN ON WM_RESOURCEPLAN (NS, "NAME"); +-- HIVE-21063 +CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG(EVENT_ID); + -- These lines need to be last. Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status from dual; diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql index 0a359d9e2a..2aff1e44bc 100644 --- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql +++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql @@ -631,6 +631,8 @@ CREATE TABLE "NOTIFICATION_LOG" PRIMARY KEY ("NL_ID") ); +CREATE UNIQUE INDEX "NOTIFICATION_LOG_EVENT_ID" ON "NOTIFICATION_LOG" USING btree ("EVENT_ID"); + CREATE TABLE "NOTIFICATION_SEQUENCE" ( "NNI_ID" BIGINT NOT NULL, diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql index 0c36069d07..2d4363b3d6 100644 --- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql +++ b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql @@ -11,6 +11,9 @@ UPDATE "WM_RESOURCEPLAN" SET "NS" = 'default' WHERE "NS" IS NULL; ALTER TABLE "WM_RESOURCEPLAN" DROP CONSTRAINT "UNIQUE_WM_RESOURCEPLAN"; ALTER TABLE ONLY "WM_RESOURCEPLAN" ADD CONSTRAINT "UNIQUE_WM_RESOURCEPLAN" UNIQUE ("NS", "NAME"); +-- HIVE-21063 +CREATE UNIQUE INDEX "NOTIFICATION_LOG_EVENT_ID" ON "NOTIFICATION_LOG" USING btree ("EVENT_ID"); + -- These lines need to be last. Insert any changes above. 
UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0'; diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java index 7429d18226..77e0c98265 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java @@ -95,6 +95,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.Type; import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.util.StringUtils; import org.apache.thrift.TException; import org.junit.Test; @@ -3325,22 +3326,23 @@ public void testUpdatePartitionStat_doesNotUpdateStats() throws Exception { Warehouse wh = mock(Warehouse.class); //Execute initializeAddedPartition() and it should not trigger updatePartitionStatsFast() as DO_NOT_UPDATE_STATS is true HiveMetaStore.HMSHandler hms = new HiveMetaStore.HMSHandler("", conf, false); - Method m = hms.getClass().getDeclaredMethod("initializeAddedPartition", Table.class, Partition.class, boolean.class); + Method m = hms.getClass().getDeclaredMethod("initializeAddedPartition", Table.class, Partition.class, + boolean.class, EnvironmentContext.class); m.setAccessible(true); //Invoke initializeAddedPartition(); - m.invoke(hms, tbl, part, false); + m.invoke(hms, tbl, part, false, null); verify(wh, never()).getFileStatusesForLocation(part.getSd().getLocation()); //Remove tbl's DO_NOT_UPDATE_STATS & set STATS_AUTO_GATHER = false tbl.unsetParameters(); MetastoreConf.setBoolVar(conf, ConfVars.STATS_AUTO_GATHER, false); - m.invoke(hms, tbl, part, false); + m.invoke(hms, tbl, part, false, null); verify(wh, never()).getFileStatusesForLocation(part.getSd().getLocation()); //Set STATS_AUTO_GATHER = true and set tbl as a VIRTUAL_VIEW MetastoreConf.setBoolVar(conf, ConfVars.STATS_AUTO_GATHER, true); tbl.setTableType("VIRTUAL_VIEW"); - m.invoke(hms, tbl, part, false); + m.invoke(hms, tbl, part, false, null); verify(wh, never()).getFileStatusesForLocation(part.getSd().getLocation()); } }