diff --git itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 0cc0ae5771..ff97522e63 100644
--- itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -22,11 +22,13 @@ import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
 import org.apache.hadoop.hive.metastore.api.Catalog;
 import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.FileMetadataHandler;
@@ -88,6 +90,7 @@ import org.apache.hadoop.hive.metastore.api.WMNullablePool;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
 import org.apache.thrift.TException;
 
 /**
@@ -1221,4 +1224,22 @@ public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
     return objectStore.deleteRuntimeStats(maxRetainSecs);
   }
-}
+
+  @Override
+  public List<FullTableName> getTableNamesWithStats() throws MetaException,
+      NoSuchObjectException {
+    return null;
+  }
+
+  @Override
+  public List<FullTableName> getAllTableNamesForStats() throws MetaException,
+      NoSuchObjectException {
+    return null;
+  }
+
+  @Override
+  public Map<String, List<String>> getPartitionColsWithStats(String catName,
+      String dbName, String tableName) throws MetaException,
+      NoSuchObjectException {
+    return null;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
new file mode 100644
index 0000000000..8228109751
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DriverUtils { + private static final Logger LOG = LoggerFactory.getLogger(DriverUtils.class); + + public static void runOnDriver(HiveConf conf, String user, + SessionState sessionState, String query, ValidWriteIdList writeIds) throws HiveException { + SessionState.setCurrentSessionState(sessionState); + boolean isOk = false; + try { + QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build(); + Driver driver = new Driver(qs, user, null, null); + driver.setCompactionWriteIds(writeIds); + try { + CommandProcessorResponse cpr = driver.run(query); + if (cpr.getResponseCode() != 0) { + LOG.error("Failed to run " + query, cpr.getException()); + throw new HiveException("Failed to run " + query, cpr.getException()); + } + } finally { + driver.close(); + driver.destroy(); + } + isOk = true; + } finally { + if (!isOk) { + try { + sessionState.close(); // This also resets SessionState.get. + } catch (Throwable th) { + LOG.warn("Failed to close a bad session", th); + SessionState.detachSession(); + } + } + } + } + + public static SessionState setUpSessionState(HiveConf conf, String user, boolean doStart) { + SessionState sessionState = SessionState.get(); + if (sessionState == null) { + // Note: we assume that workers run on the same threads repeatedly, so we can set up + // the session here and it will be reused without explicitly storing in the worker. + sessionState = new SessionState(conf, user); + if (doStart) { + // TODO: Required due to SessionState.getHDFSSessionPath. Why wasn't it required before? + sessionState.setIsHiveServerQuery(true); + SessionState.start(sessionState); + } + SessionState.setCurrentSessionState(sessionState); + } + return sessionState; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 6c56212c9e..83490d2d53 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -818,7 +818,7 @@ public static Path getHDFSSessionPath(Configuration conf) { return new Path(sessionPathString); } Preconditions.checkNotNull(ss.hdfsSessionPath, - "Non-local session path expected to be non-null"); + "Non-local session path expected to be non-null"); return ss.hdfsSessionPath; } diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java new file mode 100644 index 0000000000..285db31fd4 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java @@ -0,0 +1,652 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.stats; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreThread; +import org.apache.hadoop.hive.metastore.RawStore; +import org.apache.hadoop.hive.metastore.RawStoreProxy; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.StatsUpdateMode; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName; +import org.apache.hadoop.hive.ql.DriverUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; + +public class StatsUpdaterThread extends Thread implements MetaStoreThread { + public static final String SKIP_STATS_AUTOUPDATE_PROPERTY = "skip.stats.autoupdate"; + private static final Logger LOG = LoggerFactory.getLogger(StatsUpdaterThread.class); + + protected Configuration conf; + protected int threadId; + protected AtomicBoolean stop; + protected AtomicBoolean looped; + + private RawStore rs; + private TxnStore txnHandler; + /** Full tables, and partitions that currently have analyze commands queued or in progress. */ + private ConcurrentHashMap tablesInProgress = new ConcurrentHashMap<>(); + private ConcurrentHashMap partsInProgress = new ConcurrentHashMap<>(); + private AtomicInteger itemsInProgress = new AtomicInteger(0); + + // Configuration + /** Whether to only update stats that already exist and are out of date. 
*/ + private boolean isExistingOnly; + private long noUpdatesWaitMs; + private int batchSize; + + // Worker threads stuff + private BlockingQueue workQueue; + private Thread[] workers; + + @Override + public void setConf(Configuration conf) { + StatsUpdateMode mode = StatsUpdateMode.valueOf( + MetastoreConf.getVar(conf, ConfVars.STATS_AUTO_UPDATE).toUpperCase()); + switch (mode) { + case ALL: this.isExistingOnly = false; break; + case EXISTING: this.isExistingOnly = true; break; + default: throw new AssertionError("Unexpected mode " + mode); + } + noUpdatesWaitMs = MetastoreConf.getTimeVar( + conf, ConfVars.STATS_AUTO_UPDATE_NOOP_WAIT, TimeUnit.MILLISECONDS); + batchSize = MetastoreConf.getIntVar(conf, ConfVars.BATCH_RETRIEVE_MAX); + int workerCount = MetastoreConf.getIntVar(conf, ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT); + if (workerCount <= 0) { + workerCount = 1; + } + workers = new Thread[workerCount]; + // Don't store too many items; if the queue is full we'll block the checker thread. + // Since the worker count determines how many queries can be running in parallel, it makes + // no sense to produce more work if the backlog is getting too long. + workQueue = new ArrayBlockingQueue(workerCount * 3); + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setThreadId(int threadId) { + this.threadId = threadId; + } + + @Override + public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException { + this.stop = stop; + this.looped = looped; + setPriority(MIN_PRIORITY); + setDaemon(true); + String user = "anonymous"; + try { + user = UserGroupInformation.getCurrentUser().getShortUserName(); + } catch (IOException e) { + LOG.warn("Cannot determine the current user; executing as anonymous", e); + } + txnHandler = TxnUtils.getTxnStore(conf); + rs = RawStoreProxy.getProxy(conf, conf, + MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL), threadId); + for (int i = 0; i < workers.length; ++i) { + workers[i] = new Thread(new WorkerRunnable(conf, user)); + workers[i].setDaemon(true); + workers[i].setName("Stats updater worker " + i); + } + } + + @Override + public void run() { + startWorkers(); + while (!stop.get()) { + boolean hadUpdates = runOneIteration(); + try { + Thread.sleep(hadUpdates ? 
0 : noUpdatesWaitMs); + } catch (InterruptedException e) { + LOG.info("Stats updater thread was interrupted and will now exit"); + stopWorkers(); + return; + } + } + LOG.info("Stats updater thread was stopped and will now exit"); + } + + @VisibleForTesting + void startWorkers() { + for (int i = 0; i < workers.length; ++i) { + workers[i].start(); + } + } + + @VisibleForTesting + boolean runOneIteration() { + List fullTableNames; + try { + fullTableNames = getTablesToCheck(); + } catch (Throwable t) { + LOG.error("Stats updater thread cannot retrieve tables and will now exit", t); + stopWorkers(); + throw new RuntimeException(t); + } + LOG.debug("Processing {}", fullTableNames); + boolean hadUpdates = false; + for (FullTableName fullTableName : fullTableNames) { + try { + List commands = processOneTable(fullTableName); + hadUpdates = hadUpdates || commands != null; + if (commands != null) { + for (AnalyzeWork req : commands) { + markAnalyzeInProgress(req); + workQueue.put(req); + } + } + } catch (Exception e) { + LOG.error("Failed to process " + fullTableName + "; skipping for now", e); + } + } + return hadUpdates; + } + + private void stopWorkers() { + for (int i = 0; i < workers.length; ++i) { + workers[i].interrupt(); + } + } + + private List processOneTable(FullTableName fullTableName) + throws MetaException, NoSuchTxnException, NoSuchObjectException { + if (isAnalyzeTableInProgress(fullTableName)) return null; + String cat = fullTableName.catalog, db = fullTableName.db, tbl = fullTableName.table; + Table table = rs.getTable(cat, db, tbl); + LOG.debug("Processing table {}", table); + + // Check if the table should be skipped. + String skipParam = table.getParameters().get(SKIP_STATS_AUTOUPDATE_PROPERTY); + if ("true".equalsIgnoreCase(skipParam)) return null; + + // TODO: when txn stats are implemented, use writeIds to determine stats accuracy + @SuppressWarnings("unused") + ValidReaderWriteIdList writeIds = null; + if (AcidUtils.isTransactionalTable(table)) { + writeIds = getWriteIds(fullTableName); + } + List allCols = new ArrayList<>(table.getSd().getColsSize()); + for (FieldSchema fs : table.getSd().getCols()) { + allCols.add(fs.getName()); + } + Collections.sort(allCols); + if (table.getPartitionKeysSize() == 0) { + Map params = table.getParameters(); + List colsToUpdate = isExistingOnly + ? getExistingNonPartTableStatsToUpdate(fullTableName, cat, db, tbl, params, allCols) + : getAnyStatsToUpdate(allCols, params); + LOG.debug("Columns to update are {}; existing only: {}, out of: {} based on {}", + colsToUpdate, isExistingOnly, allCols, params); + + if (colsToUpdate == null || colsToUpdate.isEmpty()) { + return null; // No update necessary. + } + return Lists.newArrayList(new AnalyzeWork(fullTableName, + null, null, allCols.size() == colsToUpdate.size() ? null : colsToUpdate)); + } else { + Map> partsToAnalyze = new HashMap<>(); + List colsForAllParts = findPartitionsToAnalyze( + fullTableName, cat, db, tbl, allCols, partsToAnalyze); + LOG.debug("Columns to update are {} for all partitions; {} individual partitions." + + " Existing only: {}, out of: {}", colsForAllParts, partsToAnalyze.size(), + isExistingOnly, allCols); + if (colsForAllParts == null && partsToAnalyze.isEmpty()) { + return null; // No partitions need update. + } + if (colsForAllParts != null) { + // We can update all partitions with a single analyze query. 
+        return Lists.newArrayList(new AnalyzeWork(
+            fullTableName, null, buildPartColStr(table), colsForAllParts));
+      }
+      List<AnalyzeWork> result = new ArrayList<>(partsToAnalyze.size());
+      for (Map.Entry<String, List<String>> e : partsToAnalyze.entrySet()) {
+        LOG.debug("Adding analyze work for {}", e.getKey());
+        result.add(new AnalyzeWork(fullTableName, e.getKey(), null, e.getValue()));
+      }
+      return result;
+    }
+  }
+
+  private List<String> findPartitionsToAnalyze(FullTableName fullTableName, String cat, String db,
+      String tbl, List<String> allCols, Map<String, List<String>> partsToAnalyze)
+      throws MetaException, NoSuchObjectException {
+    // TODO: ideally when col-stats-accurate stuff is stored in some sane structure, this should
+    //       be able to retrieve partsToUpdate in a single query; no checking partition params in java.
+    List<String> partNames = null;
+    Map<String, List<String>> colsPerPartition = null;
+    boolean isAllParts = true;
+    if (isExistingOnly) {
+      colsPerPartition = rs.getPartitionColsWithStats(cat, db, tbl);
+      partNames = Lists.newArrayList(colsPerPartition.keySet());
+      int partitionCount = rs.getNumPartitionsByFilter(cat, db, tbl, "");
+      isAllParts = partitionCount == partNames.size();
+    } else {
+      partNames = rs.listPartitionNames(cat, db, tbl, (short) -1);
+      isAllParts = true;
+    }
+    Table t = rs.getTable(cat, db, tbl);
+    List<Partition> currentBatch = null;
+    int nextBatchStart = 0, nextIxInBatch = -1, currentBatchStart = 0;
+    List<String> colsToUpdateForAll = null;
+    while (true) {
+      if (currentBatch == null || nextIxInBatch == currentBatch.size()) {
+        if (nextBatchStart >= partNames.size()) {
+          break;
+        }
+        int nextBatchEnd = Math.min(partNames.size(), nextBatchStart + this.batchSize);
+        List<String> currentNames = partNames.subList(nextBatchStart, nextBatchEnd);
+        currentBatchStart = nextBatchStart;
+        nextBatchStart = nextBatchEnd;
+        try {
+          currentBatch = rs.getPartitionsByNames(cat, db, tbl, currentNames);
+        } catch (NoSuchObjectException e) {
+          LOG.error("Failed to get partitions for " + fullTableName + ", skipping some partitions", e);
+          currentBatch = null;
+          continue;
+        }
+        nextIxInBatch = 0;
+      }
+      int currentIxInBatch = nextIxInBatch++;
+      Partition part = currentBatch.get(currentIxInBatch);
+      String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues());
+      LOG.debug("Processing partition ({} in batch), {}", currentIxInBatch, partName);
+
+      // Skip the partitions in progress, and the ones for which stats update is disabled.
+      // We could filter the skipped partitions out as part of the initial names query,
+      // but we assume it's extremely rare for individual partitions.
+      Map<String, String> params = part.getParameters();
+      String skipParam = params.get(SKIP_STATS_AUTOUPDATE_PROPERTY);
+      if (isAnalyzePartInProgress(fullTableName, partName) || "true".equalsIgnoreCase(skipParam)) {
+        if (isAllParts) {
+          addPreviousPartitions(t, partNames, currentBatchStart, currentBatch, currentIxInBatch,
+              colsToUpdateForAll, partsToAnalyze);
+        }
+        isAllParts = false;
+        continue;
+      }
+
+      // Find which columns we need to update for this partition, if any.
+      List<String> colsToMaybeUpdate = allCols;
+      if (isExistingOnly) {
+        colsToMaybeUpdate = colsPerPartition.get(partName);
+        Collections.sort(colsToMaybeUpdate);
+      }
+      List<String> colsToUpdate = getAnyStatsToUpdate(colsToMaybeUpdate, params);
+      LOG.debug("Updating {} based on {} and {}", colsToUpdate, colsToMaybeUpdate, params);
+
+
+      if (colsToUpdate == null || colsToUpdate.isEmpty()) {
+        if (isAllParts) {
+          addPreviousPartitions(t, partNames, currentBatchStart, currentBatch, currentIxInBatch,
+              colsToUpdateForAll, partsToAnalyze);
+        }
+        isAllParts = false;
+        continue;
+      }
+
+      // If issuing a query for all partitions, verify that we need to update the same columns.
+      // TODO: for non columnar we don't need to do this... might as well update all stats.
+      if (isAllParts) {
+        List<String> newCols = verifySameColumnsForAllParts(colsToUpdateForAll, colsToUpdate);
+        if (newCols == null) {
+          isAllParts = false;
+          addPreviousPartitions(t, partNames, currentBatchStart, currentBatch, currentIxInBatch,
+              colsToUpdateForAll, partsToAnalyze);
+        } else if (colsToUpdateForAll == null) {
+          colsToUpdateForAll = newCols;
+        }
+      }
+
+      if (!isAllParts) {
+        LOG.trace("Adding {}, {}", partName, colsToUpdate);
+        partsToAnalyze.put(partName, colsToUpdate);
+      }
+    }
+    return isAllParts ? colsToUpdateForAll : null;
+  }
+
+  private List<String> verifySameColumnsForAllParts(
+      List<String> colsToUpdateForAll, List<String> colsToUpdate) {
+    if (colsToUpdateForAll == null) {
+      return colsToUpdate;
+    }
+    if (colsToUpdate.size() != colsToUpdateForAll.size()) {
+      return null;
+    }
+    // Assumes the lists are sorted.
+    for (int i = 0; i < colsToUpdateForAll.size(); ++i) {
+      if (!colsToUpdate.get(i).equals(colsToUpdateForAll.get(i))) {
+        return null;
+      }
+    }
+    return colsToUpdateForAll;
+  }
+
+  private void addPreviousPartitions(Table t, List<String> allPartNames,
+      int currentBatchStart, List<Partition> currentBatch, int currentIxInBatch,
+      List<String> cols, Map<String, List<String>> partsToAnalyze) throws MetaException {
+    // Add all the names for previous batches.
+    for (int i = 0; i < currentBatchStart; ++i) {
+      LOG.trace("Adding previous {}, {}", allPartNames.get(i), cols);
+      partsToAnalyze.put(allPartNames.get(i), cols);
+    }
+    // Current match may be out of order w.r.t. the global name list, so add specific parts.
+ for (int i = 0; i < currentIxInBatch; ++i) { + String name = Warehouse.makePartName(t.getPartitionKeys(), currentBatch.get(i).getValues()); + LOG.trace("Adding previous {}, {}", name, cols); + partsToAnalyze.put(name, cols); + } + } + + private String buildPartColStr(Table table) { + String partColStr = ""; + for (int i = 0; i < table.getPartitionKeysSize(); ++i) { + if (i != 0) { + partColStr += ","; + } + partColStr += table.getPartitionKeys().get(i).getName(); + } + return partColStr; + } + + private List getExistingNonPartTableStatsToUpdate(FullTableName fullTableName, + String cat, String db, String tbl, Map params, + List allCols) throws MetaException { + ColumnStatistics existingStats = null; + try { + existingStats = rs.getTableColumnStatistics(cat, db, tbl, allCols); + } catch (NoSuchObjectException e) { + LOG.error("Cannot retrieve existing stats, skipping " + fullTableName, e); + return null; + } + return getExistingStatsToUpdate(existingStats, params); + } + + private List getExistingStatsToUpdate( + ColumnStatistics existingStats, Map params) { + boolean hasAnyAccurate = StatsSetupConst.areBasicStatsUptoDate(params); + List colsToUpdate = new ArrayList<>(); + for (ColumnStatisticsObj obj : existingStats.getStatsObj()) { + String col = obj.getColName(); + if (!hasAnyAccurate || !StatsSetupConst.areColumnStatsUptoDate(params, col)) { + colsToUpdate.add(col); + } + } + return colsToUpdate; + } + + private List getAnyStatsToUpdate( + List allCols, Map params) { + // Note: we only run "for columns" command and assume no basic stats means no col stats. + if (!StatsSetupConst.areBasicStatsUptoDate(params)) { + return allCols; + } + List colsToUpdate = new ArrayList<>(); + for (String col : allCols) { + if (!StatsSetupConst.areColumnStatsUptoDate(params, col)) { + colsToUpdate.add(col); + } + } + return colsToUpdate; + } + + private List getTablesToCheck() throws MetaException, NoSuchObjectException { + if (isExistingOnly) { + try { + return rs.getTableNamesWithStats(); + } catch (Exception ex) { + LOG.error("Error from getTablesWithStats, getting all the tables", ex); + } + } + return rs.getAllTableNamesForStats(); + } + + private ValidReaderWriteIdList getWriteIds( + FullTableName fullTableName) throws NoSuchTxnException, MetaException { + GetValidWriteIdsRequest req = new GetValidWriteIdsRequest(); + req.setFullTableNames(Lists.newArrayList(fullTableName.toString())); + return TxnUtils.createValidReaderWriteIdList( + txnHandler.getValidWriteIds(req).getTblValidWriteIds().get(0)); + } + + + private void markAnalyzeInProgress(AnalyzeWork req) { + if (req.partName == null) { + Boolean old = tablesInProgress.putIfAbsent(req.tableName, true); + if (old != null) { + throw new AssertionError("The table was added to progress twice: " + req.tableName); + } + } else { + String partName = req.makeFullPartName(); + Boolean old = partsInProgress.putIfAbsent(partName, true); + if (old != null) { + throw new AssertionError("The partition was added to progress twice: " + partName); + } + } + itemsInProgress.incrementAndGet(); + } + + private void markAnalyzeDone(AnalyzeWork req) { + if (req.partName == null) { + Boolean old = tablesInProgress.remove(req.tableName); + if (old == null) { + throw new AssertionError("The table was not in progress: " + req.tableName); + } + } else { + String partName = req.makeFullPartName(); + Boolean old = partsInProgress.remove(partName); + if (old == null) { + throw new AssertionError("Partition was not in progress: " + partName); + } + } + // This is used for 
tests where there's always just one batch of work and we do the + // checks after the batch, so the check will only come at the end of queueing. + int remaining = itemsInProgress.decrementAndGet(); + if (remaining == 0) { + synchronized (itemsInProgress) { + itemsInProgress.notifyAll(); + } + } + } + + private boolean isAnalyzeTableInProgress(FullTableName fullTableName) { + return tablesInProgress.containsKey(fullTableName); + } + + private boolean isAnalyzePartInProgress(FullTableName tableName, String partName) { + return partsInProgress.containsKey(makeFullPartName(tableName, partName)); + } + + private static String makeFullPartName(FullTableName tableName, String partName) { + return tableName + "/" + partName; + } + + private final static class AnalyzeWork { + FullTableName tableName; + String partName, allParts; + List cols; + + public AnalyzeWork(FullTableName tableName, String partName, String allParts, List cols) { + this.tableName = tableName; + this.partName = partName; + this.allParts = allParts; + this.cols = cols; + } + + public String makeFullPartName() { + return StatsUpdaterThread.makeFullPartName(tableName, partName); + } + + public String buildCommand() { + // Catalogs cannot be parsed as part of the query. Seems to be a bug. + String cmd = "analyze table " + tableName.db + "." + tableName.table; + assert partName == null || allParts == null; + if (partName != null) { + cmd += " partition(" + partName + ")"; + } + if (allParts != null) { + cmd += " partition(" + allParts + ")"; + } + cmd += " compute statistics for columns"; + if (cols != null) { + cmd += " " + String.join(",", cols); + } + return cmd; + } + + @Override + public String toString() { + return "AnalyzeWork [tableName=" + tableName + ", partName=" + partName + + ", allParts=" + allParts + ", cols=" + cols + "]"; + } + } + + @VisibleForTesting + public boolean runOneWorkerIteration( + SessionState ss, String user, HiveConf conf, boolean doWait) throws InterruptedException { + AnalyzeWork req; + if (doWait) { + req = workQueue.take(); + } else { + req = workQueue.poll(); + if (req == null) { + return false; + } + } + String cmd = null; + try { + cmd = req.buildCommand(); + LOG.debug("Running {} based on {}", cmd, req); + if (doWait) { + SessionState.start(ss); // This is the first call, open the session + } + DriverUtils.runOnDriver(conf, user, ss, cmd, null); + } catch (Exception e) { + LOG.error("Analyze command failed: " + cmd, e); + try { + ss.close(); + } catch (IOException e1) { + LOG.warn("Failed to close a bad session", e1); + } finally { + SessionState.detachSession(); + } + } finally { + markAnalyzeDone(req); + } + return true; + } + + public class WorkerRunnable implements Runnable { + private final HiveConf conf; + private final String user; + + public WorkerRunnable(Configuration conf, String user) { + this.conf = new HiveConf(conf, HiveConf.class); + this.user = user; + } + + @Override + public void run() { + while (true) { + // This should not start the actual Tez AM. + SessionState ss = DriverUtils.setUpSessionState(conf, user, false); + // Wait for the first item to arrive at the queue and process it. + try { + runOneWorkerIteration(ss, user, conf, true); + } catch (InterruptedException e) { + closeSession(ss); + LOG.info("Worker thread was interrupted and will now exit"); + return; + } + // Keep draining the queue in the same session. 
+ try { + while (runOneWorkerIteration(ss, user, conf, false)) {} + } catch (InterruptedException e) { + closeSession(ss); + LOG.info("Worker thread was interrupted unexpectedly and will now exit"); + return; + }; + // Close the session before we have to wait again. + closeSession(ss); + SessionState.detachSession(); + } + } + } + + private static void closeSession(SessionState ss) { + try { + ss.close(); + } catch (IOException e1) { + LOG.error("Failed to close the session", e1); + } + } + + @VisibleForTesting + public void waitForQueuedCommands() throws InterruptedException { + while (itemsInProgress.get() > 0) { + synchronized (itemsInProgress) { + itemsInProgress.wait(100L); + } + } + } + + @VisibleForTesting + public int getQueueLength() { + return workQueue.size(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 982b180761..60447192b1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.DriverUtils; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.DDLTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; @@ -353,7 +354,7 @@ private void runMmCompaction(HiveConf conf, Table t, Partition p, conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); String user = UserGroupInformation.getCurrentUser().getShortUserName(); - SessionState sessionState = setUpSessionState(conf, user); + SessionState sessionState = DriverUtils.setUpSessionState(conf, user, false); // Note: we could skip creating the table and just add table type stuff directly to the // "insert overwrite directory" command if there were no bucketing or list bucketing. @@ -365,7 +366,7 @@ private void runMmCompaction(HiveConf conf, Table t, Partition p, p == null ? t.getSd() : p.getSd(), baseLocation.toString()); LOG.info("Compacting a MM table into " + query); try { - runOnDriver(conf, user, sessionState, query, null); + DriverUtils.runOnDriver(conf, user, sessionState, query, null); break; } catch (Exception ex) { Throwable cause = ex; @@ -380,26 +381,16 @@ private void runMmCompaction(HiveConf conf, Table t, Partition p, String query = buildMmCompactionQuery(conf, t, p, tmpTableName); LOG.info("Compacting a MM table via " + query); - runOnDriver(conf, user, sessionState, query, writeIds); + DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds); commitMmCompaction(tmpLocation, sd.getLocation(), conf, writeIds); - runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName, null); + DriverUtils.runOnDriver(conf, user, sessionState, + "drop table if exists " + tmpTableName, null); } catch (HiveException e) { LOG.error("Error compacting a MM table", e); throw new IOException(e); } } - public SessionState setUpSessionState(HiveConf conf, String user) { - SessionState sessionState = SessionState.get(); - if (sessionState == null) { - // Note: we assume that workers run on the same threads repeatedly, so we can set up - // the session here and it will be reused without explicitly storing in the worker. 
- sessionState = new SessionState(conf, user); - SessionState.setCurrentSessionState(sessionState); - } - return sessionState; - } - private String generateTmpPath(StorageDescriptor sd) { return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString(); } @@ -514,36 +505,6 @@ private String buildMmCompactionCtQuery( return result; } - private void runOnDriver(HiveConf conf, String user, - SessionState sessionState, String query, ValidWriteIdList writeIds) throws HiveException { - boolean isOk = false; - try { - QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build(); - Driver driver = new Driver(qs, user, null, null); - driver.setCompactionWriteIds(writeIds); - try { - CommandProcessorResponse cpr = driver.run(query); - if (cpr.getResponseCode() != 0) { - LOG.error("Failed to run " + query, cpr.getException()); - throw new HiveException("Failed to run " + query, cpr.getException()); - } - } finally { - driver.close(); - driver.destroy(); - } - isOk = true; - } finally { - if (!isOk) { - try { - sessionState.close(); // This also resets SessionState.get. - } catch (Throwable th) { - LOG.warn("Failed to close a bad session", th); - SessionState.detachSession(); - } - } - } - } - private String buildMmCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) { String fullName = t.getDbName() + "." + t.getTableName(); // TODO: ideally we should make a special form of insert overwrite so that we: diff --git ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java new file mode 100644 index 0000000000..d0b41f3991 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.stats; + +import static org.junit.Assert.*; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.curator.shaded.com.google.common.collect.Lists; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.DriverUtils; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestStatsUpdaterThread { + @SuppressWarnings("unused") + static final private Logger LOG = LoggerFactory.getLogger(TestStatsUpdaterThread.class); + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestStatsUpdaterThread.class.getCanonicalName() + + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + private HiveConf hiveConf; + private SessionState ss; + + String getTestDataDir() { + return TEST_DATA_DIR; + } + + @SuppressWarnings("deprecation") + @Before + public void setUp() throws Exception { + this.hiveConf = new HiveConf(TestStatsUpdaterThread.class); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getTestDataDir()); + hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); + hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); +// hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, true); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, true); + hiveConf.set(MetastoreConf.ConfVars.STATS_AUTO_UPDATE.getVarname(), "all"); + TxnDbUtil.setConfValues(hiveConf); + TxnDbUtil.prepDb(hiveConf); + File f = new File(getTestDataDir()); + if (f.exists()) { + FileUtil.fullyDelete(f); + } + if (!(new File(getTestDataDir()).mkdirs())) { + throw new RuntimeException("Could not create " + getTestDataDir()); + } + this.ss = DriverUtils.setUpSessionState(hiveConf, "hive", true); + cleanUp(); + } + + @After + public void cleanUp() throws HiveException { + executeQuery("drop table simple_stats"); + executeQuery("drop table simple_stats2"); + executeQuery("drop table simple_stats3"); + } + + @Test(timeout=20000) + public void testSimpleUpdateWithThreads() throws Exception { + StatsUpdaterThread su = createUpdater(); + su.startWorkers(); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + + executeQuery("create table 
simple_stats (i int, s string)"); + executeQuery("insert into simple_stats (i, s) values (1, 'test')"); + verifyAndUnsetColStats("simple_stats", Lists.newArrayList("i"), msClient); + + assertTrue(su.runOneIteration()); + su.waitForQueuedCommands(); + verifyStatsUpToDate("simple_stats", Lists.newArrayList("i"), msClient, true); + + msClient.close(); + } + + @Test(timeout=20000) + public void testMultipleTables() throws Exception { + StatsUpdaterThread su = createUpdater(); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + + executeQuery("create table simple_stats (s string)"); + executeQuery("insert into simple_stats (s) values ('test')"); + executeQuery("create table simple_stats2 (s string)"); + executeQuery("insert into simple_stats2 (s) values ('test2')"); + verifyAndUnsetColStats("simple_stats", Lists.newArrayList("s"), msClient); + verifyAndUnsetColStats("simple_stats2", Lists.newArrayList("s"), msClient); + + assertTrue(su.runOneIteration()); + drainWorkQueue(su); + verifyAndUnsetColStats("simple_stats", Lists.newArrayList("s"), msClient); + verifyAndUnsetColStats("simple_stats2", Lists.newArrayList("s"), msClient); + + setTableSkipProperty(msClient, "simple_stats", "true"); + assertTrue(su.runOneIteration()); + drainWorkQueue(su); + verifyStatsUpToDate("simple_stats", Lists.newArrayList("i"), msClient, false); + verifyAndUnsetColStats("simple_stats2", Lists.newArrayList("s"), msClient); + + msClient.close(); + } + + @Test(timeout=20000) + public void testExistingOnly() throws Exception { + hiveConf.set(MetastoreConf.ConfVars.STATS_AUTO_UPDATE.getVarname(), "existing"); + StatsUpdaterThread su = createUpdater(); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + + executeQuery("create table simple_stats (i int, s string)"); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); + executeQuery("insert into simple_stats (i, s) values (1, 'test')"); + executeQuery("analyze table simple_stats compute statistics for columns i"); + verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"), msClient, false); + verifyAndUnsetColStats("simple_stats", Lists.newArrayList("i"), msClient); + + assertTrue(su.runOneIteration()); + drainWorkQueue(su); + verifyStatsUpToDate("simple_stats", Lists.newArrayList("i"), msClient, true); + verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"), msClient, false); + + msClient.close(); + } + + @Test(timeout=60000) + public void testQueueingWithThreads() throws Exception { + final int PART_COUNT = 12; + hiveConf.setInt(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.getVarname(), 5); + hiveConf.setInt(MetastoreConf.ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT.getVarname(), 2); + StatsUpdaterThread su = createUpdater(); + su.startWorkers(); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); + executeQuery("create table simple_stats (s string) partitioned by (i int)"); + for (int i = 0; i < PART_COUNT; ++i) { + executeQuery("insert into simple_stats partition(i='" + i + "') values ('test')"); + } + verifyPartStatsUpToDate(PART_COUNT, 0, msClient, "simple_stats", false); + + // Set one of the partitions to be skipped, so that a command is created for every other one. 
+ setPartitionSkipProperty(msClient, "simple_stats", "i=0", "true"); + + + assertTrue(su.runOneIteration()); + su.waitForQueuedCommands(); + verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("s"), msClient, false); + verifyPartStatsUpToDate(PART_COUNT, 1, msClient, "simple_stats", true); + + assertFalse(su.runOneIteration()); + drainWorkQueue(su, 0); // Nothing else is updated after the first update. + + msClient.close(); + } + + @Test(timeout=20000) + public void testAllPartitions() throws Exception { + final int PART_COUNT = 3; + StatsUpdaterThread su = createUpdater(); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); + executeQuery("create table simple_stats (s string) partitioned by (i int)"); + for (int i = 0; i < PART_COUNT; ++i) { + executeQuery("insert into simple_stats partition(i='" + i + "') values ('test')"); + } + verifyPartStatsUpToDate(PART_COUNT, 0, msClient, "simple_stats", false); + + assertTrue(su.runOneIteration()); + drainWorkQueue(su, 1); // All the partitions need to be updated; a single command can be used. + verifyPartStatsUpToDate(PART_COUNT, 0, msClient, "simple_stats", true); + + assertFalse(su.runOneIteration()); + drainWorkQueue(su, 0); // Nothing else is updated after the first update. + + msClient.close(); + } + + @Test(timeout=20000) + public void testPartitionSubset() throws Exception { + final int NONSTAT_PART_COUNT = 3; + StatsUpdaterThread su = createUpdater(); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); + executeQuery("create table simple_stats (s string) partitioned by (i int)"); + for (int i = 0; i < NONSTAT_PART_COUNT; ++i) { + executeQuery("insert into simple_stats partition(i='" + i + "') values ('test')"); + } + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, true); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, true); + executeQuery("insert into simple_stats partition(i='" + + NONSTAT_PART_COUNT + "') values ('test')"); + verifyPartStatsUpToDate(NONSTAT_PART_COUNT, 0, msClient, "simple_stats", false); + verifyStatsUpToDate("simple_stats", + "i=" + NONSTAT_PART_COUNT, Lists.newArrayList("s"), msClient, true); + + final int EXCLUDED_PART = 1; + setPartitionSkipProperty(msClient, "simple_stats", "i=" + EXCLUDED_PART, "true"); + + assertTrue(su.runOneIteration()); + // 1 is excluded via property, 3 already has stats, so we only expect two updates. 
+ drainWorkQueue(su, NONSTAT_PART_COUNT - 1); + for (int i = 0; i < NONSTAT_PART_COUNT; ++i) { + verifyStatsUpToDate("simple_stats", + "i=" + i, Lists.newArrayList("s"), msClient, i != EXCLUDED_PART); + } + verifyStatsUpToDate("simple_stats", "i=" + EXCLUDED_PART, + Lists.newArrayList("s"), msClient, false); + + msClient.close(); + } + + @Test(timeout=20000) + public void testPartitionsWithDifferentColsAll() throws Exception { + StatsUpdaterThread su = createUpdater(); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); + executeQuery("create table simple_stats (s string, t string, u string) partitioned by (i int)"); + executeQuery("insert into simple_stats partition(i=0) values ('test', '0', 'foo')"); + executeQuery("insert into simple_stats partition(i=1) values ('test', '1', 'bar')"); + executeQuery("analyze table simple_stats partition(i=0) compute statistics for columns s"); + executeQuery("analyze table simple_stats partition(i=1) compute statistics for columns s, u"); + verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("s"), msClient, true); + verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("t", "u"), msClient, false); + verifyStatsUpToDate("simple_stats", "i=1", Lists.newArrayList("s", "u"), msClient, true); + verifyStatsUpToDate("simple_stats", "i=1", Lists.newArrayList("t"), msClient, false); + + assertTrue(su.runOneIteration()); + // Different columns means different commands have to be run. + drainWorkQueue(su, 2); + verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("s", "t", "u"), msClient, true); + verifyStatsUpToDate("simple_stats", "i=1", Lists.newArrayList("s", "t", "u"), msClient, true); + + assertFalse(su.runOneIteration()); + drainWorkQueue(su, 0); // Nothing else is updated after the first update. + + msClient.close(); + } + + + @Test(timeout=20000) + public void testPartitionsWithDifferentColsExistingOnly() throws Exception { + hiveConf.set(MetastoreConf.ConfVars.STATS_AUTO_UPDATE.getVarname(), "existing"); + StatsUpdaterThread su = createUpdater(); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); + executeQuery("create table simple_stats (s string, t string, u string) partitioned by (i int)"); + executeQuery("insert into simple_stats partition(i=0) values ('test', '0', 'foo')"); + executeQuery("insert into simple_stats partition(i=1) values ('test', '1', 'bar')"); + executeQuery("insert into simple_stats partition(i=2) values ('test', '2', 'baz')"); + executeQuery("analyze table simple_stats partition(i=0) compute statistics for columns s, t"); + executeQuery("analyze table simple_stats partition(i=1) compute statistics for columns"); + executeQuery("analyze table simple_stats partition(i=2) compute statistics for columns s"); + verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("s", "t"), msClient, true); + verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("u"), msClient, false); + verifyStatsUpToDate("simple_stats", "i=1", Lists.newArrayList("s", "t", "u"), msClient, true); + verifyStatsUpToDate("simple_stats", "i=2", Lists.newArrayList("s"), msClient, true); + verifyStatsUpToDate("simple_stats", "i=2", Lists.newArrayList("u", "t"), msClient, false); + + // We will unset s on i=0, and t on i=1. 
Only these should be updated; and nothing for 2. + verifyAndUnsetColStats("simple_stats", "i=0", Lists.newArrayList("s"), msClient); + verifyAndUnsetColStats("simple_stats", "i=1", Lists.newArrayList("t"), msClient); + + assertTrue(su.runOneIteration()); + drainWorkQueue(su, 2); + // Exact same state as above. + verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("s", "t"), msClient, true); + verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("u"), msClient, false); + verifyStatsUpToDate("simple_stats", "i=1", Lists.newArrayList("s", "t", "u"), msClient, true); + verifyStatsUpToDate("simple_stats", "i=2", Lists.newArrayList("s"), msClient, true); + verifyStatsUpToDate("simple_stats", "i=2", Lists.newArrayList("u", "t"), msClient, false); + + msClient.close(); + } + + @Test(timeout=20000) + public void testParallelOps() throws Exception { + // Set high worker count so we get a longer queue. + hiveConf.setInt(MetastoreConf.ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT.getVarname(), 4); + StatsUpdaterThread su = createUpdater(); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); + executeQuery("create table simple_stats (s string)"); + executeQuery("create table simple_stats2 (s string) partitioned by (i int)"); + executeQuery("create table simple_stats3 (s string) partitioned by (i int)"); + executeQuery("insert into simple_stats values ('test')"); + executeQuery("insert into simple_stats2 partition(i=0) values ('test')"); + executeQuery("insert into simple_stats3 partition(i=0) values ('test')"); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, true); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, true); + executeQuery("insert into simple_stats3 partition(i=1) values ('test')"); + + assertTrue(su.runOneIteration()); + assertEquals(3, su.getQueueLength()); + // Nothing updated yet. + verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"), msClient, false); + verifyPartStatsUpToDate(1, 0, msClient, "simple_stats2", false); + verifyStatsUpToDate("simple_stats3", "i=0", Lists.newArrayList("s"), msClient, false); + verifyStatsUpToDate("simple_stats3", "i=1", Lists.newArrayList("s"), msClient, true); + + assertFalse(su.runOneIteration()); + assertEquals(3, su.getQueueLength()); // Nothing new added to the queue while analyze runs. + + // Add another partition without stats. + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); + executeQuery("insert into simple_stats3 partition(i=2) values ('test')"); + + assertTrue(su.runOneIteration()); + assertEquals(4, su.getQueueLength()); // An item for new partition is queued now. + + drainWorkQueue(su, 4); + + verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"), msClient, true); + verifyPartStatsUpToDate(1, 0, msClient, "simple_stats2", true); + verifyPartStatsUpToDate(3, 0, msClient, "simple_stats3", true); + + assertFalse(su.runOneIteration()); + drainWorkQueue(su, 0); // Nothing else is updated after the first update. 
+ + msClient.close(); + } + + private void verifyPartStatsUpToDate(int partCount, int skip, + IMetaStoreClient msClient, String tbl, boolean isUpToDate) throws Exception { + for (int i = skip; i < partCount; ++i) { + verifyStatsUpToDate(tbl, "i=" + i, Lists.newArrayList("s"), msClient, isUpToDate); + } + } + + private void drainWorkQueue(StatsUpdaterThread su) throws InterruptedException { + while (su.runOneWorkerIteration(ss, ss.getUserName(), ss.getConf(), false)) {} + } + + private void drainWorkQueue(StatsUpdaterThread su, int expectedReqs) throws InterruptedException { + int actualReqs = 0; + while (su.runOneWorkerIteration(ss, ss.getUserName(), ss.getConf(), false)) { + ++actualReqs; + } + assertEquals(expectedReqs, actualReqs); + } + + private void setTableSkipProperty( + IMetaStoreClient msClient, String tbl, String val) throws Exception { + Table table = msClient.getTable(ss.getCurrentDatabase(), tbl); + table.getParameters().put(StatsUpdaterThread.SKIP_STATS_AUTOUPDATE_PROPERTY, val); + msClient.alter_table(table.getDbName(), table.getTableName(), table); + } + + private void setPartitionSkipProperty( + IMetaStoreClient msClient, String tblName, String partName, String val) throws Exception { + Partition part = msClient.getPartition(ss.getCurrentDatabase(), tblName, partName); + part.getParameters().put(StatsUpdaterThread.SKIP_STATS_AUTOUPDATE_PROPERTY, val); + msClient.alter_partition(part.getCatName(), part.getDbName(), tblName, part); + } + + private void verifyAndUnsetColStats( + String tblName, List cols, IMetaStoreClient msClient) throws Exception { + Table tbl = msClient.getTable(ss.getCurrentDatabase(), tblName); + verifyAndUnsetColStatsVal(tbl.getParameters(), cols); + EnvironmentContext ec = new EnvironmentContext(); + // Make sure metastore doesn't mess with our bogus stats updates. + ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); + msClient.alter_table_with_environmentContext(tbl.getDbName(), tbl.getTableName(), tbl, ec); + // Double-check. + tbl = msClient.getTable(ss.getCurrentDatabase(), tblName); + for (String col : cols) { + assertFalse(StatsSetupConst.areColumnStatsUptoDate(tbl.getParameters(), col)); + } + } + + private void verifyAndUnsetColStatsVal(Map params, List cols) { + assertTrue(StatsSetupConst.areBasicStatsUptoDate(params)); + for (String col : cols) { + assertTrue(StatsSetupConst.areColumnStatsUptoDate(params, col)); + } + StatsSetupConst.removeColumnStatsState(params, cols); + StatsSetupConst.setBasicStatsState(params, StatsSetupConst.TRUE); + } + + private void verifyAndUnsetColStats(String tblName, String partName, List cols, + IMetaStoreClient msClient) throws Exception { + Partition part = msClient.getPartition(ss.getCurrentDatabase(), tblName, partName); + verifyAndUnsetColStatsVal(part.getParameters(), cols); + EnvironmentContext ec = new EnvironmentContext(); + // Make sure metastore doesn't mess with our bogus stats updates. + ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); + msClient.alter_partition(part.getCatName(), part.getDbName(), tblName, part, ec); + // Double-check. 
+ part = msClient.getPartition(ss.getCurrentDatabase(), tblName, partName); + for (String col : cols) { + assertFalse(StatsSetupConst.areColumnStatsUptoDate(part.getParameters(), col)); + } + } + + private void verifyStatsUpToDate(String tbl, ArrayList cols, IMetaStoreClient msClient, + boolean isUpToDate) throws Exception { + Table table = msClient.getTable(ss.getCurrentDatabase(), tbl); + verifyStatsUpToDate(table.getParameters(), cols, isUpToDate); + } + + private void verifyStatsUpToDate(Map params, ArrayList cols, + boolean isUpToDate) { + if (isUpToDate) { + assertTrue(StatsSetupConst.areBasicStatsUptoDate(params)); + } + for (String col : cols) { + assertEquals(isUpToDate, StatsSetupConst.areColumnStatsUptoDate(params, col)); + } + } + + private void verifyStatsUpToDate(String tbl, String part, ArrayList cols, + IMetaStoreClient msClient, boolean isUpToDate) throws Exception { + Partition partition = msClient.getPartition(ss.getCurrentDatabase(), tbl, part); + verifyStatsUpToDate(partition.getParameters(), cols, isUpToDate); + } + + private void executeQuery(String query) throws HiveException { + DriverUtils.runOnDriver(hiveConf, ss.getUserName(), ss, query, null); + } + + private StatsUpdaterThread createUpdater() throws MetaException { + StatsUpdaterThread su = new StatsUpdaterThread(); + su.setConf(hiveConf); + su.init(new AtomicBoolean(false), null); + return su; + } +} diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index be05838ed7..ed53c90eb6 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -310,6 +310,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam } } else { // operations other than table rename + if (MetaStoreUtils.requireCalStats(null, null, newt, environmentContext) && !isPartitionedTable) { Database db = msdb.getDatabase(catName, newDbName); @@ -540,10 +541,11 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String } success = msdb.commitTransaction(); } catch (InvalidObjectException e) { - throw new InvalidOperationException("alter is not possible"); - } catch (NoSuchObjectException e){ + LOG.warn("Alter failed", e); + throw new InvalidOperationException("alter is not possible: " + e.getMessage()); + } catch (NoSuchObjectException e) { //old partition does not exist - throw new InvalidOperationException("alter is not possible"); + throw new InvalidOperationException("alter is not possible: " + e.getMessage()); } finally { if(!success) { msdb.rollbackTransaction(); diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index d8b8414999..a699f34c19 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -82,6 +82,7 @@ import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.StatsUpdateMode; import org.apache.hadoop.hive.metastore.events.AbortTxnEvent; import 
org.apache.hadoop.hive.metastore.events.AddNotNullConstraintEvent; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; @@ -8987,6 +8988,7 @@ public void run() { t.start(); } + /** * Start threads outside of the thrift service, such as the compactor threads. * @param conf Hive configuration object @@ -9026,6 +9028,7 @@ public void run() { startCompactorWorkers(conf); startCompactorCleaner(conf); startRemoteOnlyTasks(conf); + startStatsUpdater(conf); } catch (Throwable e) { LOG.error("Failure when starting the compactor, compactions may not happen, " + StringUtils.stringifyException(e)); @@ -9041,6 +9044,14 @@ public void run() { t.start(); } + protected static void startStatsUpdater(Configuration conf) throws Exception { + StatsUpdateMode mode = StatsUpdateMode.valueOf( + MetastoreConf.getVar(conf, ConfVars.STATS_AUTO_UPDATE).toUpperCase()); + if (mode == StatsUpdateMode.NONE) return; + MetaStoreThread t = instantiateThread("org.apache.hadoop.hive.ql.stats.StatsUpdaterThread"); + initializeAndStartThread(t, conf); + } + private static void startCompactorInitiator(Configuration conf) throws Exception { if (MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON)) { MetaStoreThread initiator = diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index 5bb1985025..3be94e8db8 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -89,6 +89,7 @@ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName; import org.apache.hive.common.util.BloomFilter; import org.datanucleus.store.rdbms.query.ForwardQueryResult; import org.slf4j.Logger; @@ -2714,4 +2715,95 @@ private void dropDanglingColumnDescriptors(List columnDescriptorIdList) } } } + + public final static Object[] STATS_TABLE_TYPES = new Object[] { + TableType.MANAGED_TABLE.toString(), TableType.MATERIALIZED_VIEW.toString() + }; + + public List getTableNamesWithStats() throws MetaException { + // Could we also join with ACID tables to only get tables with outdated stats? 
+ String queryText0 = "SELECT DISTINCT " + TBLS + ".\"TBL_NAME\", " + DBS + ".\"NAME\", " + + DBS + ".\"CTLG_NAME\" FROM " + TBLS + " INNER JOIN " + DBS + " ON " + + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\""; + String queryText1 = " WHERE " + TBLS + ".\"TBL_TYPE\" IN (" + + makeParams(STATS_TABLE_TYPES.length) + ")"; + + List result = new ArrayList<>(); + + String queryText = queryText0 + " INNER JOIN " + TAB_COL_STATS + + " ON " + TBLS + ".\"TBL_ID\" = " + TAB_COL_STATS + ".\"TBL_ID\"" + queryText1; + getStatsTableListResult(queryText, result); + + queryText = queryText0 + " INNER JOIN " + PARTITIONS + " ON " + TBLS + ".\"TBL_ID\" = " + + PARTITIONS + ".\"TBL_ID\"" + " INNER JOIN " + PART_COL_STATS + " ON " + PARTITIONS + + ".\"PART_ID\" = " + PART_COL_STATS + ".\"PART_ID\"" + queryText1; + getStatsTableListResult(queryText, result); + + return result; + } + + public Map> getColAndPartNamesWithStats( + String catName, String dbName, String tableName) throws MetaException { + // Could we also join with ACID tables to only get tables with outdated stats? + String queryText = "SELECT DISTINCT " + PARTITIONS + ".\"PART_NAME\", " + PART_COL_STATS + + ".\"COLUMN_NAME\" FROM " + TBLS + " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + + DBS + ".\"DB_ID\" INNER JOIN " + PARTITIONS + " ON " + TBLS + ".\"TBL_ID\" = " + + PARTITIONS + ".\"TBL_ID\" INNER JOIN " + PART_COL_STATS + " ON " + PARTITIONS + + ".\"PART_ID\" = " + PART_COL_STATS + ".\"PART_ID\" WHERE " + DBS + ".\"NAME\" = ? AND " + + DBS + ".\"CTLG_NAME\" = ? AND " + TBLS + ".\"TBL_NAME\" = ? ORDER BY " + + PARTITIONS + ".\"PART_NAME\""; + + LOG.debug("Running {}", queryText); + Query query = pm.newQuery("javax.jdo.query.SQL", queryText); + try { + List sqlResult = ensureList(executeWithArray( + query, new Object[] { dbName, catName, tableName }, queryText)); + Map> result = new HashMap<>(); + String lastPartName = null; + List cols = null; + for (Object[] line : sqlResult) { + String col = extractSqlString(line[1]); + String part = extractSqlString(line[0]); + if (!part.equals(lastPartName)) { + if (lastPartName != null) { + result.put(lastPartName, cols); + } + cols = cols == null ? 
new ArrayList<>() : new ArrayList<>(cols.size()); + lastPartName = part; + } + cols.add(col); + } + if (lastPartName != null) { + result.put(lastPartName, cols); + } + return result; + } finally { + query.closeAll(); + } + } + + public List getAllTableNamesForStats() throws MetaException { + String queryText = "SELECT " + TBLS + ".\"TBL_NAME\", " + DBS + ".\"NAME\", " + + DBS + ".\"CTLG_NAME\" FROM " + TBLS + " INNER JOIN " + DBS + " ON " + TBLS + + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" + + " WHERE " + TBLS + ".\"TBL_TYPE\" IN (" + makeParams(STATS_TABLE_TYPES.length) + ")"; + List result = new ArrayList<>(); + getStatsTableListResult(queryText, result); + return result; + } + + private void getStatsTableListResult( + String queryText, List result) throws MetaException { + LOG.debug("Running {}", queryText); + Query query = pm.newQuery("javax.jdo.query.SQL", queryText); + try { + List sqlResult = ensureList(executeWithArray(query, STATS_TABLE_TYPES, queryText)); + for (Object[] line : sqlResult) { + result.add(new FullTableName( + extractSqlString(line[2]), extractSqlString(line[1]), extractSqlString(line[0]))); + } + } finally { + query.closeAll(); + } + } } diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index b15d89d5c6..e3665b233e 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -56,6 +56,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; + import javax.jdo.JDOCanRetryException; import javax.jdo.JDODataStoreException; import javax.jdo.JDOException; @@ -71,6 +72,7 @@ import javax.sql.DataSource; import com.google.common.base.Strings; + import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; @@ -211,6 +213,7 @@ import org.apache.hadoop.hive.metastore.utils.FileUtils; import org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName; import org.apache.hadoop.hive.metastore.utils.ObjectPair; import org.apache.thrift.TException; import org.datanucleus.AbstractNucleusContext; @@ -1553,6 +1556,96 @@ public Table getTable(String catName, String dbName, String tableName) throws Me } } + @Override + public List getTableNamesWithStats() throws MetaException, NoSuchObjectException { + return new GetListHelper(null, null, null, true, false) { + @Override + protected List getSqlResult( + GetHelper> ctx) throws MetaException { + return directSql.getTableNamesWithStats(); + } + + @Override + protected List getJdoResult( + GetHelper> ctx) throws MetaException { + throw new UnsupportedOperationException("UnsupportedOperationException"); // TODO: implement? 
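+ // Direct SQL is required here: the helper is constructed with allowJdo == false, so the
+ // unimplemented JDO branch above is never taken and only the direct-SQL result is used.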
+ } + }.run(false); + } + + @Override + public Map> getPartitionColsWithStats(String catName, String dbName, String tableName) + throws MetaException, NoSuchObjectException { + return new GetHelper>>(catName, dbName, null, true, false) { + @Override + protected Map> getSqlResult( + GetHelper>> ctx) throws MetaException { + try { + return directSql.getColAndPartNamesWithStats(catName, dbName, tableName); + } catch (Throwable ex) { + LOG.error("DirectSQL failed", ex); + throw new MetaException(ex.getMessage()); + } + } + + @Override + protected Map> getJdoResult( + GetHelper>> ctx) throws MetaException { + throw new UnsupportedOperationException("UnsupportedOperationException"); // TODO: implement? + } + + @Override + protected String describeResult() { + return results.size() + " partitions"; + } + }.run(false); + } + + @Override + public List getAllTableNamesForStats() throws MetaException, NoSuchObjectException { + return new GetListHelper(null, null, null, true, false) { + @Override + protected List getSqlResult( + GetHelper> ctx) throws MetaException { + return directSql.getAllTableNamesForStats(); + } + + @Override + protected List getJdoResult( + GetHelper> ctx) throws MetaException { + boolean commited = false; + Query query = null; + List result = new ArrayList<>(); + openTransaction(); + try { + String paramStr = "", whereStr = ""; + for (int i = 0; i < MetaStoreDirectSql.STATS_TABLE_TYPES.length; ++i) { + if (i != 0) { + paramStr += ", "; + whereStr += "||"; + } + paramStr += "java.lang.String tt" + i; + whereStr += " tableType == tt" + i; + } + query = pm.newQuery(MTable.class, whereStr); + query.declareParameters(paramStr); + @SuppressWarnings("unchecked") + Collection tbls = (Collection) query.executeWithArray( + query, MetaStoreDirectSql.STATS_TABLE_TYPES); + pm.retrieveAll(tbls); + for (MTable tbl : tbls) { + result.add(new FullTableName( + tbl.getDatabase().getCatalogName(), tbl.getDatabase().getName(), tbl.getTableName())); + } + commited = commitTransaction(); + } finally { + rollbackAndCleanup(commited, query); + } + return result; + } + }.run(false); + } + protected List getTablesInternal(String catName, String dbName, String pattern, TableType tableType, boolean allowSql, boolean allowJdo) throws MetaException, NoSuchObjectException { @@ -3470,9 +3563,9 @@ public GetHelper(String catalogName, String dbName, String tblName, boolean allowSql, boolean allowJdo) throws MetaException { assert allowSql || allowJdo; this.allowJdo = allowJdo; - this.catName = normalizeIdentifier(catalogName); - this.dbName = normalizeIdentifier(dbName); - if (tblName != null){ + this.catName = (catalogName != null) ? normalizeIdentifier(catalogName) : null; + this.dbName = (dbName != null) ? 
normalizeIdentifier(dbName) : null; + if (tblName != null) { this.tblName = normalizeIdentifier(tblName); } else { // tblName can be null in cases of Helper being used at a higher diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java index 283798cd77..f350aa9fd7 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -88,6 +88,7 @@ import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName; import org.apache.thrift.TException; public interface RawStore extends Configurable { @@ -1639,4 +1640,11 @@ void alterSchemaVersion(SchemaVersionDescriptor version, SchemaVersion newVersio /** Removes outdated statistics. */ int deleteRuntimeStats(int maxRetainSecs) throws MetaException; + List getTableNamesWithStats() throws MetaException, NoSuchObjectException; + + List getAllTableNamesForStats() throws MetaException, NoSuchObjectException; + + Map> getPartitionColsWithStats(String catName, String dbName, + String tableName) throws MetaException, NoSuchObjectException; + } diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index 9da8d728f9..d9356b8d9b 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -116,6 +116,7 @@ import org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName; import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -2492,4 +2493,20 @@ public void addRuntimeStat(RuntimeStat stat) throws MetaException { public int deleteRuntimeStats(int maxRetainSecs) throws MetaException { return rawStore.deleteRuntimeStats(maxRetainSecs); } + + @Override + public List getTableNamesWithStats() throws MetaException, NoSuchObjectException { + return rawStore.getTableNamesWithStats(); + } + + @Override + public List getAllTableNamesForStats() throws MetaException, NoSuchObjectException { + return rawStore.getAllTableNamesForStats(); + } + + @Override + public Map> getPartitionColsWithStats(String catName, + String dbName, String tableName) throws MetaException, NoSuchObjectException { + return rawStore.getPartitionColsWithStats(catName, dbName, tableName); + } } diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/EnumValidator.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/EnumValidator.java new file mode 100644 index 0000000000..86954710ff --- /dev/null +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/EnumValidator.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.conf; + +import java.util.Arrays; + +public class EnumValidator extends StringSetValidator { + public EnumValidator(Object[] statsUpdateModes) { + super(false, Arrays.stream(statsUpdateModes).map(eo -> eo.toString()).toArray(String[]::new)); + } +} diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 114d5da205..ab03adbf59 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -83,6 +83,10 @@ @VisibleForTesting static final String TEST_ENV_WORKAROUND = "metastore.testing.env.workaround.dont.ever.set.this."; + public static enum StatsUpdateMode { + NONE, EXISTING, ALL + } + private static class TimeValue { final long val; final TimeUnit unit; @@ -727,6 +731,18 @@ public static ConfVars getMetaConf(String name) { "The Java class (implementing the StatsAggregator interface) that is used by default if hive.stats.dbclass is custom type."), STATS_DEFAULT_PUBLISHER("metastore.stats.default.publisher", "hive.stats.default.publisher", "", "The Java class (implementing the StatsPublisher interface) that is used by default if hive.stats.dbclass is custom type."), + STATS_AUTO_UPDATE("metastore.stats.auto.analyze", "hive.metastore.stats.auto.analyze", "none", + new EnumValidator(StatsUpdateMode.values()), + "Whether to update stats in the background; none - no, all - for all tables, existing - only existing, out of date, stats."), + STATS_AUTO_UPDATE_NOOP_WAIT("metastore.stats.auto.analyze.noop.wait", + "hive.metastore.stats.auto.analyze.noop.wait", 5L, TimeUnit.MINUTES, + new TimeValidator(TimeUnit.MINUTES), + "How long to sleep if there were no stats needing update during an update iteration.\n" + + "This is a setting to throttle table/partition checks when nothing is being changed; not\n" + + "the analyze queries themselves."), + STATS_AUTO_UPDATE_WORKER_COUNT("metastore.stats.auto.analyze.worker.count", + "hive.metastore.stats.auto.analyze.worker.count", 1, + "Number of parallel analyze commands to run for background stats update."), STORAGE_SCHEMA_READER_IMPL("metastore.storage.schema.reader.impl", "metastore.storage.schema.reader.impl", DefaultStorageSchemaReader.class.getName(), "The class to use to read schemas from storage. 
It must implement " + diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java index 742b6bf76b..89d77d31a4 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.commons.collections.ListUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -64,7 +65,9 @@ import org.apache.hadoop.util.MachineList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.lang.reflect.InvocationTargetException; @@ -718,12 +721,12 @@ public static void populateQuickStats(List fileStatus, Map params) { params.remove(StatsSetupConst.NUM_FILES); params.remove(StatsSetupConst.TOTAL_SIZE); } - + public static boolean areSameColumns(List oldCols, List newCols) { return ListUtils.isEqualList(oldCols, newCols); @@ -1797,4 +1800,35 @@ public static String getDefaultCatalog(Configuration conf) { if (catName == null || "".equals(catName)) catName = Warehouse.DEFAULT_CATALOG_NAME; return catName; } + + + public static class FullTableName { + public final String catalog, db, table; + + public FullTableName(String catalog, String db, String table) { + assert catalog != null && db != null && table != null : catalog + ", " + db + ", " + table; + this.catalog = catalog; + this.db = db; + this.table = table; + } + + @Override + public String toString() { + return catalog + MetaStoreUtils.CATALOG_DB_SEPARATOR + db + "." 
+ table; + } + + @Override + public int hashCode() { + final int prime = 31; + return prime * (prime * (prime + catalog.hashCode()) + db.hashCode()) + table.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + FullTableName other = (FullTableName) obj; + return catalog.equals(other.catalog) && db.equals(other.db) && table.equals(other.table); + } + } } diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java index 0461c4ee9a..8c3ada3082 100644 --- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java +++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java @@ -86,6 +86,7 @@ import org.apache.hadoop.hive.metastore.api.WMPool; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName; import org.apache.thrift.TException; /** @@ -1185,4 +1186,23 @@ public void addRuntimeStat(RuntimeStat stat) throws MetaException { public int deleteRuntimeStats(int maxRetainSecs) throws MetaException { return objectStore.deleteRuntimeStats(maxRetainSecs); } + + @Override + public List getTableNamesWithStats() throws MetaException, + NoSuchObjectException { + return null; + } + + @Override + public List getAllTableNamesForStats() throws MetaException, + NoSuchObjectException { + return null; + } + + @Override + public Map> getPartitionColsWithStats(String catName, + String dbName, String tableName) throws MetaException, + NoSuchObjectException { + return null; + } } diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java index b71eda4212..f98e8de4c7 100644 --- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java +++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java @@ -85,6 +85,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName; import org.apache.thrift.TException; import org.junit.Assert; @@ -1172,4 +1173,23 @@ public void addRuntimeStat(RuntimeStat stat) throws MetaException { public int deleteRuntimeStats(int maxRetainSecs) throws MetaException { return 0; } + + @Override + public List getTableNamesWithStats() throws MetaException, + NoSuchObjectException { + return null; + } + + @Override + public List getAllTableNamesForStats() throws MetaException, + NoSuchObjectException { + return null; + } + + @Override + public Map> getPartitionColsWithStats(String catName, + String dbName, String tableName) throws MetaException, + NoSuchObjectException { + return null; + } } diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java 
standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java index be9e7c94c4..6968d628a1 100644 --- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java +++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java @@ -356,7 +356,7 @@ public void testCreateTableDefaultValuesView() throws Exception { createdTable.getSd().getLocation()); } - @Test(expected = MetaException.class) + @Test(expected = InvalidObjectException.class) public void testCreateTableNullDatabase() throws Exception { Table table = testTables[0]; table.setDbName(null); @@ -891,7 +891,7 @@ public void testAlterTableCascade() throws Exception { } } - @Test(expected = MetaException.class) + @Test(expected = InvalidOperationException.class) public void testAlterTableNullDatabaseInNew() throws Exception { Table originalTable = testTables[0]; Table newTable = originalTable.deepCopy();
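
As a usage note, the sketch below shows how the new MetastoreConf settings added by this patch could be applied when embedding or testing the metastore. It is a minimal illustration only: the StatsAutoUpdateConfExample class name is made up here, and the service wiring (HiveMetaStore launching StatsUpdaterThread from startStatsUpdater() alongside the other background threads) is assumed rather than shown.

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;

public class StatsAutoUpdateConfExample {
  public static void main(String[] args) {
    Configuration conf = MetastoreConf.newMetastoreConf();
    // "existing" only re-analyzes tables whose stats are already present but out of date;
    // "all" covers every eligible table; "none" (the default) leaves the updater off.
    MetastoreConf.setVar(conf, ConfVars.STATS_AUTO_UPDATE, "existing");
    // How long the updater sleeps when an iteration found nothing to analyze.
    MetastoreConf.setTimeVar(conf, ConfVars.STATS_AUTO_UPDATE_NOOP_WAIT, 10, TimeUnit.MINUTES);
    // Number of analyze commands the updater runs in parallel.
    MetastoreConf.setLongVar(conf, ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT, 2);
    // A metastore started with this Configuration is expected to start the background
    // stats updater together with the other non-thrift background threads.
  }
}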