diff --git itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index cd036e6563..8f4430568b 100644
--- itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -422,8 +422,10 @@ public int getNumPartitionsByExpr(String catName, String dbName, String tblName,
   @Override
   public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
-      List<String> partNames) throws MetaException, NoSuchObjectException {
-    return objectStore.getPartitionsByNames(catName, dbName, tblName, partNames);
+      List<String> partNames)
+      throws MetaException, NoSuchObjectException {
+    return objectStore.getPartitionsByNames(
+        catName, dbName, tblName, partNames);
   }
 
   @Override
@@ -1305,4 +1307,6 @@ public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
       String dbName, String tableName) throws MetaException, NoSuchObjectException {
     return null;
-  }}
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index c757718e56..1d0af5a420 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2516,7 +2516,7 @@ public Partition createPartition(Table tbl, Map<String, String> partSpec) throws
         out.add(new Partition(tbl, outPart));
       }
       getMSC().alter_partitions(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(),
-          partsToAlter, new EnvironmentContext());
+          partsToAlter, new EnvironmentContext(), -1, null, -1);
 
       for ( org.apache.hadoop.hive.metastore.api.Partition outPart :
         getMSC().getPartitionsByNames(addPartitionDesc.getDbName(),
             addPartitionDesc.getTableName(), part_names)){
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
index bb181a192a..221c483076 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.ObjectStore;
 import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.RawStoreProxy;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -214,11 +215,17 @@ private void stopWorkers() {
     String skipParam = table.getParameters().get(SKIP_STATS_AUTOUPDATE_PROPERTY);
     if ("true".equalsIgnoreCase(skipParam)) return null;
 
-    // TODO: when txn stats are implemented, use writeIds to determine stats accuracy
-    @SuppressWarnings("unused")
-    ValidReaderWriteIdList writeIds = null;
-    if (AcidUtils.isTransactionalTable(table)) {
-      writeIds = getWriteIds(fullTableName);
+    // Note: ideally we should take a lock here to pretend to be a real reader.
+    //       For now, this check is going to have race potential; it may run a spurious analyze.
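+    // The write ID string captured below is passed down to the stats-validity checks,
+    // so stats written under an older snapshot trigger a re-analyze.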
+    String writeIdString = null;
+    boolean isTxn = AcidUtils.isTransactionalTable(table);
+    if (isTxn) {
+      ValidReaderWriteIdList writeIds = getWriteIds(fullTableName);
+      if (writeIds == null) {
+        LOG.error("Cannot get writeIds for transactional table " + fullTableName + "; skipping");
+        return null;
+      }
+      writeIdString = writeIds.writeToString();
     }
 
     List<String> allCols = new ArrayList<>(table.getSd().getColsSize());
     for (FieldSchema fs : table.getSd().getCols()) {
@@ -227,9 +234,16 @@ private void stopWorkers() {
     Collections.sort(allCols);
     if (table.getPartitionKeysSize() == 0) {
       Map<String, String> params = table.getParameters();
-      List<String> colsToUpdate = isExistingOnly
-          ? getExistingNonPartTableStatsToUpdate(fullTableName, cat, db, tbl, params, allCols)
-          : getAnyStatsToUpdate(allCols, params);
+      List<String> colsToUpdate = null;
+      long writeId = isTxn ? table.getWriteId() : -1;
+      if (isExistingOnly) {
+        // Get the existing stats, including the txn state if any, to see if we need to update.
+        colsToUpdate = getExistingNonPartTableStatsToUpdate(
+            fullTableName, cat, db, tbl, params, writeId, allCols, writeIdString);
+      } else {
+        colsToUpdate = getAnyStatsToUpdate(db, tbl, allCols, params, writeId, writeIdString);
+      }
+
       LOG.debug("Columns to update are {}; existing only: {}, out of: {} based on {}",
           colsToUpdate, isExistingOnly, allCols, params);
@@ -241,7 +255,7 @@
     } else {
       Map<String, List<String>> partsToAnalyze = new HashMap<>();
       List<String> colsForAllParts = findPartitionsToAnalyze(
-          fullTableName, cat, db, tbl, allCols, partsToAnalyze);
+          fullTableName, cat, db, tbl, allCols, partsToAnalyze, writeIdString);
       LOG.debug("Columns to update are {} for all partitions; {} individual partitions."
           + " Existing only: {}, out of: {}", colsForAllParts, partsToAnalyze.size(),
           isExistingOnly, allCols);
@@ -263,18 +277,30 @@
   }
 
   private List<String> findPartitionsToAnalyze(TableName fullTableName, String cat, String db,
-      String tbl, List<String> allCols, Map<String, List<String>> partsToAnalyze)
-      throws MetaException, NoSuchObjectException {
+      String tbl, List<String> allCols, Map<String, List<String>> partsToAnalyze,
+      String writeIdString) throws MetaException, NoSuchObjectException {
     // TODO: ideally when col-stats-accurate stuff is stored in some sane structure, this should
-    //       to retrieve partsToUpdate in a single query; no checking partition params in java.
+    //       retrieve partsToUpdate in a single query; no checking partition params in java.
     List<String> partNames = null;
     Map<String, List<String>> colsPerPartition = null;
     boolean isAllParts = true;
     if (isExistingOnly) {
-      colsPerPartition = rs.getPartitionColsWithStats(cat, db, tbl);
-      partNames = Lists.newArrayList(colsPerPartition.keySet());
-      int partitionCount = rs.getNumPartitionsByFilter(cat, db, tbl, "");
-      isAllParts = partitionCount == partNames.size();
+      // Make sure the number of partitions we get, and the number of stats objects, is consistent.
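+      // Both reads happen under one RawStore transaction, so the partition count and
+      // the per-partition stats snapshot cannot diverge between the two calls.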
+      rs.openTransaction();
+      boolean isOk = false;
+      try {
+        colsPerPartition = rs.getPartitionColsWithStats(cat, db, tbl);
+        partNames = Lists.newArrayList(colsPerPartition.keySet());
+        int partitionCount = rs.getNumPartitionsByFilter(cat, db, tbl, "");
+        isAllParts = partitionCount == partNames.size();
+        isOk = true;
+      } finally {
+        if (isOk) {
+          rs.commitTransaction();
+        } else {
+          rs.rollbackTransaction();
+        }
+      }
     } else {
       partNames = rs.listPartitionNames(cat, db, tbl, (short) -1);
       isAllParts = true;
@@ -326,9 +352,10 @@
         colsToMaybeUpdate = colsPerPartition.get(partName);
         Collections.sort(colsToMaybeUpdate);
       }
-      List<String> colsToUpdate = getAnyStatsToUpdate(colsToMaybeUpdate, params);
-      LOG.debug("Updating {} based on {} and {}", colsToUpdate, colsToMaybeUpdate, params);
+      List<String> colsToUpdate = getAnyStatsToUpdate(db, tbl, colsToMaybeUpdate, params,
+          writeIdString == null ? -1 : part.getWriteId(), writeIdString);
 
+      LOG.debug("Updating {} based on {} and {}", colsToUpdate, colsToMaybeUpdate, params);
       if (colsToUpdate == null || colsToUpdate.isEmpty()) {
         if (isAllParts) {
@@ -405,8 +432,8 @@ private String buildPartColStr(Table table) {
   }
 
   private List<String> getExistingNonPartTableStatsToUpdate(TableName fullTableName,
-      String cat, String db, String tbl, Map<String, String> params,
-      List<String> allCols) throws MetaException {
+      String cat, String db, String tbl, Map<String, String> params, long statsWriteId,
+      List<String> allCols, String writeIdString) throws MetaException {
     ColumnStatistics existingStats = null;
     try {
       // Note: this should NOT do txn verification - we want to get outdated stats, to
@@ -416,12 +443,15 @@
       LOG.error("Cannot retrieve existing stats, skipping " + fullTableName, e);
       return null;
     }
-    return getExistingStatsToUpdate(existingStats, params);
+    // TODO: we should probably skip updating if writeId is from an active txn
+    boolean isTxnValid = (writeIdString == null) || ObjectStore.isCurrentStatsValidForTheQuery(
+        conf, db, tbl, params, statsWriteId, 0, writeIdString);
+    return getExistingStatsToUpdate(existingStats, params, isTxnValid);
   }
 
   private List<String> getExistingStatsToUpdate(
-      ColumnStatistics existingStats, Map<String, String> params) {
-    boolean hasAnyAccurate = StatsSetupConst.areBasicStatsUptoDate(params);
+      ColumnStatistics existingStats, Map<String, String> params, boolean isTxnValid) {
+    boolean hasAnyAccurate = isTxnValid && StatsSetupConst.areBasicStatsUptoDate(params);
     List<String> colsToUpdate = new ArrayList<>();
     for (ColumnStatisticsObj obj : existingStats.getStatsObj()) {
       String col = obj.getColName();
@@ -432,12 +462,17 @@
     return colsToUpdate;
   }
 
-  private List<String> getAnyStatsToUpdate(
-      List<String> allCols, Map<String, String> params) {
+  private List<String> getAnyStatsToUpdate(String db, String tbl, List<String> allCols,
+      Map<String, String> params, long statsWriteId, String writeIdString) throws MetaException {
     // Note: we only run "for columns" command and assume no basic stats means no col stats.
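+    // A write ID mismatch below is treated the same way as missing basic stats:
+    // the stats were written under a different snapshot, so recompute all columns.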
     if (!StatsSetupConst.areBasicStatsUptoDate(params)) {
       return allCols;
     }
+    // TODO: we should probably skip updating if writeId is from an active txn
+    if (writeIdString != null && !ObjectStore.isCurrentStatsValidForTheQuery(
+        conf, db, tbl, params, statsWriteId, 0, writeIdString)) {
+      return allCols;
+    }
     List<String> colsToUpdate = new ArrayList<>();
     for (String col : allCols) {
       if (!StatsSetupConst.areColumnStatsUptoDate(params, col)) {
@@ -460,8 +495,9 @@ private String buildPartColStr(Table table) {
 
   private ValidReaderWriteIdList getWriteIds(
       TableName fullTableName) throws NoSuchTxnException, MetaException {
-    GetValidWriteIdsRequest req = new GetValidWriteIdsRequest();
-    req.setFullTableNames(Lists.newArrayList(fullTableName.toString()));
+    // TODO: acid utils don't support catalogs
+    GetValidWriteIdsRequest req = new GetValidWriteIdsRequest(
+        Lists.newArrayList(fullTableName.getDbTable()), null);
     return TxnUtils.createValidReaderWriteIdList(
         txnHandler.getValidWriteIds(req).getTblValidWriteIds().get(0));
   }
diff --git ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
index 14f86eabbc..341b368f24 100644
--- ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
+++ ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
@@ -29,9 +29,11 @@
 import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -141,6 +143,106 @@ public void testMultipleTables() throws Exception {
   }
 
   @Test(timeout=40000)
+  public void testTxnTable() throws Exception {
+    StatsUpdaterThread su = createUpdater();
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+
+    executeQuery("create table simple_stats (s string) TBLPROPERTIES "
+        + "(\"transactional\"=\"true\", \"transactional_properties\"=\"insert_only\")");
+    executeQuery("insert into simple_stats (s) values ('test')");
+    ValidWriteIdList initialWriteIds = msClient.getValidWriteIds(
+        ss.getCurrentDatabase() + ".simple_stats");
+    verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"),
+        msClient, 0, initialWriteIds.toString(), true);
+    assertFalse(su.runOneIteration());
+    drainWorkQueue(su, 0);
+
+    executeQuery("insert overwrite table simple_stats values ('test2')");
+    ValidWriteIdList nextWriteIds = msClient.getValidWriteIds(
+        ss.getCurrentDatabase() + ".simple_stats");
+    verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"),
+        msClient, 0, nextWriteIds.toString(), true);
+    assertFalse(su.runOneIteration());
+    drainWorkQueue(su, 0);
+    String currentWriteIds = msClient.getValidWriteIds(
+        ss.getCurrentDatabase() + ".simple_stats").toString();
+
+    // Overwrite the txn state to refer to a previous txn.
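+    // The "bad" write ID is one past the table's current write ID, so it can never
+    // show up as committed in the write ID lists captured above.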
+    Table tbl = msClient.getTable(ss.getCurrentDatabase(), "simple_stats");
+    long badWriteId = tbl.getWriteId() + 1;
+    assertTrue(badWriteId + "", badWriteId > 0);
+    tbl.setWriteId(badWriteId);
+    EnvironmentContext envCtx = new EnvironmentContext();
+    envCtx.putToProperties(StatsSetupConst.VALID_WRITE_IDS, initialWriteIds.toString());
+    msClient.alter_table(ss.getCurrentDatabase(), "simple_stats", tbl);
+
+    // Stats should not be valid.
+    verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"),
+        msClient, 0, currentWriteIds, false);
+
+    assertTrue(su.runOneIteration());
+    drainWorkQueue(su);
+
+    // TODO# This shouldn't actually work for now! Someone sets original writeId to 0.
+    verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"),
+        msClient, 0, currentWriteIds, true);
+
+    msClient.close();
+  }
+
+
+  @Test
+  public void testTxnPartitions() throws Exception {
+    StatsUpdaterThread su = createUpdater();
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+
+    executeQuery("create table simple_stats (s string) partitioned by (p int) TBLPROPERTIES "
+        + "(\"transactional\"=\"true\", \"transactional_properties\"=\"insert_only\")");
+    executeQuery("insert into simple_stats partition(p=1) values ('test')");
+    executeQuery("insert into simple_stats partition(p=2) values ('test2')");
+    executeQuery("insert into simple_stats partition(p=3) values ('test3')");
+    assertFalse(su.runOneIteration());
+    drainWorkQueue(su, 0);
+
+    executeQuery("insert overwrite table simple_stats partition(p=1) values ('test2')");
+    executeQuery("insert overwrite table simple_stats partition(p=2) values ('test3')");
+    assertFalse(su.runOneIteration());
+    drainWorkQueue(su, 0);
+    ValidWriteIdList currentWriteIds = msClient.getValidWriteIds(
+        ss.getCurrentDatabase() + ".simple_stats");
+
+    // Overwrite the txn state to refer to a previous txn on 2/3 partitions.
+    Partition part1 = msClient.getPartition(ss.getCurrentDatabase(), "simple_stats", "p=1");
+    Partition part2 = msClient.getPartition(ss.getCurrentDatabase(), "simple_stats", "p=2");
+    long badWriteId = msClient.getPartition(
+        ss.getCurrentDatabase(), "simple_stats", "p=2").getWriteId() + 1;
+    assertTrue(badWriteId + "", badWriteId > 1);
+    part1.setWriteId(badWriteId);
+    part2.setWriteId(badWriteId);
+    String cwIdStr = currentWriteIds.toString();
+    // To update write ID we need to specify the write ID list to validate concurrent writes.
+    msClient.alter_partitions(ss.getCurrentDatabase(), "simple_stats",
+        Lists.newArrayList(part1), null, -1, cwIdStr, badWriteId);
+    msClient.alter_partitions(ss.getCurrentDatabase(), "simple_stats",
+        Lists.newArrayList(part2), null, -1, cwIdStr, badWriteId);
+
+    // We expect two partitions to be updated.
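+    // At this point only p=3 should report valid stats; p=1 and p=2 were invalidated above.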
+    Map<String, List<ColumnStatisticsObj>> stats = msClient.getPartitionColumnStatistics(
+        ss.getCurrentDatabase(), "simple_stats", Lists.newArrayList("p=1", "p=2", "p=3"),
+        Lists.newArrayList("s"), 0, cwIdStr);
+    assertEquals(1, stats.size());
+
+    assertTrue(su.runOneIteration());
+    drainWorkQueue(su, 2);
+    stats = msClient.getPartitionColumnStatistics(
+        ss.getCurrentDatabase(), "simple_stats", Lists.newArrayList("p=1", "p=2", "p=3"),
+        Lists.newArrayList("s"), 0, cwIdStr);
+    assertEquals(3, stats.size());
+
+    msClient.close();
+  }
+
+  @Test(timeout=40000)
   public void testExistingOnly() throws Exception {
     hiveConf.set(MetastoreConf.ConfVars.STATS_AUTO_UPDATE.getVarname(), "existing");
     StatsUpdaterThread su = createUpdater();
@@ -443,6 +545,12 @@ private void verifyStatsUpToDate(String tbl, ArrayList<String> cols, IMetaStoreC
     verifyStatsUpToDate(table.getParameters(), cols, isUpToDate);
   }
 
+  private void verifyStatsUpToDate(String tbl, ArrayList<String> cols, IMetaStoreClient msClient,
+      long txnId, String validWriteIds, boolean isUpToDate) throws Exception {
+    Table table = msClient.getTable(ss.getCurrentDatabase(), tbl, txnId, validWriteIds);
+    verifyStatsUpToDate(table.getParameters(), cols, isUpToDate);
+  }
+
   private void verifyStatsUpToDate(Map<String, String> params, ArrayList<String> cols,
       boolean isUpToDate) {
     if (isUpToDate) {
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 746e6cd5de..2c644ad677 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -4805,6 +4805,7 @@ public void alter_partition_with_environment_context(final String dbName,
       final EnvironmentContext envContext) throws TException {
     String[] parsedDbName = parseDbName(dbName, conf);
+    // TODO: this method name is confusing, it actually does full alter (sortof)
     rename_partition(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName, null,
         newPartition, envContext);
   }
@@ -5726,8 +5727,10 @@ public PartitionsStatsResult get_partitions_statistics_req(PartitionsStatsReques
         request.isSetTxnId() ? request.getTxnId() : -1,
         request.isSetValidWriteIdList() ?
             request.getValidWriteIdList() : null);
         Map<String, List<ColumnStatisticsObj>> map = new HashMap<>();
-        for (ColumnStatistics stat : stats) {
-          map.put(stat.getStatsDesc().getPartName(), stat.getStatsObj());
+        if (stats != null) {
+          for (ColumnStatistics stat : stats) {
+            map.put(stat.getStatsDesc().getPartName(), stat.getStatsObj());
+          }
         }
         result = new PartitionsStatsResult(map);
       } finally {
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index cc417eab3b..bf8e121a99 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -1882,6 +1882,7 @@ public void alter_partition(String catName, String dbName, String tblName, Parti
         newPart, environmentContext);
   }
 
+  @Deprecated
   @Override
   public void alter_partitions(String dbName, String tblName, List<Partition> newParts)
       throws TException {
@@ -1901,8 +1902,6 @@ public void alter_partitions(String dbName, String tblName, List<Partition> newP
       EnvironmentContext environmentContext, long txnId, String writeIdList, long writeId)
       throws InvalidOperationException, MetaException, TException {
-    //client.alter_partition_with_environment_context(getDefaultCatalog(conf),
-    //    dbName, tblName, newParts, environmentContext);
     alter_partitions(getDefaultCatalog(conf),
         dbName, tblName, newParts, environmentContext, txnId, writeIdList, writeId);
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 27d96e5f07..99c41b68dc 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -2109,6 +2109,7 @@ void alter_partition(String catName, String dbName, String tblName, Partition ne
    * @throws TException
    *           if error in communicating with metastore server
    */
+  @Deprecated
  void alter_partitions(String dbName, String tblName, List<Partition> newParts)
      throws InvalidOperationException, MetaException, TException;
 
@@ -2129,6 +2130,7 @@ void alter_partitions(String dbName, String tblName, List<Partition> newParts)
    * @throws TException
    *           if error in communicating with metastore server
    */
+  @Deprecated
  void alter_partitions(String dbName, String tblName, List<Partition> newParts,
      EnvironmentContext environmentContext)
      throws InvalidOperationException, MetaException, TException;
@@ -2154,6 +2156,7 @@ void alter_partitions(String dbName, String tblName, List<Partition> newParts,
    * @throws TException
    *           if error in communicating with metastore server
    */
+  @Deprecated
  default void alter_partitions(String catName, String dbName, String tblName,
      List<Partition> newParts)
      throws InvalidOperationException, MetaException, TException {
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index f45b71fb1f..ad7d30bd55 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -112,6 +112,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(MetaStoreDirectSql.class);
 
   private final PersistenceManager pm;
+  private final Configuration conf;
   private final String schema;
 
   /**
@@ -146,8 +147,10 @@
       SKEWED_COL_VALUE_LOC_MAP, COLUMNS_V2, PARTITION_KEYS, SERDE_PARAMS, PART_COL_STATS,
       KEY_CONSTRAINTS, TAB_COL_STATS, PARTITION_KEY_VALS, PART_PRIVS, PART_COL_PRIVS,
       SKEWED_STRING_LIST, CDS;
+
   public MetaStoreDirectSql(PersistenceManager pm, Configuration conf, String schema) {
     this.pm = pm;
+    this.conf = conf;
     this.schema = schema;
     DatabaseProduct dbType = null;
     try {
@@ -645,8 +648,8 @@ private boolean isViewTable(String catName, String dbName, String tblName) throw
         + " " + SERDES + ".\"SERDE_ID\", " + PARTITIONS + ".\"CREATE_TIME\","
        + " " + PARTITIONS + ".\"LAST_ACCESS_TIME\", " + SDS + ".\"INPUT_FORMAT\", " + SDS + ".\"IS_COMPRESSED\","
        + " " + SDS + ".\"IS_STOREDASSUBDIRECTORIES\", " + SDS + ".\"LOCATION\", " + SDS + ".\"NUM_BUCKETS\","
-       + " " + SDS + ".\"OUTPUT_FORMAT\", " + SERDES + ".\"NAME\", " + SERDES + ".\"SLIB\" "
-       + "from " + PARTITIONS + ""
+       + " " + SDS + ".\"OUTPUT_FORMAT\", " + SERDES + ".\"NAME\", " + SERDES + ".\"SLIB\", " + PARTITIONS
+       + ".\"WRITE_ID\"" + " from " + PARTITIONS + ""
        + " left outer join " + SDS + " on " + PARTITIONS + ".\"SD_ID\" = " + SDS + ".\"SD_ID\" "
        + " left outer join " + SERDES + " on " + SDS + ".\"SERDE_ID\" = " + SERDES + ".\"SERDE_ID\" "
        + "where \"PART_ID\" in (" + partIds + ") order by \"PART_NAME\" asc";
@@ -747,6 +750,10 @@ private boolean isViewTable(String catName, String dbName, String tblName) throw
       serde.setSerializationLib((String)fields[13]);
       serdeSb.append(serdeId).append(",");
       sd.setSerdeInfo(serde);
+      Long writeId = extractSqlLong(fields[14]);
+      if (writeId != null) {
+        part.setWriteId(writeId);
+      }
       Deadline.checkTimeout();
     }
     query.closeAll();
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index aa29dd9113..ec41c348ba 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -1943,11 +1943,14 @@ private Table convertToTable(MTable mtbl) throws MetaException {
     t.setRewriteEnabled(mtbl.isRewriteEnabled());
     t.setCatName(mtbl.getDatabase().getCatalogName());
+    t.setWriteId(mtbl.getWriteId());
     return t;
   }
 
   private MTable convertToMTable(Table tbl) throws InvalidObjectException,
       MetaException {
+    // NOTE: we don't set writeId in this method. Write ID is only set after validating the
+    // existing write ID against the caller's valid list.
     if (tbl == null) {
       return null;
     }
@@ -1986,9 +1989,6 @@ private MTable convertToMTable(Table tbl) throws InvalidObjectException,
         convertToMFieldSchemas(tbl.getPartitionKeys()), tbl.getParameters(),
         tbl.getViewOriginalText(), tbl.getViewExpandedText(), tbl.isRewriteEnabled(),
         tableType);
-    if (TxnUtils.isTransactionalTable(tbl)) {
-      mtable.setWriteId(tbl.getWriteId());
-    }
     return mtable;
   }
 
@@ -2450,21 +2450,24 @@ public Partition getPartition(String catName, String dbName, String tableName,
           + part_vals.toString());
     }
     part.setValues(part_vals);
+    setPartitionStatsParam(part, table.getParameters(), mpart.getWriteId(), txnId, writeIdList);
+    return part;
+  }
+
+  private void setPartitionStatsParam(Partition part, Map<String, String> tableParams,
+      long partWriteId, long reqTxnId, String reqWriteIdList) throws MetaException {
     // If transactional table partition, check whether the current version partition
     // statistics in the metastore comply with the client query's snapshot isolation.
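+    // A null reqWriteIdList means the caller did not ask for snapshot validation;
+    // non-transactional tables have no write IDs to validate against.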
-    if (writeIdList != null) {
-      if (TxnUtils.isTransactionalTable(table.getParameters())) {
-        if (isCurrentStatsValidForTheQuery(mpart, txnId, writeIdList)) {
-          part.setIsStatsCompliant(true);
-        } else {
-          part.setIsStatsCompliant(false);
-          // Do not make persistent the following state since it is query specific (not global).
-          StatsSetupConst.setBasicStatsState(part.getParameters(), StatsSetupConst.FALSE);
-          LOG.info("Removed COLUMN_STATS_ACCURATE from Partition object's parameters.");
-        }
-      }
+    if (reqWriteIdList == null) return;
+    if (!TxnUtils.isTransactionalTable(tableParams)) return;
+    if (isCurrentStatsValidForTheQuery(part, partWriteId, reqTxnId, reqWriteIdList)) {
+      part.setIsStatsCompliant(true);
+    } else {
+      part.setIsStatsCompliant(false);
+      // Do not make persistent the following state since it is query specific (not global).
+      StatsSetupConst.setBasicStatsState(part.getParameters(), StatsSetupConst.FALSE);
+      LOG.info("Removed COLUMN_STATS_ACCURATE from Partition object's parameters.");
     }
-    return part;
   }
 
   /**
@@ -2570,6 +2573,8 @@ private MPartition getMPartition(String catName, String dbName, String tableName
    */
   private MPartition convertToMPart(Partition part, MTable mt, boolean useTableCD)
       throws InvalidObjectException, MetaException {
+    // NOTE: we don't set writeId in this method. Write ID is only set after validating the
+    // existing write ID against the caller's valid list.
     if (part == null) {
       return null;
     }
@@ -2597,9 +2602,6 @@ private MPartition convertToMPart(Partition part, MTable mt, boolean useTableCD)
         .getPartitionKeys()), part.getValues()), mt, part.getValues(), part
         .getCreateTime(), part.getLastAccessTime(), msd, part.getParameters());
-    if (TxnUtils.isTransactionalTable(mt.getParameters())) {
-      mpart.setWriteId(part.getWriteId());
-    }
     return mpart;
   }
 
@@ -2612,6 +2614,7 @@ private Partition convertToPart(MPartition mpart) throws MetaException {
         mpart.getLastAccessTime(), convertToStorageDescriptor(mpart.getSd()),
         convertMap(mpart.getParameters()));
     p.setCatName(mpart.getTable().getDatabase().getCatalogName());
+    p.setWriteId(mpart.getWriteId());
     return p;
   }
 
@@ -2624,6 +2627,7 @@ private Partition convertToPart(String catName, String dbName, String tblName, M
         mpart.getCreateTime(), mpart.getLastAccessTime(),
         convertToStorageDescriptor(mpart.getSd(), false),
         convertMap(mpart.getParameters()));
     p.setCatName(catName);
+    p.setWriteId(mpart.getWriteId());
     return p;
   }
 
@@ -4142,6 +4146,7 @@ public void alterTable(String catName, String dbname, String name, Table newTabl
       LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table "
           + dbname + "." + name + ". will be made persistent.");
     }
+    LOG.error("TODO# setting ID from " + oldt.getWriteId() + " to " + newTable.getWriteId(), new Exception());
     oldt.setWriteId(newTable.getWriteId());
   }
 
@@ -4239,7 +4244,7 @@ private MColumnDescriptor alterPartitionNoTxn(String catName, String dbname, Str
   public void alterPartition(String catName, String dbname, String name, List<String> part_vals,
       Partition newPart, long queryTxnId, String queryValidWriteIds)
       throws InvalidObjectException, MetaException {
     boolean success = false;
-    Exception e = null;
+    Throwable e = null;
     try {
       openTransaction();
       if (newPart.isSetWriteId()) {
@@ -4250,7 +4255,8 @@ public void alterPartition(String catName, String dbname, String name, List
+      List<String> newPartNames = partNames; // Optimistically assume the list will stay as is.
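+      // Drop any partition whose stats write ID fails validation; the list is only
+      // copied once the first filtered-out partition is found.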
+      for (int i = 0; i < partNames.size(); ++i) {
+        String partName = partNames.get(i);
+        MPartition mpart = getMPartition(
+            catName, dbName, tableName, Warehouse.getPartValuesFromPartName(partName));
+        if (mpart == null
+            || !isCurrentStatsValidForTheQuery(mpart, txnId, writeIdList)) {
+          if (mpart != null) {
+            LOG.debug("The current metastore transactional partition column statistics for {}.{}.{} "
+                + "(write ID {}) are not valid for current query ({} {})", dbName, tableName,
+                mpart.getPartitionName(), mpart.getWriteId(), txnId, writeIdList);
+          }
+          if (partNames == newPartNames) {
+            // Create a new list if we need to make changes.
+            newPartNames = new ArrayList<>(partNames.size());
+            newPartNames.addAll(partNames.subList(0, i));
+          }
+        } else if (partNames != newPartNames) {
+          newPartNames.add(partName);
        }
      }
+      partNames = newPartNames;
    }
    return getPartitionColumnStatisticsInternal(
        catName, dbName, tableName, partNames, colNames, true, true);
@@ -12307,7 +12327,7 @@ public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
    */
  private boolean isCurrentStatsValidForTheQuery(
      MTable tbl, long queryTxnId, String queryValidWriteIdList) throws MetaException {
-    return isCurrentStatsValidForTheQuery(tbl.getDatabase().getName(), tbl.getTableName(),
+    return isCurrentStatsValidForTheQuery(conf, tbl.getDatabase().getName(), tbl.getTableName(),
        tbl.getParameters(), tbl.getWriteId(), queryTxnId, queryValidWriteIdList);
  }
 
@@ -12328,13 +12348,21 @@
  private boolean isCurrentStatsValidForTheQuery(
      MPartition part, long queryTxnId, String queryValidWriteIdList)
      throws MetaException {
-    return isCurrentStatsValidForTheQuery(part.getTable().getDatabase().getName(),
+    return isCurrentStatsValidForTheQuery(conf, part.getTable().getDatabase().getName(),
        part.getTable().getTableName(), part.getParameters(), part.getWriteId(),
        queryTxnId, queryValidWriteIdList);
  }
 
-  private boolean isCurrentStatsValidForTheQuery(String dbName, String tblName,
-      Map<String, String> statsParams, long statsWriteId, long queryTxnId,
+  private boolean isCurrentStatsValidForTheQuery(
+      Partition part, long partWriteId, long queryTxnId, String queryValidWriteIdList)
+      throws MetaException {
+    return isCurrentStatsValidForTheQuery(conf, part.getDbName(), part.getTableName(),
+        part.getParameters(), partWriteId, queryTxnId, queryValidWriteIdList);
+  }
+
+  // TODO: move to somewhere else
+  public static boolean isCurrentStatsValidForTheQuery(Configuration conf, String dbName,
+      String tblName, Map<String, String> statsParams, long statsWriteId, long queryTxnId,
      String queryValidWriteIdList) throws MetaException {
 
    // Note: can be changed to debug/info to verify the calls.
@@ -12367,6 +12395,10 @@
      return true;
    }
 
+    if (queryTxnId < 1) {
+      return false; // The caller is outside of a txn; no need to check the same-txn case.
+    }
+
    // This assumes that all writes within the same txn are sequential and can see each other.
    // TODO## Not clear if we need this check; each next write should have the previous
    //       one in its writeIdList; verify w/Eugene.