diff --git itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 580bae9c3f..7c38bf5bcd 100644
--- itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -421,8 +421,8 @@ public int getNumPartitionsByExpr(String catName, String dbName, String tblName,
 
   @Override
   public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
-      List<String> partNames) throws MetaException, NoSuchObjectException {
-    return objectStore.getPartitionsByNames(catName, dbName, tblName, partNames);
+      List<String> partNames, long arg4, String arg5) throws MetaException, NoSuchObjectException {
+    return objectStore.getPartitionsByNames(catName, dbName, tblName, partNames, arg4, arg5);
   }
 
   @Override
@@ -1290,4 +1290,6 @@ public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
       String dbName, String tableName) throws MetaException, NoSuchObjectException {
     return null;
-  }}
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
index ddca70497a..b398327a81 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.IsolationLevelCompliance;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
@@ -214,11 +215,21 @@ private void stopWorkers() {
     String skipParam = table.getParameters().get(SKIP_STATS_AUTOUPDATE_PROPERTY);
     if ("true".equalsIgnoreCase(skipParam)) return null;
 
-    // TODO: when txn stats are implemented, use writeIds to determine stats accuracy
-    @SuppressWarnings("unused")
-    ValidReaderWriteIdList writeIds = null;
+    // Note: ideally we should take a lock here to pretend to be a real reader.
+    //       For now, this check is going to have race potential; it may run a spurious analyze.
+    String writeIdString = null;
     if (AcidUtils.isTransactionalTable(table)) {
-      writeIds = getWriteIds(fullTableName);
+      ValidReaderWriteIdList writeIds = getWriteIds(fullTableName);
+      if (writeIds == null) {
+        LOG.error("Cannot get writeIds for transactional table " + fullTableName + "; skipping");
+        return null;
+      }
+      writeIdString = writeIds.writeToString();
+      if (writeIdString != null) {
+        // Get the table again, this time with write IDs, just so we can check the stats state.
+        // Note: we should just be able to get write IDs, like params, as part of getTable?
+        table = rs.getTable(cat, db, tbl, 0, writeIdString);
+      }
     }
 
     List<String> allCols = new ArrayList<>(table.getSd().getColsSize());
     for (FieldSchema fs : table.getSd().getCols()) {
@@ -227,9 +238,15 @@ private void stopWorkers() {
     Collections.sort(allCols);
     if (table.getPartitionKeysSize() == 0) {
       Map<String, String> params = table.getParameters();
-      List<String> colsToUpdate = isExistingOnly
-          ? getExistingNonPartTableStatsToUpdate(fullTableName, cat, db, tbl, params, allCols)
-          : getAnyStatsToUpdate(allCols, params);
+      List<String> colsToUpdate = null;
+      if (isExistingOnly) {
+        // Get the existing stats, including the txn state if any, to see if we need to update.
+        colsToUpdate = getExistingNonPartTableStatsToUpdate(
+            fullTableName, cat, db, tbl, params, allCols, writeIdString);
+      } else {
+        colsToUpdate = getAnyStatsToUpdate(allCols, params);
+      }
+
       LOG.debug("Columns to update are {}; existing only: {}, out of: {} based on {}",
           colsToUpdate, isExistingOnly, allCols, params);
@@ -241,7 +258,7 @@ private void stopWorkers() {
     } else {
       Map<String, List<String>> partsToAnalyze = new HashMap<>();
       List<String> colsForAllParts = findPartitionsToAnalyze(
-          fullTableName, cat, db, tbl, allCols, partsToAnalyze);
+          fullTableName, cat, db, tbl, allCols, partsToAnalyze, writeIdString);
       LOG.debug("Columns to update are {} for all partitions; {} individual partitions."
           + " Existing only: {}, out of: {}", colsForAllParts, partsToAnalyze.size(),
           isExistingOnly, allCols);
@@ -263,18 +280,29 @@ private void stopWorkers() {
   }
 
   private List<String> findPartitionsToAnalyze(TableName fullTableName, String cat, String db,
-      String tbl, List<String> allCols, Map<String, List<String>> partsToAnalyze)
-      throws MetaException, NoSuchObjectException {
+      String tbl, List<String> allCols, Map<String, List<String>> partsToAnalyze,
+      String writeIdString) throws MetaException, NoSuchObjectException {
     // TODO: ideally when col-stats-accurate stuff is stored in some sane structure, this should
-    //       to retrieve partsToUpdate in a single query; no checking partition params in java.
+    //       retrieve partsToUpdate in a single query; no checking partition params in java.
     List<String> partNames = null;
     Map<String, List<String>> colsPerPartition = null;
     boolean isAllParts = true;
     if (isExistingOnly) {
-      colsPerPartition = rs.getPartitionColsWithStats(cat, db, tbl);
-      partNames = Lists.newArrayList(colsPerPartition.keySet());
-      int partitionCount = rs.getNumPartitionsByFilter(cat, db, tbl, "");
-      isAllParts = partitionCount == partNames.size();
+      rs.openTransaction();
+      boolean isOk = false;
+      try {
+        colsPerPartition = rs.getPartitionColsWithStats(cat, db, tbl);
+        partNames = Lists.newArrayList(colsPerPartition.keySet());
+        int partitionCount = rs.getNumPartitionsByFilter(cat, db, tbl, "");
+        isAllParts = partitionCount == partNames.size();
+        isOk = true;
+      } finally {
+        if (isOk) {
+          rs.commitTransaction();
+        } else {
+          rs.rollbackTransaction();
+        }
+      }
     } else {
       partNames = rs.listPartitionNames(cat, db, tbl, (short) -1);
       isAllParts = true;
@@ -293,7 +321,7 @@ private void stopWorkers() {
       currentBatchStart = nextBatchStart;
       nextBatchStart = nextBatchEnd;
       try {
-        currentBatch = rs.getPartitionsByNames(cat, db, tbl, currentNames);
+        currentBatch = rs.getPartitionsByNames(cat, db, tbl, currentNames, 0, writeIdString);
       } catch (NoSuchObjectException e) {
         LOG.error("Failed to get partitions for " + fullTableName + ", skipping some partitions", e);
         currentBatch = null;
@@ -327,8 +355,8 @@ private void stopWorkers() {
         Collections.sort(colsToMaybeUpdate);
       }
       List<String> colsToUpdate = getAnyStatsToUpdate(colsToMaybeUpdate, params);
-      LOG.debug("Updating {} based on {} and {}", colsToUpdate, colsToMaybeUpdate, params);
 
+      LOG.debug("Updating {} based on {} and {}", colsToUpdate, colsToMaybeUpdate, params);
       if (colsToUpdate == null || colsToUpdate.isEmpty()) {
         if (isAllParts) {
@@ -406,10 +434,10 @@ private String buildPartColStr(Table table) {
 
   private List<String> getExistingNonPartTableStatsToUpdate(TableName fullTableName,
       String cat, String db, String tbl, Map<String, String> params,
-      List<String> allCols) throws MetaException {
+      List<String> allCols, String writeIdString) throws MetaException {
     ColumnStatistics existingStats = null;
     try {
-      existingStats = rs.getTableColumnStatistics(cat, db, tbl, allCols);
+      existingStats = rs.getTableColumnStatistics(cat, db, tbl, allCols, 0, writeIdString);
     } catch (NoSuchObjectException e) {
       LOG.error("Cannot retrieve existing stats, skipping " + fullTableName, e);
       return null;
@@ -419,7 +447,8 @@ private String buildPartColStr(Table table) {
 
   private List<String> getExistingStatsToUpdate(
       ColumnStatistics existingStats, Map<String, String> params) {
-    boolean hasAnyAccurate = StatsSetupConst.areBasicStatsUptoDate(params);
+    boolean hasAnyAccurate = existingStats.getIsStatsCompliant() == IsolationLevelCompliance.YES
+        && StatsSetupConst.areBasicStatsUptoDate(params);
     List<String> colsToUpdate = new ArrayList<>();
     for (ColumnStatisticsObj obj : existingStats.getStatsObj()) {
       String col = obj.getColName();
@@ -430,8 +459,7 @@ private String buildPartColStr(Table table) {
     return colsToUpdate;
   }
 
-  private List<String> getAnyStatsToUpdate(
-      List<String> allCols, Map<String, String> params) {
+  private List<String> getAnyStatsToUpdate(List<String> allCols, Map<String, String> params) {
     // Note: we only run "for columns" command and assume no basic stats means no col stats.
     if (!StatsSetupConst.areBasicStatsUptoDate(params)) {
       return allCols;
@@ -458,8 +486,9 @@ private String buildPartColStr(Table table) {
 
   private ValidReaderWriteIdList getWriteIds(
       TableName fullTableName) throws NoSuchTxnException, MetaException {
-    GetValidWriteIdsRequest req = new GetValidWriteIdsRequest();
-    req.setFullTableNames(Lists.newArrayList(fullTableName.toString()));
+    // TODO: acid utils don't support catalogs
+    GetValidWriteIdsRequest req = new GetValidWriteIdsRequest(
+        Lists.newArrayList(fullTableName.getDbTable()), null);
     return TxnUtils.createValidReaderWriteIdList(
         txnHandler.getValidWriteIds(req).getTblValidWriteIds().get(0));
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index dd0929f2b9..552b1d1355 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -123,7 +123,7 @@ protected Partition resolvePartition(CompactionInfo ci) throws Exception {
     List<Partition> parts;
     try {
       parts = rs.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName,
-          Collections.singletonList(ci.partName));
+          Collections.singletonList(ci.partName), -1, null);
       if (parts == null || parts.size() == 0) {
         // The partition got dropped before we went looking for it.
         return null;
diff --git ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
index 14f86eabbc..9fbda3894e 100644
--- ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
+++ ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
@@ -29,9 +29,11 @@
 import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -141,6 +143,92 @@ public void testMultipleTables() throws Exception {
   }
 
   @Test(timeout=40000)
+  public void testTxnTable() throws Exception {
+    StatsUpdaterThread su = createUpdater();
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+
+    executeQuery("create table simple_stats (s string) TBLPROPERTIES "
+        + "(\"transactional\"=\"true\", \"transactional_properties\"=\"insert_only\")");
+    executeQuery("insert into simple_stats (s) values ('test')");
+    ValidWriteIdList initialWriteIds = msClient.getValidWriteIds(
+        ss.getCurrentDatabase() + ".simple_stats");
+    verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"),
+        msClient, 0, initialWriteIds.toString(), true);
+    assertFalse(su.runOneIteration());
+    drainWorkQueue(su, 0);
+
+    executeQuery("insert overwrite table simple_stats values ('test2')");
+    ValidWriteIdList nextWriteIds = msClient.getValidWriteIds(
+        ss.getCurrentDatabase() + ".simple_stats");
+    verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"),
+        msClient, 0, nextWriteIds.toString(), true);
+    assertFalse(su.runOneIteration());
+    drainWorkQueue(su, 0);
+
+    // Overwrite the txn state to refer to a previous txn.
+    Table tbl = msClient.getTable(ss.getCurrentDatabase(), "simple_stats");
+    tbl.setValidWriteIdList(initialWriteIds.toString());
+    msClient.alter_table(ss.getCurrentDatabase(), "simple_stats", tbl);
+    String currentWriteIds = msClient.getValidWriteIds(
+        ss.getCurrentDatabase() + ".simple_stats").toString();
+
+    // Stats should not be valid.
+    verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"),
+        msClient, 0, currentWriteIds, false);
+
+    assertTrue(su.runOneIteration());
+    drainWorkQueue(su);
+
+    verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"),
+        msClient, 0, currentWriteIds, true);
+
+    msClient.close();
+  }
+
+
+  @Test(timeout=40000)
+  public void testTxnPartitions() throws Exception {
+    StatsUpdaterThread su = createUpdater();
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+
+    executeQuery("create table simple_stats (s string) partitioned by (p int) TBLPROPERTIES "
+        + "(\"transactional\"=\"true\", \"transactional_properties\"=\"insert_only\")");
+    executeQuery("insert into simple_stats partition(p=1) values ('test')");
+//    executeQuery("insert into simple_stats partition(p=2) values ('test2')");
+//    executeQuery("insert into simple_stats partition(p=3) values ('test3')");
+    assertFalse(su.runOneIteration());
+    drainWorkQueue(su, 0);
+
+    ValidWriteIdList initialWriteIds = msClient.getValidWriteIds(
+        ss.getCurrentDatabase() + ".simple_stats");
+    executeQuery("insert overwrite table simple_stats partition(p=1) values ('test2')");
+//    executeQuery("insert overwrite table simple_stats partition(p=2) values ('test3')");
+    assertFalse(su.runOneIteration());
+    drainWorkQueue(su, 0);
+    ValidWriteIdList currentWriteIds = msClient.getValidWriteIds(
+        ss.getCurrentDatabase() + ".simple_stats");
+
+    // Overwrite the txn state to refer to a previous txn on 2/3 partitions.
+    Partition part = msClient.getPartition(ss.getCurrentDatabase(), "simple_stats", "p=1");
+    part.setValidWriteIdList(initialWriteIds.toString());
+    msClient.alter_partition(ss.getCurrentDatabase(), "simple_stats", part);
+//    part = msClient.getPartition(ss.getCurrentDatabase(), "simple_stats", "p=2");
+//    part.setValidWriteIdList(initialWriteIds.toString());
+//    msClient.alter_partition(ss.getCurrentDatabase(), "simple_stats", part);
+
+    // We expect two partitions to be updated.
+    Map<String, List<ColumnStatisticsObj>> stats = msClient.getPartitionColumnStatistics(
+        ss.getCurrentDatabase(), "simple_stats", Lists.newArrayList("p=1", "p=2", "p=3"),
+        Lists.newArrayList("s"), 0, currentWriteIds.toString());
+
+    assertTrue(stats.isEmpty());
+    assertTrue(su.runOneIteration());
+    drainWorkQueue(su, 1);
+
+    msClient.close();
+  }
+
+  @Test(timeout=40000)
   public void testExistingOnly() throws Exception {
     hiveConf.set(MetastoreConf.ConfVars.STATS_AUTO_UPDATE.getVarname(), "existing");
     StatsUpdaterThread su = createUpdater();
@@ -443,6 +531,12 @@ private void verifyStatsUpToDate(String tbl, ArrayList cols, IMetaStoreC
     verifyStatsUpToDate(table.getParameters(), cols, isUpToDate);
   }
 
+  private void verifyStatsUpToDate(String tbl, ArrayList<String> cols, IMetaStoreClient msClient,
+      long txnId, String validWriteIds, boolean isUpToDate) throws Exception {
+    Table table = msClient.getTable(ss.getCurrentDatabase(), tbl, txnId, validWriteIds);
+    verifyStatsUpToDate(table.getParameters(), cols, isUpToDate);
+  }
+
   private void verifyStatsUpToDate(Map<String, String> params, ArrayList<String> cols,
       boolean isUpToDate) {
     if (isUpToDate) {
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 4296084381..15469a243f 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -2729,7 +2729,8 @@ private void alterTableStatsForTruncate(final RawStore ms,
         alterHandler.alterTable(ms, wh, catName, dbName, tableName, table, environmentContext, this);
       }
     } else {
-      for (Partition partition : ms.getPartitionsByNames(catName, dbName, tableName, partNames)) {
+      for (Partition partition : ms.getPartitionsByNames(
+          catName, dbName, tableName, partNames, -1, null)) {
         alterPartitionForTruncate(ms, catName, dbName, tableName, table, partition);
       }
     }
@@ -2752,7 +2753,8 @@ private void alterTableStatsForTruncate(final RawStore ms,
         locations.add(new Path(table.getSd().getLocation()));
       }
     } else {
-      for (Partition partition : ms.getPartitionsByNames(catName, dbName, tableName, partNames)) {
+      for (Partition partition : ms.getPartitionsByNames(
+          catName, dbName, tableName, partNames, -1, null)) {
         locations.add(new Path(partition.getSd().getLocation()));
       }
     }
@@ -4280,7 +4282,7 @@ public DropPartitionsResult drop_partitions_req(
       } else if (spec.isSetNames()) {
         partNames = spec.getNames();
         minCount = partNames.size();
-        parts = ms.getPartitionsByNames(catName, dbName, tblName, partNames);
+        parts = ms.getPartitionsByNames(catName, dbName, tblName, partNames, -1, null);
       } else {
         throw new MetaException("Partition spec is not set");
       }
@@ -5682,11 +5684,20 @@ public PartitionsStatsResult get_partitions_statistics_req(PartitionsStatsReques
         lowerCasePartNames.add(lowerCaseConvertPartName(partName));
       }
       try {
-        List<ColumnStatistics> stats = getMS().getPartitionColumnStatistics(
-            catName, dbName, tblName, lowerCasePartNames, lowerCaseColNames);
+        List<ColumnStatistics> stats = null;
+        if (request.isSetTxnId()) {
+          stats = getMS().getPartitionColumnStatistics(
+              catName, dbName, tblName, lowerCasePartNames, lowerCaseColNames,
+              request.getTxnId(), request.getValidWriteIdList());
+        } else {
+          stats = getMS().getPartitionColumnStatistics(
+              catName, dbName, tblName, lowerCasePartNames, lowerCaseColNames);
+        }
         Map<String, List<ColumnStatisticsObj>> map = new HashMap<>();
-        for (ColumnStatistics stat : stats) {
-          map.put(stat.getStatsDesc().getPartName(), stat.getStatsObj());
+        if (stats != null) {
+          for (ColumnStatistics stat : stats) {
+            map.put(stat.getStatsDesc().getPartName(), stat.getStatsObj());
+          }
         }
         result = new PartitionsStatsResult(map);
       } finally {
@@ -5987,7 +5998,7 @@ private int get_num_partitions_by_expr(final String catName, final String dbName
       Exception ex = null;
       try {
         ret = getMS().getPartitionsByNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName,
-            partNames);
+            partNames, -1, null);
       } catch (Exception e) {
         ex = e;
         rethrowException(e);
@@ -7422,7 +7433,8 @@ public boolean set_aggr_stats_for(SetPartitionsStatsRequest request) throws TExc
           oldStatsMap.put(csOld.getStatsDesc().getPartName(), csOld);
         }
         // another single call to get all the partition objects
-        partitions = getMS().getPartitionsByNames(catName, dbName, tableName, partitionNames);
+        partitions = getMS().getPartitionsByNames(
+            catName, dbName, tableName, partitionNames, -1, null);
         for (int index = 0; index < partitionNames.size(); index++) {
           mapToPart.put(partitionNames.get(index), partitions.get(index));
         }
@@ -7707,7 +7719,8 @@ public CacheFileMetadataResult cache_file_metadata(
         int currentBatchSize = Math.min(batchSize, partNames.size() - index);
         List<String> nameBatch = partNames.subList(index, index + currentBatchSize);
         index += currentBatchSize;
-        List<Partition> parts = ms.getPartitionsByNames(DEFAULT_CATALOG_NAME, dbName, tblName, nameBatch);
+        List<Partition> parts = ms.getPartitionsByNames(
+            DEFAULT_CATALOG_NAME, dbName, tblName, nameBatch, -1, null);
         for (Partition part : parts) {
           if (!part.isSetSd() || !part.getSd().isSetLocation()) {
             throw new MetaException("Partition does not have storage location;" +
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 51e081b22f..fe1c65ebb1 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -49,6 +49,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.AggregateStatsCache.AggrColStats;
+import org.apache.hadoop.hive.metastore.ObjectStore.TxnStatsCtx;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
@@ -112,6 +113,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(MetaStoreDirectSql.class);
 
   private final PersistenceManager pm;
+  private final Configuration conf;
   private final String schema;
 
   /**
@@ -146,8 +148,10 @@
     SKEWED_COL_VALUE_LOC_MAP, COLUMNS_V2, PARTITION_KEYS, SERDE_PARAMS, PART_COL_STATS,
     KEY_CONSTRAINTS, TAB_COL_STATS, PARTITION_KEY_VALS, PART_PRIVS, PART_COL_PRIVS,
     SKEWED_STRING_LIST, CDS;
+
   public MetaStoreDirectSql(PersistenceManager pm, Configuration conf, String schema) {
     this.pm = pm;
+    this.conf = conf;
     this.schema = schema;
     DatabaseProduct dbType = null;
     try {
@@ -454,9 +458,8 @@ public Database getDatabase(String catName, String dbName) throws MetaException{
    * @param partNames Partition names to get.
    * @return List of partitions.
    */
-  public List<Partition> getPartitionsViaSqlFilter(final String catName, final String dbName,
-      final String tblName, List<String> partNames)
-      throws MetaException {
+  public List<Partition> getPartitionsViaSqlFilter(String catName, String dbName, String tblName,
+      List<String> partNames, TxnStatsCtx tsCtx) throws MetaException {
     if (partNames.isEmpty()) {
       return Collections.emptyList();
     }
@@ -469,7 +472,7 @@ public Database getDatabase(String catName, String dbName) throws MetaException{
         if (partitionIds.isEmpty()) {
           return Collections.emptyList(); // no partitions, bail early.
         }
-        return getPartitionsFromPartitionIds(catName, dbName, tblName, null, partitionIds);
+        return getPartitionsFromPartitionIds(catName, dbName, tblName, null, partitionIds, tsCtx);
       }
     });
   }
@@ -495,7 +498,7 @@ public Database getDatabase(String catName, String dbName) throws MetaException{
       @Override
       public List<Partition> run(List<Long> input) throws MetaException {
         return getPartitionsFromPartitionIds(catName, filter.table.getDbName(),
-            filter.table.getTableName(), isViewTable, input);
+            filter.table.getTableName(), isViewTable, input, null);
       }
     });
   }
@@ -537,7 +540,7 @@ public boolean generateSqlFilterForPushdown(
     List<Partition> result = Batchable.runBatched(batchSize, partitionIds, new Batchable<Long, Partition>() {
       @Override
       public List<Partition> run(List<Long> input) throws MetaException {
-        return getPartitionsFromPartitionIds(catName, dbName, tblName, null, input);
+        return getPartitionsFromPartitionIds(catName, dbName, tblName, null, input, null);
       }
     });
     return result;
@@ -629,8 +632,9 @@ private boolean isViewTable(String catName, String dbName, String tblName) throw
   }
 
   /** Should be called with the list short enough to not trip up Oracle/etc. */
-  private List<Partition> getPartitionsFromPartitionIds(String catName, String dbName, String tblName,
-      Boolean isView, List<Object> partIdList) throws MetaException {
+  private List<Partition> getPartitionsFromPartitionIds(String catName, String dbName,
+      String tblName, Boolean isView, List<Object> partIdList, TxnStatsCtx tsCtx)
+      throws MetaException {
     boolean doTrace = LOG.isDebugEnabled();
 
     int idStringWidth = (int)Math.ceil(Math.log10(partIdList.size())) + 1; // 1 for comma
@@ -645,8 +649,11 @@ private boolean isViewTable(String catName, String dbName, String tblName) throw
         + " " + SERDES + ".\"SERDE_ID\", " + PARTITIONS + ".\"CREATE_TIME\","
        + " " + PARTITIONS + ".\"LAST_ACCESS_TIME\", " + SDS + ".\"INPUT_FORMAT\", " + SDS + ".\"IS_COMPRESSED\","
         + " " + SDS + ".\"IS_STOREDASSUBDIRECTORIES\", " + SDS + ".\"LOCATION\", " + SDS + ".\"NUM_BUCKETS\","
-        + " " + SDS + ".\"OUTPUT_FORMAT\", " + SERDES + ".\"NAME\", " + SERDES + ".\"SLIB\" "
-        + "from " + PARTITIONS + ""
+        + " " + SDS + ".\"OUTPUT_FORMAT\", " + SERDES + ".\"NAME\", " + SERDES + ".\"SLIB\"";
+    if (tsCtx != null) {
+      queryText += ", " + PARTITIONS + ".\"TXN_ID\", " + PARTITIONS + ".\"WRITEID_LIST\"";
+    }
+    queryText = queryText + " from " + PARTITIONS + ""
         + "  left outer join " + SDS + " on " + PARTITIONS + ".\"SD_ID\" = " + SDS + ".\"SD_ID\" "
         + "  left outer join " + SERDES + " on " + SDS + ".\"SERDE_ID\" = " + SERDES + ".\"SERDE_ID\" "
         + "where \"PART_ID\" in (" + partIds + ") order by \"PART_NAME\" asc";
@@ -747,6 +754,11 @@ private boolean isViewTable(String catName, String dbName, String tblName) throw
       serde.setSerializationLib((String)fields[13]);
       serdeSb.append(serdeId).append(",");
       sd.setSerdeInfo(serde);
+      if (tsCtx != null) {
+        // We will store these in the partition temporarily, then clear them after updating params.
+        part.setTxnId(extractSqlLong(fields[14]));
+        part.setValidWriteIdList((String)fields[15]);
+      }
       Deadline.checkTimeout();
     }
     query.closeAll();
@@ -775,6 +787,15 @@ public void apply(Partition t, Object[] fields) {
         t.addToValues((String)fields[1]);
       }});
 
+    if (tsCtx != null) {
+      for (Partition part : partitions.values()) {
+        ObjectStore.setTxnPartitionStatsParam(conf, part, part.getTxnId(),
+            part.getValidWriteIdList(), tsCtx.queryTxnId, tsCtx.queryWriteIdList);
+        part.unsetTxnId();
+        part.unsetValidWriteIdList();
+      }
+    }
+
     // Prepare IN (blah) lists for the following queries. Cut off the final ','s.
     if (sdSb.length() == 0) {
       assert serdeSb.length() == 0 && colsSb.length() == 0;
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 9266879ad0..245a4c2349 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -2451,21 +2451,32 @@ public Partition getPartition(String catName, String dbName, String tableName,
           + part_vals.toString());
     }
     part.setValues(part_vals);
+    setPartitionStatsParam(conf, part, table.getParameters(),
+        mpart.getTxnId(), mpart.getWriteIdList(), txnId, writeIdList);
+    return part;
+  }
+
+  public static void setPartitionStatsParam(Configuration conf, Partition part,
+      Map<String, String> tableParams, long partTxnId, String partWriteIdList,
+      long reqTxnId, String reqWriteIdList) throws MetaException {
     // If transactional table partition, check whether the current version partition
     // statistics in the metastore comply with the client query's snapshot isolation.
-    if (writeIdList != null) {
-      if (TxnUtils.isTransactionalTable(table.getParameters())) {
-        if (isCurrentStatsValidForTheQuery(mpart, txnId, writeIdList, -1, false)) {
-          part.setIsStatsCompliant(IsolationLevelCompliance.YES);
-        } else {
-          part.setIsStatsCompliant(IsolationLevelCompliance.NO);
-          // Do not make persistent the following state since it is query specific (not global).
-          StatsSetupConst.setBasicStatsState(part.getParameters(), StatsSetupConst.FALSE);
-          LOG.info("Removed COLUMN_STATS_ACCURATE from Partition object's parameters.");
-        }
-      }
+    if (reqWriteIdList == null) return;
+    if (!TxnUtils.isTransactionalTable(tableParams)) return;
+    setTxnPartitionStatsParam(conf, part, partTxnId, partWriteIdList, reqTxnId, reqWriteIdList);
+  }
+
+  public static void setTxnPartitionStatsParam(Configuration conf, Partition part, long partTxnId,
+      String partWriteIdList, long reqTxnId, String reqWriteIdList) throws MetaException {
+    if (isCurrentStatsValidForTheQuery(conf, partTxnId, part.getParameters(), partWriteIdList,
+        reqTxnId, reqWriteIdList, -1, false)) {
+      part.setIsStatsCompliant(IsolationLevelCompliance.YES);
+    } else {
+      part.setIsStatsCompliant(IsolationLevelCompliance.NO);
+      // Do not make persistent the following state since it is query specific (not global).
+      StatsSetupConst.setBasicStatsState(part.getParameters(), StatsSetupConst.FALSE);
+      LOG.debug("Removed COLUMN_STATS_ACCURATE from Partition object's parameters.");
     }
-    return part;
   }
 
   /**
@@ -2917,11 +2928,16 @@ public Partition getPartitionWithAuth(String catName, String dbName, String tblN
     return dest;
   }
 
-  private List<Partition> convertToParts(String catName, String dbName, String tblName, List<MPartition> mparts)
-      throws MetaException {
+  private List<Partition> convertToParts(String catName, String dbName, String tblName,
+      List<MPartition> mparts, TxnStatsCtx tsCtx) throws MetaException {
     List<Partition> parts = new ArrayList<>(mparts.size());
     for (MPartition mp : mparts) {
-      parts.add(convertToPart(catName, dbName, tblName, mp));
+      Partition part = convertToPart(catName, dbName, tblName, mp);
+      if (tsCtx != null) {
+        setTxnPartitionStatsParam(conf, part, mp.getTxnId(), mp.getWriteIdList(),
+            tsCtx.queryTxnId, tsCtx.queryWriteIdList);
+      }
+      parts.add(part);
       Deadline.checkTimeout();
     }
     return parts;
@@ -3351,26 +3367,48 @@ private Collection getPartitionPsQueryResults(String catName, String dbName, Str
 
   @Override
   public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
-      List<String> partNames) throws MetaException, NoSuchObjectException {
-    return getPartitionsByNamesInternal(catName, dbName, tblName, partNames, true, true);
+      List<String> partNames, long txnId, String writeIdList)
+      throws MetaException, NoSuchObjectException {
+    return getPartitionsByNamesInternal(
+        catName, dbName, tblName, partNames, txnId, writeIdList, true, true);
+  }
+
+
+  public static final class TxnStatsCtx {
+    final long queryTxnId;
+    final String queryWriteIdList;
+
+    private TxnStatsCtx(long queryTxnId, String queryWriteIdList) {
+      this.queryTxnId = queryTxnId;
+      this.queryWriteIdList = queryWriteIdList;
+    }
+
+    public static TxnStatsCtx create(Table tbl, long queryTxnId, String queryWriteIdList) {
+      if (queryWriteIdList == null) return null;
+      if (tbl == null) {
+        throw new AssertionError("Cannot verify stats correctness without a table!");
+      }
+      if (!TxnUtils.isTransactionalTable(tbl)) return null;
+      return new TxnStatsCtx(queryTxnId, queryWriteIdList);
+    }
   }
 
   protected List<Partition> getPartitionsByNamesInternal(String catName, String dbName,
-      String tblName, final List<String> partNames,
-      boolean allowSql, boolean allowJdo)
-      throws MetaException, NoSuchObjectException {
+      String tblName, final List<String> partNames, long txnId, String writeIdList,
+      boolean allowSql, boolean allowJdo) throws MetaException, NoSuchObjectException {
     return new GetListHelper<Partition>(catName, dbName, tblName, allowSql, allowJdo) {
       @Override
       protected List<Partition> getSqlResult(GetHelper<List<Partition>> ctx) throws MetaException {
-        return directSql.getPartitionsViaSqlFilter(catName, dbName, tblName, partNames);
+        return directSql.getPartitionsViaSqlFilter(catName, dbName, tblName, partNames,
+            TxnStatsCtx.create(ctx.getTable(), txnId, writeIdList));
       }
       @Override
       protected List<Partition> getJdoResult(
           GetHelper<List<Partition>> ctx) throws MetaException, NoSuchObjectException {
-        return getPartitionsViaOrmFilter(catName, dbName, tblName, partNames);
+        return getPartitionsViaOrmFilter(catName, dbName, tblName, partNames,
+            TxnStatsCtx.create(ctx.getTable(), txnId, writeIdList));
       }
-    }.run(false);
+    }.run(writeIdList != null); // Table is only needed for txn stats verification.
   }
 
   @Override
@@ -3401,7 +3439,7 @@ protected boolean getPartitionsByExprInternal(String catName, String dbName, Str
         List<String> partNames = new LinkedList<>();
         hasUnknownPartitions.set(getPartitionNamesPrunedByExprNoTxn(
             ctx.getTable(), expr, defaultPartitionName, maxParts, partNames));
-        return directSql.getPartitionsViaSqlFilter(catName, dbName, tblName, partNames);
+        return directSql.getPartitionsViaSqlFilter(catName, dbName, tblName, partNames, null);
       }
 
       @Override
@@ -3417,7 +3455,7 @@ protected boolean getPartitionsByExprInternal(String catName, String dbName, Str
           List<String> partNames = new ArrayList<>();
           hasUnknownPartitions.set(getPartitionNamesPrunedByExprNoTxn(
               ctx.getTable(), expr, defaultPartitionName, maxParts, partNames));
-          result = getPartitionsViaOrmFilter(catName, dbName, tblName, partNames);
+          result = getPartitionsViaOrmFilter(catName, dbName, tblName, partNames, null);
         }
         return result;
       }
@@ -3513,8 +3551,8 @@ private Integer getNumPartitionsViaOrmFilter(Table table, ExpressionTree tree, b
    * @param partNames Partition names to get the objects for.
    * @return Resulting partitions.
    */
-  private List<Partition> getPartitionsViaOrmFilter(String catName,
-      String dbName, String tblName, List<String> partNames) throws MetaException {
+  private List<Partition> getPartitionsViaOrmFilter(String catName, String dbName, String tblName,
+      List<String> partNames, TxnStatsCtx tsCtx) throws MetaException {
     if (partNames.isEmpty()) {
       return new ArrayList<>();
     }
@@ -3526,7 +3564,7 @@ private Integer getNumPartitionsViaOrmFilter(Table table, ExpressionTree tree, b
       query.setOrdering("partitionName ascending");
       @SuppressWarnings("unchecked")
       List<MPartition> mparts = (List<MPartition>)query.executeWithMap(queryWithParams.getSecond());
-      List<Partition> partitions = convertToParts(catName, dbName, tblName, mparts);
+      List<Partition> partitions = convertToParts(catName, dbName, tblName, mparts, tsCtx);
       if (query != null) {
         query.closeAll();
       }
@@ -4146,6 +4184,9 @@ public void alterTable(String catName, String dbname, String name, Table newTabl
           LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table "
              + dbname + "." + name + ". will be made persistent.");
         }
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Setting writeIdList " + newTable.getValidWriteIdList());
+        }
         oldt.setTxnId(newTable.getTxnId());
         oldt.setWriteIdList(newTable.getValidWriteIdList());
       }
@@ -4231,7 +4272,7 @@ private MColumnDescriptor alterPartitionNoTxn(String catName, String dbname, Str
         TxnUtils.isTransactionalTable(table.getParameters())) {
       // Check concurrent INSERT case and set false to the flag.
       if (!isCurrentStatsValidForTheQuery(oldp, newp.getTxnId(), newp.getWriteIdList(),
-          -1, true)) {
+          -1, false)) {
         StatsSetupConst.setBasicStatsState(oldp.getParameters(), StatsSetupConst.FALSE);
         LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the partition "
             + dbname + "." + name + "." + oldp.getPartitionName() + " will be made persistent.");
@@ -4246,14 +4287,15 @@ private MColumnDescriptor alterPartitionNoTxn(String catName, String dbname, Str
   public void alterPartition(String catName, String dbname, String name, List<String> part_vals,
       Partition newPart) throws InvalidObjectException, MetaException {
     boolean success = false;
-    Exception e = null;
+    Throwable e = null;
     try {
       openTransaction();
       MColumnDescriptor oldCd = alterPartitionNoTxn(catName, dbname, name, part_vals, newPart);
       removeUnusedColumnDescriptor(oldCd);
       // commit the changes
       success = commitTransaction();
-    } catch (Exception exception) {
+    } catch (Throwable exception) {
+      LOG.error("alterPartition failed", exception);
      e = exception;
     } finally {
       if (!success) {
@@ -8650,15 +8692,29 @@ protected ColumnStatistics getJdoResult(
       }
       // Loop through the given "partNames" list
       // checking isolation-level-compliance of each partition column stats.
-      for(String partName : partNames) {
-        MPartition mpart = getMPartition(catName, dbName, tableName, Warehouse.getPartValuesFromPartName(partName));
-        if (!isCurrentStatsValidForTheQuery(mpart, txnId, writeIdList, -1, false)) {
-          LOG.debug("The current metastore transactional partition column statistics " +
-              "for " + dbName + "." + tableName + "." + mpart.getPartitionName() + " is not valid " +
-              "for the current query.");
-          return null;
+      // We only return the valid partitions.
+      List<String> newPartNames = partNames; // Optimistically assume the list will stay as is.
+      for (int i = 0; i < partNames.size(); ++i) {
+        String partName = partNames.get(i);
+        MPartition mpart = getMPartition(
+            catName, dbName, tableName, Warehouse.getPartValuesFromPartName(partName));
+        if (mpart == null
+            || !isCurrentStatsValidForTheQuery(mpart, txnId, writeIdList, -1, false)) {
+          if (mpart != null) {
+            LOG.debug("The current metastore transactional partition column statistics "
+                + "for " + dbName + "." + tableName + "." + mpart.getPartitionName()
+                + " is not valid for the current query.");
+          }
+          if (partNames == newPartNames) {
+            // Create a new list if we need to make changes.
+            newPartNames = new ArrayList<>(partNames.size());
+            newPartNames.addAll(partNames.subList(0, i));
+          }
+        } else if (partNames != newPartNames) {
+          newPartNames.add(partName);
         }
       }
+      partNames = newPartNames;
     }
     return getPartitionColumnStatisticsInternal(
         catName, dbName, tableName, partNames, colNames, true, true);
@@ -12250,7 +12306,7 @@ private boolean isCurrentStatsValidForTheQuery(
       MTable tbl, long txnId, String queryValidWriteIdList,
       long statsWriteId, boolean checkConcurrentWrites)
       throws MetaException {
-    return isCurrentStatsValidForTheQuery(tbl.getTxnId(), tbl.getParameters(), tbl.getWriteIdList(),
+    return isCurrentStatsValidForTheQuery(conf, tbl.getTxnId(), tbl.getParameters(), tbl.getWriteIdList(),
         txnId, queryValidWriteIdList, statsWriteId, checkConcurrentWrites);
   }
@@ -12272,15 +12328,18 @@ private boolean isCurrentStatsValidForTheQuery(
       MPartition part, long txnId, String queryValidWriteIdList,
       long statsWriteId, boolean checkConcurrentWrites)
       throws MetaException {
-    return isCurrentStatsValidForTheQuery(part.getTxnId(), part.getParameters(), part.getWriteIdList(),
+    return isCurrentStatsValidForTheQuery(conf, part.getTxnId(), part.getParameters(), part.getWriteIdList(),
         txnId, queryValidWriteIdList, statsWriteId, checkConcurrentWrites);
   }
 
-  private boolean isCurrentStatsValidForTheQuery(
-      long statsTxnId, Map<String, String> statsParams, String statsWriteIdList,
-      long queryTxnId, String queryValidWriteIdList,
-      long statsWriteId, boolean checkConcurrentWrites)
+  private static boolean isCurrentStatsValidForTheQuery(Configuration conf, long statsTxnId,
+      Map<String, String> statsParams, String statsWriteIdList, long queryTxnId,
+      String queryValidWriteIdList, long statsWriteId, boolean checkConcurrentWrites)
       throws MetaException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(statsTxnId + " " + statsWriteIdList + " vs " + queryTxnId + " " + queryValidWriteIdList
+          + "; " + statsParams + " => " + StatsSetupConst.areBasicStatsUptoDate(statsParams));
+    }
     // if statsWriteIdList is null,
     // return true since the stats does not seem to be transactional.
     if (statsWriteIdList == null) {
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
index 8cc9d2c586..674b183031 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -593,7 +593,7 @@ int getNumPartitionsByExpr(String catName, String dbName, String tblName, byte[]
    * @throws NoSuchObjectException No such table.
    */
   List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
-      List<String> partNames)
+      List<String> partNames, long txnId, String writeIdList)
       throws MetaException, NoSuchObjectException;
 
   Table markPartitionForEvent(String catName, String dbName, String tblName, Map<String, String> partVals,
       PartitionEventType evtType) throws MetaException, UnknownTableException, InvalidPartitionException, UnknownPartitionException;
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index e4894fa12b..c880d48131 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -1272,17 +1272,17 @@ public int getNumPartitionsByExpr(String catName, String dbName, String tblName,
 
   @Override
   public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
-      List<String> partNames) throws MetaException, NoSuchObjectException {
+      List<String> partNames, long txnId, String writeIdList) throws MetaException, NoSuchObjectException {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
     if (!shouldCacheTable(catName, dbName, tblName)) {
-      return rawStore.getPartitionsByNames(catName, dbName, tblName, partNames);
+      return rawStore.getPartitionsByNames(catName, dbName, tblName, partNames, txnId, writeIdList);
     }
     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
     if (table == null) {
       // The table is not yet loaded in cache
-      return rawStore.getPartitionsByNames(catName, dbName, tblName, partNames);
+      return rawStore.getPartitionsByNames(catName, dbName, tblName, partNames, txnId, writeIdList);
     }
     List<Partition> partitions = new ArrayList<>();
     for (String partName : partNames) {
diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 001c3edcff..51f20a2ff3 100644
--- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -389,8 +389,8 @@ public int getNumPartitionsByExpr(String catName, String dbName, String tblName,
 
   @Override
   public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
-      List<String> partNames) throws MetaException, NoSuchObjectException {
-    return objectStore.getPartitionsByNames(catName, dbName, tblName, partNames);
+      List<String> partNames, long txnId, String writeIdList) throws MetaException, NoSuchObjectException {
+    return objectStore.getPartitionsByNames(catName, dbName, tblName, partNames, txnId, writeIdList);
   }
 
   @Override
diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index d6a882e8e9..aed4c37113 100644
--- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -379,8 +379,8 @@ public void alterPartitions(String catName, String db_name, String tbl_name,
 
   @Override
   public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
-      List<String> partNames) throws MetaException, NoSuchObjectException {
-
+      List<String> partNames, long txnId, String writeIdList)
+      throws MetaException, NoSuchObjectException {
     return Collections.emptyList();
   }
 
diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java
index c9a6a471cb..09e630ab2f 100644
--- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java
+++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java
@@ -64,11 +64,12 @@ public VerifyingObjectStore() {
 
   @Override
   public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
-      List<String> partNames) throws MetaException, NoSuchObjectException {
+      List<String> partNames, long txnId, String writeIdList)
+      throws MetaException, NoSuchObjectException {
     List<Partition> sqlResults = getPartitionsByNamesInternal(
-        catName, dbName, tblName, partNames, true, false);
+        catName, dbName, tblName, partNames, txnId, writeIdList, true, false);
     List<Partition> ormResults = getPartitionsByNamesInternal(
-        catName, dbName, tblName, partNames, false, true);
+        catName, dbName, tblName, partNames, txnId, writeIdList, false, true);
     verifyLists(sqlResults, ormResults, Partition.class);
     return sqlResults;
   }
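
Reviewer note (not part of the patch): every `RawStore.getPartitionsByNames` call site above now passes an explicit txnId/writeIdList pair, with `(-1, null)` meaning "no reader snapshot, skip txn stats validation". A minimal sketch of the resulting caller contract follows; the class and method names here are hypothetical, and the Thrift-generated `isSetIsStatsCompliant()` accessor is assumed from the `setIsStatsCompliant()` calls in the patch.

```java
import java.util.List;

import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.api.IsolationLevelCompliance;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;

// Hypothetical illustration of the extended RawStore contract only.
public class PartitionSnapshotFetch {
  static List<Partition> fetch(RawStore rs, String cat, String db, String tbl,
      List<String> partNames, long txnId, String writeIdList)
      throws MetaException, NoSuchObjectException {
    // Passing the reader's snapshot lets the store check stats validity;
    // non-transactional callers pass (-1, null), as the call sites above do.
    List<Partition> parts = rs.getPartitionsByNames(cat, db, tbl, partNames, txnId, writeIdList);
    for (Partition part : parts) {
      // When a writeIdList was supplied and the table is transactional, the store
      // sets the compliance flag; for NO it also clears COLUMN_STATS_ACCURATE in
      // the returned parameters (query-specific state, never persisted).
      if (part.isSetIsStatsCompliant()
          && part.getIsStatsCompliant() == IsolationLevelCompliance.NO) {
        // Stats exist but were written by a txn invisible to this reader: treat as stale.
      }
    }
    return parts;
  }
}
```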