diff --git itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java index 1c105d152a..be40395cc3 100644 --- itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java +++ itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java @@ -321,10 +321,10 @@ public void updateCreationMetadata(String catName, String dbname, String tablena } @Override - public void alterTable(String catName, String dbName, String name, Table newTable, String queryValidWriteIds) + public Table alterTable(String catName, String dbName, String name, Table newTable, String queryValidWriteIds) throws InvalidObjectException, MetaException { if (shouldEventSucceed) { - objectStore.alterTable(catName, dbName, name, newTable, queryValidWriteIds); + return objectStore.alterTable(catName, dbName, name, newTable, queryValidWriteIds); } else { throw new RuntimeException("Event failed."); } @@ -385,22 +385,22 @@ public PartitionValuesResponse listPartitionValues(String catName, String db_nam } @Override - public void alterPartition(String catName, String dbName, String tblName, List partVals, + public Partition alterPartition(String catName, String dbName, String tblName, List partVals, Partition newPart, String queryValidWriteIds) throws InvalidObjectException, MetaException { if (shouldEventSucceed) { - objectStore.alterPartition(catName, dbName, tblName, partVals, newPart, queryValidWriteIds); + return objectStore.alterPartition(catName, dbName, tblName, partVals, newPart, queryValidWriteIds); } else { throw new RuntimeException("Event failed."); } } @Override - public void alterPartitions(String catName, String dbName, String tblName, + public List alterPartitions(String catName, String dbName, String tblName, List> partValsList, List newParts, long writeId, String queryValidWriteIds) throws InvalidObjectException, MetaException { if (shouldEventSucceed) { - objectStore.alterPartitions(catName, dbName, tblName, partValsList, newParts, writeId, queryValidWriteIds); + return objectStore.alterPartitions(catName, dbName, tblName, partValsList, newParts, writeId, queryValidWriteIds); } else { throw new RuntimeException("Event failed."); } @@ -736,13 +736,13 @@ public boolean deletePartitionColumnStatistics(String catName, String dbName, St } @Override - public boolean updateTableColumnStatistics(ColumnStatistics statsObj, String validWriteIds, long writeId) + public Map updateTableColumnStatistics(ColumnStatistics statsObj, String validWriteIds, long writeId) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { return objectStore.updateTableColumnStatistics(statsObj, validWriteIds, writeId); } @Override - public boolean updatePartitionColumnStatistics(ColumnStatistics statsObj, + public Map updatePartitionColumnStatistics(ColumnStatistics statsObj, List partVals, String validWriteIds, long writeId) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { return objectStore.updatePartitionColumnStatistics(statsObj, partVals, validWriteIds, writeId); diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java index f34cb61696..a50ec18b8a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java @@ -447,7 +447,7 @@ private String buildPartColStr(Table table) { } // TODO: we should probably skip updating if writeId is from an active txn boolean isTxnValid = (writeIdString == null) || ObjectStore.isCurrentStatsValidForTheQuery( - conf, db, tbl, params, statsWriteId , writeIdString, false); + conf, params, statsWriteId , writeIdString, false); return getExistingStatsToUpdate(existingStats, params, isTxnValid); } @@ -472,7 +472,7 @@ private String buildPartColStr(Table table) { } // TODO: we should probably skip updating if writeId is from an active txn if (writeIdString != null && !ObjectStore.isCurrentStatsValidForTheQuery( - conf, db, tbl, params, statsWriteId, writeIdString, false)) { + conf, params, statsWriteId, writeIdString, false)) { return allCols; } List colsToUpdate = new ArrayList<>(); diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index eb4eb1b5a6..fb6029f82e 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -5831,7 +5831,7 @@ private boolean updateTableColumnStatsInternal(ColumnStatistics colStats, boolean ret = false; try { - ret = getMS().updateTableColumnStatistics(colStats, validWriteIds, writeId); + ret = getMS().updateTableColumnStatistics(colStats, validWriteIds, writeId) != null; } finally { endFunction("write_column_statistics", ret != false, null, colStats.getStatsDesc().getTableName()); @@ -5875,7 +5875,7 @@ private boolean updatePartitonColStatsInternal(Table tbl, ColumnStatistics colSt } List partVals = getPartValsFromName(tbl, csd.getPartName()); return getMS().updatePartitionColumnStatistics( - colStats, partVals, validWriteIds, writeId); + colStats, partVals, validWriteIds, writeId) != null; } finally { endFunction("write_partition_column_statistics", ret != false, null, tableName); } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index b319e6876f..f42e63c1cd 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -2433,7 +2433,7 @@ public Partition getPartition(String catName, String dbName, String tableName, @Override public Partition getPartition(String catName, String dbName, String tableName, List part_vals, - String writeIdList) + String validWriteIds) throws NoSuchObjectException, MetaException { openTransaction(); MTable table = this.getMTable(catName, dbName, tableName); @@ -2447,13 +2447,14 @@ public Partition getPartition(String catName, String dbName, String tableName, part.setValues(part_vals); // If transactional table partition, check whether the current version partition // statistics in the metastore comply with the client query's snapshot isolation. + long statsWriteId = mpart.getWriteId(); if (TxnUtils.isTransactionalTable(table.getParameters())) { if (!areTxnStatsSupported) { // Do not make persistent the following state since it is query specific (not global). StatsSetupConst.setBasicStatsState(part.getParameters(), StatsSetupConst.FALSE); LOG.info("Removed COLUMN_STATS_ACCURATE from Partition object's parameters."); - } else if (writeIdList != null) { - if (isCurrentStatsValidForTheQuery(part, mpart.getWriteId(), writeIdList, false)) { + } else if (validWriteIds != null) { + if (isCurrentStatsValidForTheQuery(part, statsWriteId, validWriteIds, false)) { part.setIsStatsCompliant(true); } else { part.setIsStatsCompliant(false); @@ -4091,7 +4092,7 @@ private String makeParameterDeclarationStringObj(Map params) { } @Override - public void alterTable(String catName, String dbname, String name, Table newTable, + public Table alterTable(String catName, String dbname, String name, Table newTable, String queryValidWriteIds) throws InvalidObjectException, MetaException { boolean success = false; boolean registerCreationSignature = false; @@ -4160,6 +4161,7 @@ public void alterTable(String catName, String dbname, String name, Table newTabl oldt.setWriteId(newTable.getWriteId()); } } + newTable = convertToTable(oldt); // commit the changes success = commitTransaction(); @@ -4168,6 +4170,7 @@ public void alterTable(String catName, String dbname, String name, Table newTabl rollbackTransaction(); } } + return newTable; } /** @@ -4220,18 +4223,21 @@ public void updateCreationMetadata(String catName, String dbname, String tablena } } + private static final class Ref { + public T t; + } + /** * Alters an existing partition. Initiates copy of SD. Returns the old CD. * @param dbname * @param name * @param part_vals Partition values (of the original partition instance) * @param newPart Partition object containing new information - * @return The column descriptor of the old partition instance (null if table is a view) * @throws InvalidObjectException * @throws MetaException */ - private MColumnDescriptor alterPartitionNoTxn(String catName, String dbname, String name, - List part_vals, Partition newPart, String queryValidWriteIds) + private Partition alterPartitionNoTxn(String catName, String dbname, String name, + List part_vals, Partition newPart, String validWriteIds, Ref oldCd) throws InvalidObjectException, MetaException { catName = normalizeIdentifier(catName); name = normalizeIdentifier(name); @@ -4253,7 +4259,7 @@ private MColumnDescriptor alterPartitionNoTxn(String catName, String dbname, Str if (isTxn && areTxnStatsSupported) { // Transactional table is altered without a txn. Make sure there are no changes to the flag. String errorMsg = verifyStatsChangeCtx(oldp.getParameters(), newPart.getParameters(), - newPart.getWriteId(), queryValidWriteIds, false); + newPart.getWriteId(), validWriteIds, false); if (errorMsg != null) { throw new MetaException(errorMsg); } @@ -4274,9 +4280,9 @@ private MColumnDescriptor alterPartitionNoTxn(String catName, String dbname, Str if (isTxn) { if (!areTxnStatsSupported) { StatsSetupConst.setBasicStatsState(oldp.getParameters(), StatsSetupConst.FALSE); - } else if (queryValidWriteIds != null && newPart.getWriteId() > 0) { + } else if (validWriteIds != null && newPart.getWriteId() > 0) { // Check concurrent INSERT case and set false to the flag. - if (!isCurrentStatsValidForTheQuery(oldp, queryValidWriteIds, true)) { + if (!isCurrentStatsValidForTheQuery(oldp, validWriteIds, true)) { StatsSetupConst.setBasicStatsState(oldp.getParameters(), StatsSetupConst.FALSE); LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the partition " + dbname + "." + name + "." + oldp.getPartitionName() + " will be made persistent."); @@ -4285,22 +4291,24 @@ private MColumnDescriptor alterPartitionNoTxn(String catName, String dbname, Str } } - return oldCD; + oldCd.t = oldCD; + return convertToPart(oldp); } @Override - public void alterPartition(String catName, String dbname, String name, List part_vals, - Partition newPart, String queryValidWriteIds) throws InvalidObjectException, MetaException { + public Partition alterPartition(String catName, String dbname, String name, List part_vals, + Partition newPart, String validWriteIds) throws InvalidObjectException, MetaException { boolean success = false; Throwable e = null; + Partition result = null; try { openTransaction(); if (newPart.isSetWriteId()) { LOG.warn("Alter partitions with write ID called without transaction information"); } - MColumnDescriptor oldCd = alterPartitionNoTxn( - catName, dbname, name, part_vals, newPart, queryValidWriteIds); - removeUnusedColumnDescriptor(oldCd); + Ref oldCd = new Ref(); + result = alterPartitionNoTxn(catName, dbname, name, part_vals, newPart, validWriteIds, oldCd); + removeUnusedColumnDescriptor(oldCd.t); // commit the changes success = commitTransaction(); } catch (Throwable exception) { @@ -4317,28 +4325,33 @@ public void alterPartition(String catName, String dbname, String name, List alterPartitions(String catName, String dbname, String name, List> part_vals, List newParts, long writeId, String queryWriteIdList) throws InvalidObjectException, MetaException { boolean success = false; Exception e = null; + List results = new ArrayList<>(newParts.size()); try { openTransaction(); Iterator> part_val_itr = part_vals.iterator(); Set oldCds = new HashSet<>(); + Ref oldCdRef = new Ref<>(); for (Partition tmpPart: newParts) { List tmpPartVals = part_val_itr.next(); if (writeId > 0) { tmpPart.setWriteId(writeId); } - MColumnDescriptor oldCd = alterPartitionNoTxn( - catName, dbname, name, tmpPartVals, tmpPart, queryWriteIdList); - if (oldCd != null) { - oldCds.add(oldCd); + oldCdRef.t = null; + Partition result = alterPartitionNoTxn( + catName, dbname, name, tmpPartVals, tmpPart, queryWriteIdList, oldCdRef); + results.add(result); + if (oldCdRef.t != null) { + oldCds.add(oldCdRef.t); } } for (MColumnDescriptor oldCd : oldCds) { @@ -4360,6 +4373,7 @@ public void alterPartitions(String catName, String dbname, String name, throw metaException; } } + return results; } private void copyMSD(MStorageDescriptor newSd, MStorageDescriptor oldSd) { @@ -8408,7 +8422,7 @@ private void writeMPartitionColumnStatistics(Table table, Partition partition, } @Override - public boolean updateTableColumnStatistics(ColumnStatistics colStats, + public Map updateTableColumnStatistics(ColumnStatistics colStats, String validWriteIds, long writeId) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean committed = false; @@ -8467,7 +8481,8 @@ public boolean updateTableColumnStatistics(ColumnStatistics colStats, oldt.setParameters(newParams); committed = commitTransaction(); - return committed; + // TODO: similar to update...Part, this used to do "return committed;"; makes little sense. + return committed ? newParams : null; } finally { if (!committed) { rollbackTransaction(); @@ -8502,8 +8517,8 @@ public boolean updateTableColumnStatistics(ColumnStatistics colStats, } @Override - public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List partVals, - String validWriteIds, long writeId) + public Map updatePartitionColumnStatistics(ColumnStatistics colStats, + List partVals, String validWriteIds, long writeId) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean committed = false; @@ -8561,7 +8576,8 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List statsParams, long statsWriteId, - String queryValidWriteIdList, boolean isCompleteStatsWriter) throws MetaException { + public static boolean isCurrentStatsValidForTheQuery(Configuration conf, + Map statsParams, long statsWriteId, String queryValidWriteIdList, + boolean isCompleteStatsWriter) throws MetaException { // Note: can be changed to debug/info to verify the calls. // TODO## change this to debug when merging diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java index 46082a58b9..8d647a0f6a 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -358,10 +358,11 @@ boolean dropPartition(String catName, String dbName, String tableName, * @param name name of the table. * @param newTable New table object. Which parts of the table can be altered are * implementation specific. + * @return * @throws InvalidObjectException The new table object is invalid. * @throws MetaException something went wrong, usually in the RDBMS or storage. */ - void alterTable(String catName, String dbname, String name, Table newTable, + Table alterTable(String catName, String dbname, String name, Table newTable, String queryValidWriteIds) throws InvalidObjectException, MetaException; @@ -499,10 +500,11 @@ PartitionValuesResponse listPartitionValues(String catName, String db_name, Stri * @param part_vals partition values that describe the partition. * @param new_part new partition object. This should be a complete copy of the old with * changes values, not just the parts to update. + * @return * @throws InvalidObjectException No such partition. * @throws MetaException error accessing the RDBMS. */ - void alterPartition(String catName, String db_name, String tbl_name, List part_vals, + Partition alterPartition(String catName, String db_name, String tbl_name, List part_vals, Partition new_part, String queryValidWriteIds) throws InvalidObjectException, MetaException; @@ -519,10 +521,11 @@ void alterPartition(String catName, String db_name, String tbl_name, List alterPartitions(String catName, String db_name, String tbl_name, List> part_vals_list, List new_parts, long writeId, String queryValidWriteIds) throws InvalidObjectException, MetaException; @@ -864,7 +867,7 @@ Partition getPartitionWithAuth(String catName, String dbName, String tblName, * @throws InvalidObjectException the stats object is invalid * @throws InvalidInputException unable to record the stats for the table */ - boolean updateTableColumnStatistics(ColumnStatistics colStats, String validWriteIds, long writeId) + Map updateTableColumnStatistics(ColumnStatistics colStats, String validWriteIds, long writeId) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException; /** Persists the given column statistics object to the metastore @@ -877,7 +880,7 @@ boolean updateTableColumnStatistics(ColumnStatistics colStats, String validWrite * @throws InvalidInputException unable to record the stats for the table * @throws TException */ - boolean updatePartitionColumnStatistics(ColumnStatistics statsObj, + Map updatePartitionColumnStatistics(ColumnStatistics statsObj, List partVals, String validWriteIds, long writeId) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException; diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index dd705a5204..f73047f9ff 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.metastore.utils.FileUtils; import org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; @@ -92,6 +93,7 @@ private static TablesPendingPrewarm tblsPendingPrewarm = new TablesPendingPrewarm(); private RawStore rawStore = null; private Configuration conf; + private boolean areTxnStatsSupported; private PartitionExpressionProxy expressionProxy = null; private static final SharedCache sharedCache = new SharedCache(); @@ -129,6 +131,7 @@ private void setConfInternal(Configuration conf) { rawStore.setConf(conf); Configuration oldConf = this.conf; this.conf = conf; + this.areTxnStatsSupported = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED); if (expressionProxy != null && conf != oldConf) { LOG.warn("Unexpected setConf when we were already configured"); } else { @@ -279,7 +282,6 @@ static void prewarm(RawStore rawStore) { rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames); Deadline.stopTimer(); } - // TODO## should this take write ID into account? or at least cache write ID to verify? // If the table could not cached due to memory limit, stop prewarm boolean isSuccess = sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats, aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); @@ -542,24 +544,33 @@ private void updateTables(RawStore rawStore, String catName, String dbName) { private void updateTableColStats(RawStore rawStore, String catName, String dbName, String tblName) { + boolean committed = false; + rawStore.openTransaction(); try { Table table = rawStore.getTable(catName, dbName, tblName); if (!table.isSetPartitionKeys()) { List colNames = MetaStoreUtils.getColumnNamesForTable(table); Deadline.startTimer("getTableColumnStatistics"); - // TODO## should this take write ID into account? or at least cache write ID to verify? + ColumnStatistics tableColStats = rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames); Deadline.stopTimer(); if (tableColStats != null) { - // TODO## should this take write ID into account? or at least cache write ID to verify? sharedCache.refreshTableColStatsInCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); + // Update the table to get consistent stats state. + sharedCache.alterTableInCache(catName, dbName, tblName, table); } } + committed = rawStore.commitTransaction(); } catch (MetaException | NoSuchObjectException e) { LOG.info("Unable to refresh table column stats for table: " + tblName, e); + } finally { + if (!committed) { + sharedCache.removeAllTableColStatsFromCache(catName, dbName, tblName); + rawStore.rollbackTransaction(); + } } } @@ -577,19 +588,31 @@ private void updateTablePartitions(RawStore rawStore, String catName, String dbN } private void updateTablePartitionColStats(RawStore rawStore, String catName, String dbName, String tblName) { + boolean committed = false; + rawStore.openTransaction(); try { Table table = rawStore.getTable(catName, dbName, tblName); List colNames = MetaStoreUtils.getColumnNamesForTable(table); List partNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1); // Get partition column stats for this table Deadline.startTimer("getPartitionColumnStatistics"); - // TODO## should this take write ID into account? or at least cache write ID to verify? List partitionColStats = rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames); Deadline.stopTimer(); sharedCache.refreshPartitionColStatsInCache(catName, dbName, tblName, partitionColStats); + List parts = rawStore.getPartitionsByNames(catName, dbName, tblName, partNames); + // Also save partitions for consistency as they have the stats state. + for (Partition part : parts) { + sharedCache.alterPartitionInCache(catName, dbName, tblName, part.getValues(), part); + } + committed = rawStore.commitTransaction(); } catch (MetaException | NoSuchObjectException e) { LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); + } finally { + if (!committed) { + sharedCache.removeAllPartitionColStatsFromCache(catName, dbName, tblName); + rawStore.rollbackTransaction(); + } } } @@ -828,31 +851,32 @@ public Table getTable(String catName, String dbName, String tblName) throws Meta return getTable(catName, dbName, tblName, null); } - // TODO: if writeIdList is not null, check isolation level compliance for SVS, - // possibly with getTableFromCache() with table snapshot in cache. @Override public Table getTable(String catName, String dbName, String tblName, - String writeIdList) + String validWriteIds) throws MetaException { catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); if (!shouldCacheTable(catName, dbName, tblName)) { - return rawStore.getTable(catName, dbName, tblName, writeIdList); + return rawStore.getTable(catName, dbName, tblName, validWriteIds); } Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); - if (tbl == null || writeIdList != null) { + if (tbl == null) { // This table is not yet loaded in cache // If the prewarm thread is working on this table's database, // let's move this table to the top of tblNamesBeingPrewarmed stack, // so that it gets loaded to the cache faster and is available for subsequent requests tblsPendingPrewarm.prioritizeTableForPrewarm(tblName); - return rawStore.getTable(catName, dbName, tblName, writeIdList); + return rawStore.getTable(catName, dbName, tblName, validWriteIds); } - if (tbl != null) { - tbl.unsetPrivileges(); - tbl.setRewriteEnabled(tbl.isRewriteEnabled()); + if (validWriteIds != null) { + tbl.setParameters(adjustStatsParamsForGet(tbl.getParameters(), + tbl.getParameters(), tbl.getWriteId(), validWriteIds)); } + + tbl.unsetPrivileges(); + tbl.setRewriteEnabled(tbl.isRewriteEnabled()); return tbl; } @@ -913,24 +937,34 @@ public Partition getPartition(String catName, String dbName, String tblName, Lis return getPartition(catName, dbName, tblName, part_vals, null); } - // TODO: the same as getTable() @Override public Partition getPartition(String catName, String dbName, String tblName, - List part_vals, String writeIdList) + List part_vals, String validWriteIds) throws MetaException, NoSuchObjectException { catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); if (!shouldCacheTable(catName, dbName, tblName)) { return rawStore.getPartition( - catName, dbName, tblName, part_vals, writeIdList); + catName, dbName, tblName, part_vals, validWriteIds); } Partition part = sharedCache.getPartitionFromCache(catName, dbName, tblName, part_vals); - if (part == null || writeIdList != null) { + if (part == null) { // The table containing the partition is not yet loaded in cache return rawStore.getPartition( - catName, dbName, tblName, part_vals, writeIdList); + catName, dbName, tblName, part_vals, validWriteIds); + } + if (validWriteIds != null) { + Table table = sharedCache.getTableFromCache(catName, dbName, tblName); + if (table == null) { + // The table containing the partition is not yet loaded in cache + return rawStore.getPartition( + catName, dbName, tblName, part_vals, validWriteIds); + } + part.setParameters(adjustStatsParamsForGet(table.getParameters(), + part.getParameters(), part.getWriteId(), validWriteIds)); } + return part; } @@ -1010,21 +1044,21 @@ public void dropPartitions(String catName, String dbName, String tblName, List partVals, - Partition newPart, String queryValidWriteIds) - throws InvalidObjectException, MetaException { - rawStore.alterPartition(catName, dbName, tblName, partVals, newPart, queryValidWriteIds); + public Partition alterPartition(String catName, String dbName, String tblName, + List partVals, Partition newPart, String validWriteIds) + throws InvalidObjectException, MetaException { + newPart = rawStore.alterPartition(catName, dbName, tblName, partVals, newPart, validWriteIds); catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); if (!shouldCacheTable(catName, dbName, tblName)) { - return; + return newPart; } sharedCache.alterPartitionInCache(catName, dbName, tblName, partVals, newPart); + return newPart; } @Override - public void alterPartitions(String catName, String dbName, String tblName, + public List alterPartitions(String catName, String dbName, String tblName, List> partValsList, List newParts, long writeId, String validWriteIds) throws InvalidObjectException, MetaException { - rawStore.alterPartitions( + newParts = rawStore.alterPartitions( catName, dbName, tblName, partValsList, newParts, writeId, validWriteIds); catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); if (!shouldCacheTable(catName, dbName, tblName)) { - return; + return newParts; } - // TODO: modify the following method for the case when writeIdList != null. sharedCache.alterPartitionsInCache(catName, dbName, tblName, partValsList, newParts); + return newParts; } private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr, @@ -1598,34 +1634,67 @@ public Partition getPartitionWithAuth(String catName, String dbName, String tblN return partitions; } + // Note: ideally this should be above both CachedStore and ObjectStore. + private Map adjustStatsParamsForGet(Map tableParams, + Map params, long statsWriteId, String validWriteIds) throws MetaException { + if (!TxnUtils.isTransactionalTable(tableParams)) return params; // Not a txn table. + if (areTxnStatsSupported && ((validWriteIds == null) + || ObjectStore.isCurrentStatsValidForTheQuery( + conf, params, statsWriteId, validWriteIds, false))) { + // Valid stats are supported for txn tables, and either no verification was requested by the + // caller, or the verification has succeeded. + return params; + } + // Clone the map to avoid affecting the cached value. + params = new HashMap<>(params); + StatsSetupConst.setBasicStatsState(params, StatsSetupConst.FALSE); + return params; + } + + + // Note: ideally this should be above both CachedStore and ObjectStore. + private ColumnStatistics adjustColStatForGet(Map tableParams, + Map params, ColumnStatistics colStat, long statsWriteId, + String validWriteIds) throws MetaException { + colStat.setIsStatsCompliant(true); + if (!TxnUtils.isTransactionalTable(tableParams)) return colStat; // Not a txn table. + if (areTxnStatsSupported && ((validWriteIds == null) + || ObjectStore.isCurrentStatsValidForTheQuery( + conf, params, statsWriteId, validWriteIds, false))) { + // Valid stats are supported for txn tables, and either no verification was requested by the + // caller, or the verification has succeeded. + return colStat; + } + // Don't clone; ColStats objects are not cached, only their parts. + colStat.setIsStatsCompliant(false); + return colStat; + } + @Override - public boolean updateTableColumnStatistics(ColumnStatistics colStats, String validWriteIds, long writeId) + public Map updateTableColumnStatistics(ColumnStatistics colStats, + String validWriteIds, long writeId) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { - boolean succ = rawStore.updateTableColumnStatistics(colStats, validWriteIds, writeId); - if (succ) { + Map newParams = rawStore.updateTableColumnStatistics( + colStats, validWriteIds, writeId); + if (newParams != null) { String catName = colStats.getStatsDesc().isSetCatName() ? normalizeIdentifier(colStats.getStatsDesc().getCatName()) : getDefaultCatalog(conf); String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName()); String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName()); if (!shouldCacheTable(catName, dbName, tblName)) { - return succ; + return newParams; } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); if (table == null) { // The table is not yet loaded in cache - return succ; + return newParams; } - List statsObjs = colStats.getStatsObj(); - List colNames = new ArrayList<>(); - for (ColumnStatisticsObj statsObj : statsObjs) { - colNames.add(statsObj.getColName()); - } - StatsSetupConst.setColumnStatsState(table.getParameters(), colNames); + table.setParameters(newParams); sharedCache.alterTableInCache(catName, dbName, tblName, table); - sharedCache.updateTableColStatsInCache(catName, dbName, tblName, statsObjs); + sharedCache.updateTableColStatsInCache(catName, dbName, tblName, colStats.getStatsObj()); } - return succ; + return newParams; } @Override @@ -1634,29 +1703,29 @@ public ColumnStatistics getTableColumnStatistics(String catName, String dbName, return getTableColumnStatistics(catName, dbName, tblName, colNames, null); } - // TODO: the same as getTable() @Override public ColumnStatistics getTableColumnStatistics( String catName, String dbName, String tblName, List colNames, - String writeIdList) + String validWriteIds) throws MetaException, NoSuchObjectException { catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); if (!shouldCacheTable(catName, dbName, tblName)) { return rawStore.getTableColumnStatistics( - catName, dbName, tblName, colNames, writeIdList); + catName, dbName, tblName, colNames, validWriteIds); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); - if (table == null || writeIdList != null) { + if (table == null) { // The table is not yet loaded in cache return rawStore.getTableColumnStatistics( - catName, dbName, tblName, colNames, writeIdList); + catName, dbName, tblName, colNames, validWriteIds); } ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName); List colStatObjs = sharedCache.getTableColStatsFromCache(catName, dbName, tblName, colNames); - return new ColumnStatistics(csd, colStatObjs); + return adjustColStatForGet(table.getParameters(), table.getParameters(), + new ColumnStatistics(csd, colStatObjs), table.getWriteId(), validWriteIds); } @Override @@ -1677,36 +1746,31 @@ public boolean deleteTableColumnStatistics(String catName, String dbName, String } @Override - public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List partVals, - String validWriteIds, long writeId) + public Map updatePartitionColumnStatistics(ColumnStatistics colStats, + List partVals, String validWriteIds, long writeId) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { - boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals, validWriteIds, writeId); - if (succ) { + Map newParams = rawStore.updatePartitionColumnStatistics( + colStats, partVals, validWriteIds, writeId); + if (newParams != null) { String catName = colStats.getStatsDesc().isSetCatName() ? normalizeIdentifier(colStats.getStatsDesc().getCatName()) : DEFAULT_CATALOG_NAME; String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName()); String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName()); if (!shouldCacheTable(catName, dbName, tblName)) { - return succ; + return newParams; } - List statsObjs = colStats.getStatsObj(); Partition part = getPartition(catName, dbName, tblName, partVals); - List colNames = new ArrayList<>(); - for (ColumnStatisticsObj statsObj : statsObjs) { - colNames.add(statsObj.getColName()); - } - StatsSetupConst.setColumnStatsState(part.getParameters(), colNames); + part.setParameters(newParams); sharedCache.alterPartitionInCache(catName, dbName, tblName, partVals, part); sharedCache.updatePartitionColStatsInCache(catName, dbName, tblName, partVals, colStats.getStatsObj()); } - return succ; + return newParams; } @Override - // TODO: calculate from cached values. public List getPartitionColumnStatistics(String catName, String dbName, String tblName, List partNames, List colNames) throws MetaException, NoSuchObjectException { - return rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames); + return getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames, null); } @Override @@ -1714,6 +1778,8 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List partNames, List colNames, String writeIdList) throws MetaException, NoSuchObjectException { + // TODO: why have updatePartitionColumnStatistics cache if this is a bypass? + // Note: when implemented, this needs to call adjustColStatForGet, like other get methods. return rawStore.getPartitionColumnStatistics( catName, dbName, tblName, partNames, colNames, writeIdList); } @@ -1743,7 +1809,6 @@ public AggrStats get_aggr_stats_for(String catName, String dbName, String tblNam } @Override - // TODO: the same as getTable() for transactional stats. public AggrStats get_aggr_stats_for(String catName, String dbName, String tblName, List partNames, List colNames, String writeIdList) @@ -1752,16 +1817,19 @@ public AggrStats get_aggr_stats_for(String catName, String dbName, String tblNam catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { + // TODO: we currently cannot do transactional checks for stats here + // (incl. due to lack of sync w.r.t. the below rawStore call). + if (!shouldCacheTable(catName, dbName, tblName) || writeIdList != null) { rawStore.get_aggr_stats_for( catName, dbName, tblName, partNames, colNames, writeIdList); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); - if (table == null || writeIdList != null) { + if (table == null) { // The table is not yet loaded in cache return rawStore.get_aggr_stats_for( catName, dbName, tblName, partNames, colNames, writeIdList); } + List allPartNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1); if (partNames.size() == allPartNames.size()) { colStats = sharedCache.getAggrStatsFromCache(catName, dbName, tblName, colNames, StatsType.ALL); diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index 15b1aa168c..24f940ccbf 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -468,6 +468,16 @@ public void removeTableColStats(String colName) { } } + public void removeAllTableColStats() { + try { + tableLock.writeLock().lock(); + tableColStatsCache.clear(); + isTableColStatsCacheDirty.set(true); + } finally { + tableLock.writeLock().unlock(); + } + } + public ColumnStatisticsObj getPartitionColStats(List partVal, String colName) { try { tableLock.readLock().lock(); @@ -539,6 +549,20 @@ public void removePartitionColStats(List partVals, String colName) { } } + public void removeAllPartitionColStats() { + try { + tableLock.writeLock().lock(); + partitionColStatsCache.clear(); + isPartitionColStatsCacheDirty.set(true); + // Invalidate cached aggregate stats + if (!aggrColStatsCache.isEmpty()) { + aggrColStatsCache.clear(); + } + } finally { + tableLock.writeLock().unlock(); + } + } + public void refreshPartitionColStats(List partitionColStats) { Map newPartitionColStatsCache = new HashMap(); @@ -1292,6 +1316,18 @@ public void removeTableColStatsFromCache(String catName, String dbName, String t } } + public void removeAllTableColStatsFromCache(String catName, String dbName, String tblName) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + if (tblWrapper != null) { + tblWrapper.removeAllTableColStats(); + } + } finally { + cacheLock.readLock().unlock(); + } + } + public void updateTableColStatsInCache(String catName, String dbName, String tableName, List colStatsForTable) { try { @@ -1505,6 +1541,18 @@ public void removePartitionColStatsFromCache(String catName, String dbName, Stri } } + public void removeAllPartitionColStatsFromCache(String catName, String dbName, String tblName) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + if (tblWrapper != null) { + tblWrapper.removeAllPartitionColStats(); + } + } finally { + cacheLock.readLock().unlock(); + } + } + public void updatePartitionColStatsInCache(String catName, String dbName, String tableName, List partVals, List colStatsObjs) { try { diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java index fb14536f4b..09c2509b3d 100644 --- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java @@ -292,10 +292,10 @@ public boolean dropPartition(String catName, String dbName, String tableName, Li } @Override - public void alterTable(String catName, String dbName, String name, Table newTable, + public Table alterTable(String catName, String dbName, String name, Table newTable, String queryValidWriteIds) throws InvalidObjectException, MetaException { - objectStore.alterTable(catName, dbName, name, newTable, queryValidWriteIds); + return objectStore.alterTable(catName, dbName, name, newTable, queryValidWriteIds); } @Override @@ -357,16 +357,16 @@ public PartitionValuesResponse listPartitionValues(String catName, String db_nam } @Override - public void alterPartition(String catName, String dbName, String tblName, List partVals, + public Partition alterPartition(String catName, String dbName, String tblName, List partVals, Partition newPart, String queryValidWriteIds) throws InvalidObjectException, MetaException { - objectStore.alterPartition(catName, dbName, tblName, partVals, newPart, queryValidWriteIds); + return objectStore.alterPartition(catName, dbName, tblName, partVals, newPart, queryValidWriteIds); } @Override - public void alterPartitions(String catName, String dbName, String tblName, + public List alterPartitions(String catName, String dbName, String tblName, List> partValsList, List newParts, long writeId, String queryValidWriteIds) throws InvalidObjectException, MetaException { - objectStore.alterPartitions( + return objectStore.alterPartitions( catName, dbName, tblName, partValsList, newParts, writeId, queryValidWriteIds); } @@ -694,14 +694,14 @@ public boolean deletePartitionColumnStatistics(String catName, String dbName, St } @Override - public boolean updateTableColumnStatistics(ColumnStatistics statsObj, String validWriteIds, long writeId) + public Map updateTableColumnStatistics(ColumnStatistics statsObj, String validWriteIds, long writeId) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { return objectStore.updateTableColumnStatistics(statsObj, validWriteIds, writeId); } @Override - public boolean updatePartitionColumnStatistics(ColumnStatistics statsObj, + public Map updatePartitionColumnStatistics(ColumnStatistics statsObj, List partVals, String validWriteIds, long writeId) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java index 52785a6889..3aebaf3419 100644 --- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java @@ -290,8 +290,9 @@ public boolean dropPartition(String catName, String dbName, String tableName, Li } @Override - public void alterTable(String catName, String dbname, String name, Table newTable, String queryValidWriteIds) + public Table alterTable(String catName, String dbname, String name, Table newTable, String queryValidWriteIds) throws InvalidObjectException, MetaException { + return newTable; } @Override @@ -358,14 +359,16 @@ public PartitionValuesResponse listPartitionValues(String catName, String db_nam } @Override - public void alterPartition(String catName, String db_name, String tbl_name, List part_vals, + public Partition alterPartition(String catName, String db_name, String tbl_name, List part_vals, Partition new_part, String queryValidWriteIds) throws InvalidObjectException, MetaException { + return new_part; } @Override - public void alterPartitions(String catName, String db_name, String tbl_name, + public List alterPartitions(String catName, String db_name, String tbl_name, List> part_vals_list, List new_parts, long writeId, String queryValidWriteIds) throws InvalidObjectException, MetaException { + return new_parts; } @Override @@ -743,17 +746,17 @@ public boolean deletePartitionColumnStatistics(String catName, String dbName, St } @Override - public boolean updateTableColumnStatistics(ColumnStatistics statsObj, + public Map updateTableColumnStatistics(ColumnStatistics statsObj, String validWriteIds, long writeId) throws NoSuchObjectException, MetaException, InvalidObjectException { - return false; + return null; } @Override - public boolean updatePartitionColumnStatistics(ColumnStatistics statsObj,List partVals, + public Map updatePartitionColumnStatistics(ColumnStatistics statsObj,List partVals, String validWriteIds, long writeId) throws NoSuchObjectException, MetaException, InvalidObjectException { - return false; + return null; } @Override