commit f16b77a18fd4a7fde3ce5fef627a2b803860e9f8 Author: Daniel Dai Date: Sun May 21 09:47:24 2017 -0700 HIVE-16579.16.patch diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1c37b6e..2d3652f 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -898,7 +898,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Default property values for newly created tables"), DDL_CTL_PARAMETERS_WHITELIST("hive.ddl.createtablelike.properties.whitelist", "", "Table Properties to copy over when executing a Create Table Like."), - METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore", + METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.cache.CachedStore", "Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. \n" + "This class is used to store and retrieval of raw metadata objects such as table, database"), METASTORE_CACHED_RAW_STORE_IMPL("hive.metastore.cached.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore", diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java index 91a3a38..3dc63bd 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java @@ -914,9 +914,9 @@ public void addForeignKeys(List fks) } @Override - public Map getAggrColStatsForTablePartitions(String dbName, + public Map> getColStatsForTablePartitions(String dbName, String tableName) throws MetaException, NoSuchObjectException { - return objectStore.getAggrColStatsForTablePartitions(dbName, tableName); + return objectStore.getColStatsForTablePartitions(dbName, tableName); } @Override diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index d296851..93f2009 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -350,7 +350,9 @@ public void initConf() throws Exception { if (!useHBaseMetastore) { // Plug verifying metastore in for testing DirectSQL. 
conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, - "org.apache.hadoop.hive.metastore.VerifyingObjectStore"); + "org.apache.hadoop.hive.metastore.cache.CachedStore"); + conf.setVar(HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_IMPL, + "org.apache.hadoop.hive.metastore.VerifyingObjectStore"); } else { conf.setVar(ConfVars.METASTORE_RAW_STORE_IMPL, HBaseStore.class.getName()); conf.setBoolVar(ConfVars.METASTORE_FASTPATH, true); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index b96c27e..df73693 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -1208,7 +1208,9 @@ public ColumnStatistics getTableStats(final String dbName, final String tableNam } }; List list = runBatched(colNames, b); - if (list.isEmpty()) return null; + if (list.isEmpty()) { + return null; + } ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName); ColumnStatistics result = makeColumnStats(list, csd, 0); b.closeAllQueries(); @@ -1343,41 +1345,26 @@ private long partsFoundForPartitions(final String dbName, final String tableName // Get aggregated column stats for a table per partition for all columns in the partition // This is primarily used to populate stats object when using CachedStore (Check CachedStore#prewarm) - public Map getAggrColStatsForTablePartitions(String dbName, - String tblName, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { - String queryText = "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", " - + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), " - + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), " - + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), " - + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), " - // The following data is used to compute a partitioned table's NDV based - // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be - // accurately derived from partition NDVs, because the domain of column value two partitions - // can overlap. If there is no overlap then global NDV is just the sum - // of partition NDVs (UpperBound). But if there is some overlay then - // global NDV can be anywhere between sum of partition NDVs (no overlap) - // and same as one of the partition NDV (domain of column value in all other - // partitions is subset of the domain value in one of the partition) - // (LowerBound).But under uniform distribution, we can roughly estimate the global - // NDV by leveraging the min/max values. - // And, we also guarantee that the estimation makes sense by comparing it to the - // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")") - // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")") - + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," - + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," - + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," - + "sum(\"NUM_DISTINCTS\") from \"PART_COL_STATS\"" - + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? 
group by \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\""; + public Map> getColStatsForTablePartitions(String dbName, + String tblName) throws MetaException { + String queryText = + "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", \"LONG_LOW_VALUE\", " + + "\"LONG_HIGH_VALUE\", \"DOUBLE_LOW_VALUE\", \"DOUBLE_HIGH_VALUE\", " + + "\"BIG_DECIMAL_LOW_VALUE\", \"BIG_DECIMAL_HIGH_VALUE\", \"NUM_NULLS\", " + + "\"NUM_DISTINCTS\", \"AVG_COL_LEN\", \"MAX_COL_LEN\", \"NUM_TRUES\", \"NUM_FALSES\"" + + " from \"PART_COL_STATS\" where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + + " order by \"PARTITION_NAME\""; long start = 0; long end = 0; Query query = null; boolean doTrace = LOG.isDebugEnabled(); Object qResult = null; - ForwardQueryResult fqr = null; start = doTrace ? System.nanoTime() : 0; query = pm.newQuery("javax.jdo.query.SQL", queryText); - qResult = executeWithArray(query, - prepareParams(dbName, tblName, new ArrayList(), new ArrayList()), queryText); + qResult = + executeWithArray(query, + prepareParams(dbName, tblName, new ArrayList(), new ArrayList()), + queryText); if (qResult == null) { query.closeAll(); return Maps.newHashMap(); @@ -1385,13 +1372,31 @@ private long partsFoundForPartitions(final String dbName, final String tableName end = doTrace ? System.nanoTime() : 0; timingTrace(doTrace, queryText, start, end); List list = ensureList(qResult); - Map partColStatsMap = new HashMap(); + Map> partColStatsMap = + new HashMap>(); + String partNameCurrent = null; + List partColStatsList = new ArrayList(); for (Object[] row : list) { String partName = (String) row[0]; - String colName = (String) row[1]; - partColStatsMap.put( - CacheUtils.buildKey(dbName, tblName, CachedStore.partNameToVals(partName), colName), - prepareCSObjWithAdjustedNDV(row, 1, useDensityFunctionForNDVEstimation, ndvTuner)); + if (partNameCurrent == null) { + // Update the current partition we are working on + partNameCurrent = partName; + // Create a new list for this new partition + partColStatsList = new ArrayList(); + // Add the col stat for the current column + partColStatsList.add(prepareCSObj(row, 1)); + } else if (!partNameCurrent.equalsIgnoreCase(partName)) { + // Save the previous partition and its col stat list + partColStatsMap.put(partNameCurrent, partColStatsList); + // Update the current partition we are working on + partNameCurrent = partName; + // Create a new list for this new partition + partColStatsList = new ArrayList(); + // Add the col stat for the current column + partColStatsList.add(prepareCSObj(row, 1)); + } else { + partColStatsList.add(prepareCSObj(row, 1)); + } Deadline.checkTimeout(); } query.closeAll(); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java index 870896c..8328428 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java @@ -1171,6 +1171,15 @@ public static Properties getSchema( return addCols(getSchemaWithoutCols(sd, tblsd, parameters, databaseName, tableName, partitionKeys), tblsd.getCols()); } + public static List getColumnNamesForTable(Table table) { + List colNames = new ArrayList(); + Iterator colsIterator = table.getSd().getColsIterator(); + while (colsIterator.hasNext()) { + colNames.add(colsIterator.next().getName()); + } + return colNames; + } + public static String getColumnNameDelimiter(List fieldSchemas) { // we first take a look if any 
fieldSchemas contain COMMA for (int i = 0; i < fieldSchemas.size(); i++) { @@ -1180,7 +1189,7 @@ public static String getColumnNameDelimiter(List fieldSchemas) { } return String.valueOf(SerDeUtils.COMMA); } - + /** * Convert FieldSchemas to columnNames. */ diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index b28983f..19becb8 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -7173,23 +7173,18 @@ protected String describeResult() { } @Override - public Map getAggrColStatsForTablePartitions(String dbName, + public Map> getColStatsForTablePartitions(String dbName, String tableName) throws MetaException, NoSuchObjectException { - final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(), - HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION); - final double ndvTuner = HiveConf.getFloatVar(getConf(), - HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER); - return new GetHelper>(dbName, tableName, true, false) { + return new GetHelper>>(dbName, tableName, true, false) { @Override - protected Map getSqlResult( - GetHelper> ctx) throws MetaException { - return directSql.getAggrColStatsForTablePartitions(dbName, tblName, - useDensityFunctionForNDVEstimation, ndvTuner); + protected Map> getSqlResult( + GetHelper>> ctx) throws MetaException { + return directSql.getColStatsForTablePartitions(dbName, tblName); } @Override - protected Map getJdoResult( - GetHelper> ctx) throws MetaException, + protected Map> getJdoResult( + GetHelper>> ctx) throws MetaException, NoSuchObjectException { // This is fast path for query optimizations, if we can find this info // quickly using directSql, do it. No point in failing back to slow path diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java index c1af690..964ffb2 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -579,14 +579,16 @@ public AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, List colNames) throws MetaException, NoSuchObjectException; /** - * Get all partition column statistics for a table + * Get all partition column statistics for a table in a db + * * @param dbName * @param tableName - * @return Map of partition column statistics + * @return Map of partition column statistics. Key in the map is partition name. 
Value is a list + * of column stat object for each column in the partition * @throws MetaException * @throws NoSuchObjectException */ - public Map getAggrColStatsForTablePartitions(String dbName, + public Map> getColStatsForTablePartitions(String dbName, String tableName) throws MetaException, NoSuchObjectException; /** diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java index fcf6f27..2dc2804 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields; import org.apache.hadoop.hive.metastore.model.MPartition; import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics; import org.apache.hadoop.hive.metastore.model.MTable; @@ -700,4 +701,151 @@ private static String createJdoDecimalString(Decimal d) { return new BigDecimal(new BigInteger(d.getUnscaled()), d.getScale()).toString(); } + /** + * Set field values in oldStatObj from newStatObj + * @param oldStatObj + * @param newStatObj + */ + public static void setFieldsIntoOldStats(ColumnStatisticsObj oldStatObj, + ColumnStatisticsObj newStatObj) { + _Fields typeNew = newStatObj.getStatsData().getSetField(); + _Fields typeOld = oldStatObj.getStatsData().getSetField(); + typeNew = typeNew == typeOld ? typeNew : null; + switch (typeNew) { + case BOOLEAN_STATS: + BooleanColumnStatsData oldBooleanStatsData = oldStatObj.getStatsData().getBooleanStats(); + BooleanColumnStatsData newBooleanStatsData = newStatObj.getStatsData().getBooleanStats(); + if (newBooleanStatsData.isSetNumTrues()) { + oldBooleanStatsData.setNumTrues(newBooleanStatsData.getNumTrues()); + } + if (newBooleanStatsData.isSetNumFalses()) { + oldBooleanStatsData.setNumFalses(newBooleanStatsData.getNumFalses()); + } + if (newBooleanStatsData.isSetNumNulls()) { + oldBooleanStatsData.setNumNulls(newBooleanStatsData.getNumNulls()); + } + if (newBooleanStatsData.isSetBitVectors()) { + oldBooleanStatsData.setBitVectors(newBooleanStatsData.getBitVectors()); + } + break; + case LONG_STATS: { + LongColumnStatsData oldLongStatsData = oldStatObj.getStatsData().getLongStats(); + LongColumnStatsData newLongStatsData = newStatObj.getStatsData().getLongStats(); + if (newLongStatsData.isSetHighValue()) { + oldLongStatsData.setHighValue(newLongStatsData.getHighValue()); + } + if (newLongStatsData.isSetLowValue()) { + oldLongStatsData.setLowValue(newLongStatsData.getLowValue()); + } + if (newLongStatsData.isSetNumNulls()) { + oldLongStatsData.setNumNulls(newLongStatsData.getNumNulls()); + } + if (newLongStatsData.isSetNumDVs()) { + oldLongStatsData.setNumDVs(newLongStatsData.getNumDVs()); + } + if (newLongStatsData.isSetBitVectors()) { + oldLongStatsData.setBitVectors(newLongStatsData.getBitVectors()); + } + break; + } + case DOUBLE_STATS: { + DoubleColumnStatsData oldDoubleStatsData = oldStatObj.getStatsData().getDoubleStats(); + DoubleColumnStatsData newDoubleStatsData = newStatObj.getStatsData().getDoubleStats(); + if (newDoubleStatsData.isSetHighValue()) { + oldDoubleStatsData.setHighValue(newDoubleStatsData.getHighValue()); + } + if (newDoubleStatsData.isSetLowValue()) { + 
oldDoubleStatsData.setLowValue(newDoubleStatsData.getLowValue()); + } + if (newDoubleStatsData.isSetNumNulls()) { + oldDoubleStatsData.setNumNulls(newDoubleStatsData.getNumNulls()); + } + if (newDoubleStatsData.isSetNumDVs()) { + oldDoubleStatsData.setNumDVs(newDoubleStatsData.getNumDVs()); + } + if (newDoubleStatsData.isSetBitVectors()) { + oldDoubleStatsData.setBitVectors(newDoubleStatsData.getBitVectors()); + } + break; + } + case STRING_STATS: { + StringColumnStatsData oldStringStatsData = oldStatObj.getStatsData().getStringStats(); + StringColumnStatsData newStringStatsData = newStatObj.getStatsData().getStringStats(); + if (newStringStatsData.isSetMaxColLen()) { + oldStringStatsData.setMaxColLen(newStringStatsData.getMaxColLen()); + } + if (newStringStatsData.isSetAvgColLen()) { + oldStringStatsData.setAvgColLen(newStringStatsData.getAvgColLen()); + } + if (newStringStatsData.isSetNumNulls()) { + oldStringStatsData.setNumNulls(newStringStatsData.getNumNulls()); + } + if (newStringStatsData.isSetNumDVs()) { + oldStringStatsData.setNumDVs(newStringStatsData.getNumDVs()); + } + if (newStringStatsData.isSetBitVectors()) { + oldStringStatsData.setBitVectors(newStringStatsData.getBitVectors()); + } + break; + } + case BINARY_STATS: + BinaryColumnStatsData oldBinaryStatsData = oldStatObj.getStatsData().getBinaryStats(); + BinaryColumnStatsData newBinaryStatsData = newStatObj.getStatsData().getBinaryStats(); + if (newBinaryStatsData.isSetMaxColLen()) { + oldBinaryStatsData.setMaxColLen(newBinaryStatsData.getMaxColLen()); + } + if (newBinaryStatsData.isSetAvgColLen()) { + oldBinaryStatsData.setAvgColLen(newBinaryStatsData.getAvgColLen()); + } + if (newBinaryStatsData.isSetNumNulls()) { + oldBinaryStatsData.setNumNulls(newBinaryStatsData.getNumNulls()); + } + if (newBinaryStatsData.isSetBitVectors()) { + oldBinaryStatsData.setBitVectors(newBinaryStatsData.getBitVectors()); + } + break; + case DECIMAL_STATS: { + DecimalColumnStatsData oldDecimalStatsData = oldStatObj.getStatsData().getDecimalStats(); + DecimalColumnStatsData newDecimalStatsData = newStatObj.getStatsData().getDecimalStats(); + if (newDecimalStatsData.isSetHighValue()) { + oldDecimalStatsData.setHighValue(newDecimalStatsData.getHighValue()); + } + if (newDecimalStatsData.isSetLowValue()) { + oldDecimalStatsData.setLowValue(newDecimalStatsData.getLowValue()); + } + if (newDecimalStatsData.isSetNumNulls()) { + oldDecimalStatsData.setNumNulls(newDecimalStatsData.getNumNulls()); + } + if (newDecimalStatsData.isSetNumDVs()) { + oldDecimalStatsData.setNumDVs(newDecimalStatsData.getNumDVs()); + } + if (newDecimalStatsData.isSetBitVectors()) { + oldDecimalStatsData.setBitVectors(newDecimalStatsData.getBitVectors()); + } + break; + } + case DATE_STATS: { + DateColumnStatsData oldDateStatsData = oldStatObj.getStatsData().getDateStats(); + DateColumnStatsData newDateStatsData = newStatObj.getStatsData().getDateStats(); + if (newDateStatsData.isSetHighValue()) { + oldDateStatsData.setHighValue(newDateStatsData.getHighValue()); + } + if (newDateStatsData.isSetLowValue()) { + oldDateStatsData.setLowValue(newDateStatsData.getLowValue()); + } + if (newDateStatsData.isSetNumNulls()) { + oldDateStatsData.setNumNulls(newDateStatsData.getNumNulls()); + } + if (newDateStatsData.isSetNumDVs()) { + oldDateStatsData.setNumDVs(newDateStatsData.getNumDVs()); + } + if (newDateStatsData.isSetBitVectors()) { + oldDateStatsData.setBitVectors(newDateStatsData.getBitVectors()); + } + break; + } + default: + throw new IllegalArgumentException("Unknown stats 
type: " + typeNew.toString()); + } + } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java index 668499b..280655d 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java @@ -38,6 +38,10 @@ public static String buildKey(String dbName, String tableName) { return dbName + delimit + tableName; } + public static String buildKeyWithDelimit(String dbName, String tableName) { + return buildKey(dbName, tableName) + delimit; + } + public static String buildKey(String dbName, String tableName, List partVals) { String key = buildKey(dbName, tableName); if (partVals == null || partVals.size() == 0) { @@ -52,11 +56,38 @@ public static String buildKey(String dbName, String tableName, List part return key; } + public static String buildKeyWithDelimit(String dbName, String tableName, List partVals) { + return buildKey(dbName, tableName, partVals) + delimit; + } + public static String buildKey(String dbName, String tableName, List partVals, String colName) { String key = buildKey(dbName, tableName, partVals); return key + delimit + colName; } + public static String buildKey(String dbName, String tableName, String colName) { + String key = buildKey(dbName, tableName); + return key + delimit + colName; + } + + public static String[] splitTableColStats(String key) { + return key.split(delimit); + } + + public static Object[] splitPartitionColStats(String key) { + Object[] result = new Object[4]; + String[] comps = key.split(delimit); + result[0] = comps[0]; + result[1] = comps[1]; + List vals = new ArrayList(); + for (int i=2;i runningMasterThread = new AtomicReference(null); + private static ReentrantReadWriteLock databaseCacheLock = new ReentrantReadWriteLock(true); + private static AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false); + private static ReentrantReadWriteLock tableCacheLock = new ReentrantReadWriteLock(true); + private static AtomicBoolean isTableCacheDirty = new AtomicBoolean(false); + private static ReentrantReadWriteLock partitionCacheLock = new ReentrantReadWriteLock(true); + private static AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false); + private static ReentrantReadWriteLock tableColStatsCacheLock = new ReentrantReadWriteLock(true); + private static AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false); + private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock( + true); + private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false); RawStore rawStore; Configuration conf; private PartitionExpressionProxy expressionProxy = null; + // Default value set to 100 milliseconds for test purpose + private long cacheRefreshPeriod = 100; static boolean firstTime = true; static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName()); @@ -209,6 +215,8 @@ public void setConf(Configuration conf) { LOG.info("Prewarming CachedStore"); prewarm(); LOG.info("CachedStore initialized"); + // Start the cache update master-worker threads + startCacheUpdateService(); } catch (Exception e) { throw new RuntimeException(e); } @@ -216,7 +224,10 @@ public void setConf(Configuration conf) { } } - private void prewarm() throws Exception { + @VisibleForTesting + void prewarm() throws Exception { + // Prevents throwing exceptions in our raw store calls since we're not using 
RawStoreProxy + Deadline.registerIfNot(1000000); List dbNames = rawStore.getAllDatabases(); for (String dbName : dbNames) { Database db = rawStore.getDatabase(dbName); @@ -226,35 +237,81 @@ private void prewarm() throws Exception { Table table = rawStore.getTable(dbName, tblName); SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), table); + Deadline.startTimer("getPartitions"); List partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); + Deadline.stopTimer(); for (Partition partition : partitions) { SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), partition); } - Map aggrStatsPerPartition = rawStore - .getAggrColStatsForTablePartitions(dbName, tblName); - SharedCache.addPartitionColStatsToCache(aggrStatsPerPartition); + // Cache partition column stats + Deadline.startTimer("getColStatsForTablePartitions"); + Map> colStatsPerPartition = + rawStore.getColStatsForTablePartitions(dbName, tblName); + Deadline.stopTimer(); + if (colStatsPerPartition != null) { + SharedCache.addPartitionColStatsToCache(dbName, tblName, colStatsPerPartition); + } + // Cache table column stats + List colNames = MetaStoreUtils.getColumnNamesForTable(table); + Deadline.startTimer("getTableColumnStatistics"); + ColumnStatistics tableColStats = + rawStore.getTableColumnStatistics(dbName, tblName, colNames); + Deadline.stopTimer(); + if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) { + SharedCache.addTableColStatsToCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); + } } } - // Start the cache update master-worker threads - startCacheUpdateService(); } - private synchronized void startCacheUpdateService() { + @VisibleForTesting + synchronized void startCacheUpdateService() { if (cacheUpdateMaster == null) { cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = Executors.defaultThreadFactory().newThread(r); + t.setName("CachedStore-CacheUpdateService: Thread-" + t.getId()); t.setDaemon(true); return t; } }); - cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), 0, HiveConf - .getTimeVar(conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, - TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) { + cacheRefreshPeriod = + HiveConf.getTimeVar(conf, + HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, + TimeUnit.MILLISECONDS); + } + LOG.info("CachedStore: starting cache update service (run every " + cacheRefreshPeriod + "ms"); + cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), cacheRefreshPeriod, + cacheRefreshPeriod, TimeUnit.MILLISECONDS); } } + @VisibleForTesting + synchronized boolean stopCacheUpdateService(long timeout) { + boolean tasksStoppedBeforeShutdown = false; + if (cacheUpdateMaster != null) { + LOG.info("CachedStore: shutting down cache update service"); + try { + tasksStoppedBeforeShutdown = + cacheUpdateMaster.awaitTermination(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.info("CachedStore: cache update service was interrupted while waiting for tasks to " + + "complete before shutting down. 
Will make a hard stop now."); + } + cacheUpdateMaster.shutdownNow(); + cacheUpdateMaster = null; + } + return tasksStoppedBeforeShutdown; + } + + @VisibleForTesting + void setCacheRefreshPeriod(long time) { + this.cacheRefreshPeriod = time; + } + static class CacheUpdateMasterWork implements Runnable { private CachedStore cachedStore; @@ -265,86 +322,175 @@ public CacheUpdateMasterWork(CachedStore cachedStore) { @Override public void run() { - runningMasterThread.set(Thread.currentThread()); - RawStore rawStore = cachedStore.getRawStore(); + // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy + Deadline.registerIfNot(1000000); + LOG.debug("CachedStore: updating cached objects"); + String rawStoreClassName = + HiveConf.getVar(cachedStore.conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_IMPL, + ObjectStore.class.getName()); try { + RawStore rawStore = + ((Class) MetaStoreUtils.getClass(rawStoreClassName)).newInstance(); + rawStore.setConf(cachedStore.conf); List dbNames = rawStore.getAllDatabases(); - // Update the database in cache - if (!updateDatabases(rawStore, dbNames)) { - return; - } - // Update the tables and their partitions in cache - if (!updateTables(rawStore, dbNames)) { - return; + if (dbNames != null) { + // Update the database in cache + updateDatabases(rawStore, dbNames); + for (String dbName : dbNames) { + // Update the tables in cache + updateTables(rawStore, dbName); + List tblNames = cachedStore.getAllTables(dbName); + for (String tblName : tblNames) { + // Update the partitions for a table in cache + updateTablePartitions(rawStore, dbName, tblName); + // Update the table column stats for a table in cache + updateTableColStats(rawStore, dbName, tblName); + // Update the partitions column stats for a table in cache + updateTablePartitionColStats(rawStore, dbName, tblName); + } + } } } catch (MetaException e) { LOG.error("Updating CachedStore: error getting database names", e); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e); } } - private boolean updateDatabases(RawStore rawStore, List dbNames) { - if (dbNames != null) { - List databases = new ArrayList(); - for (String dbName : dbNames) { - // If a preemption of this thread was requested, simply return before proceeding - if (Thread.interrupted()) { - return false; + private void updateDatabases(RawStore rawStore, List dbNames) { + // Prepare the list of databases + List databases = new ArrayList(); + for (String dbName : dbNames) { + Database db; + try { + db = rawStore.getDatabase(dbName); + databases.add(db); + } catch (NoSuchObjectException e) { + LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e); + } + } + // Update the cached database objects + try { + if (databaseCacheLock.writeLock().tryLock()) { + // Skip background updates if we detect change + if (isDatabaseCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping database cache update; the database list we have is dirty."); + return; } - Database db; - try { - db = rawStore.getDatabase(dbName); - databases.add(db); - } catch (NoSuchObjectException e) { - LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e); + SharedCache.refreshDatabases(databases); + } + } finally { + if (databaseCacheLock.isWriteLockedByCurrentThread()) { + databaseCacheLock.writeLock().unlock(); + } + } + } + + // Update the cached table objects + private void updateTables(RawStore rawStore, String 
dbName) { + List<Table> tables = new ArrayList<Table>
(); + try { + List tblNames = rawStore.getAllTables(dbName); + for (String tblName : tblNames) { + Table table = + rawStore.getTable(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName)); + tables.add(table); + } + if (tableCacheLock.writeLock().tryLock()) { + // Skip background updates if we detect change + if (isTableCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping table cache update; the table list we have is dirty."); + return; } + SharedCache.refreshTables(dbName, tables); + } + } catch (MetaException e) { + LOG.info("Updating CachedStore: unable to read tables for database - " + dbName, e); + } finally { + if (tableCacheLock.isWriteLockedByCurrentThread()) { + tableCacheLock.writeLock().unlock(); } - // Update the cached database objects - SharedCache.refreshDatabases(databases); } - return true; } - private boolean updateTables(RawStore rawStore, List dbNames) { - if (dbNames != null) { - List
<Table> tables = new ArrayList<Table>
(); - for (String dbName : dbNames) { - try { - List tblNames = rawStore.getAllTables(dbName); - for (String tblName : tblNames) { - // If a preemption of this thread was requested, simply return before proceeding - if (Thread.interrupted()) { - return false; - } - Table table = rawStore.getTable(dbName, tblName); - tables.add(table); - } - // Update the cached database objects - SharedCache.refreshTables(dbName, tables); - for (String tblName : tblNames) { - // If a preemption of this thread was requested, simply return before proceeding - if (Thread.interrupted()) { - return false; - } - List partitions = - rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); - SharedCache.refreshPartitions(dbName, tblName, partitions); - } - } catch (MetaException | NoSuchObjectException e) { - LOG.error("Updating CachedStore: unable to read table", e); - return false; + // Update the cached partition objects for a table + private void updateTablePartitions(RawStore rawStore, String dbName, String tblName) { + try { + Deadline.startTimer("getPartitions"); + List partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); + Deadline.stopTimer(); + if (partitionCacheLock.writeLock().tryLock()) { + // Skip background updates if we detect change + if (isPartitionCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping partition cache update; the partition list we have is dirty."); + return; } + SharedCache.refreshPartitions(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partitions); + } + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); + } finally { + if (partitionCacheLock.isWriteLockedByCurrentThread()) { + partitionCacheLock.writeLock().unlock(); } } - return true; } - } - // Interrupt the cache update background thread - // Fire and forget (the master will respond appropriately when it gets a chance) - // All writes to the cache go through synchronized methods, so fire & forget is fine. 
- private void interruptCacheUpdateMaster() { - if (runningMasterThread.get() != null) { - runningMasterThread.get().interrupt(); + // Update the cached col stats for this table + private void updateTableColStats(RawStore rawStore, String dbName, String tblName) { + try { + Table table = rawStore.getTable(dbName, tblName); + List colNames = MetaStoreUtils.getColumnNamesForTable(table); + Deadline.startTimer("getTableColumnStatistics"); + ColumnStatistics tableColStats = + rawStore.getTableColumnStatistics(dbName, tblName, colNames); + Deadline.stopTimer(); + if (tableColStatsCacheLock.writeLock().tryLock()) { + // Skip background updates if we detect change + if (isTableColStatsCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping table column stats cache update; the table column stats list we " + + "have is dirty."); + return; + } + SharedCache.refreshTableColStats(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); + } + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Updating CachedStore: unable to read table column stats of table: " + tblName, e); + } finally { + if (tableColStatsCacheLock.isWriteLockedByCurrentThread()) { + tableColStatsCacheLock.writeLock().unlock(); + } + } + } + + // Update the cached partition col stats for a table + private void updateTablePartitionColStats(RawStore rawStore, String dbName, String tblName) { + try { + Deadline.startTimer("getColStatsForTablePartitions"); + Map> colStatsPerPartition = + rawStore.getColStatsForTablePartitions(dbName, tblName); + Deadline.stopTimer(); + if (partitionColStatsCacheLock.writeLock().tryLock()) { + // Skip background updates if we detect change + if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping partition column stats cache update; the partition column stats " + + "list we have is dirty."); + return; + } + SharedCache.refreshPartitionColStats(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), colStatsPerPartition); + } + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Updating CachedStore: unable to read partitions column stats of table: " + + tblName, e); + } finally { + if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) { + partitionColStatsCacheLock.writeLock().unlock(); + } + } } } @@ -374,11 +520,17 @@ public void rollbackTransaction() { } @Override - public void createDatabase(Database db) - throws InvalidObjectException, MetaException { + public void createDatabase(Database db) throws InvalidObjectException, MetaException { rawStore.createDatabase(db); - interruptCacheUpdateMaster(); - SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(db.getName()), db.deepCopy()); + try { + // Wait if background cache update is happening + databaseCacheLock.readLock().lock(); + isDatabaseCacheDirty.set(true); + SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(db.getName()), + db.deepCopy()); + } finally { + databaseCacheLock.readLock().unlock(); + } } @Override @@ -387,26 +539,38 @@ public Database getDatabase(String dbName) throws NoSuchObjectException { if (db == null) { throw new NoSuchObjectException(); } - return SharedCache.getDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName)); + return db; } @Override public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException { boolean succ = rawStore.dropDatabase(dbname); if (succ) { - interruptCacheUpdateMaster(); - 
SharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname)); + try { + // Wait if background cache update is happening + databaseCacheLock.readLock().lock(); + isDatabaseCacheDirty.set(true); + SharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname)); + } finally { + databaseCacheLock.readLock().unlock(); + } } return succ; } @Override - public boolean alterDatabase(String dbName, Database db) - throws NoSuchObjectException, MetaException { + public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectException, + MetaException { boolean succ = rawStore.alterDatabase(dbName, db); if (succ) { - interruptCacheUpdateMaster(); - SharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db); + try { + // Wait if background cache update is happening + databaseCacheLock.readLock().lock(); + isDatabaseCacheDirty.set(true); + SharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db); + } finally { + databaseCacheLock.readLock().unlock(); + } } return succ; } @@ -462,24 +626,45 @@ private void validateTableType(Table tbl) { } @Override - public void createTable(Table tbl) - throws InvalidObjectException, MetaException { + public void createTable(Table tbl) throws InvalidObjectException, MetaException { rawStore.createTable(tbl); - interruptCacheUpdateMaster(); validateTableType(tbl); - SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()), - HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl); + try { + // Wait if background cache update is happening + tableCacheLock.readLock().lock(); + isTableCacheDirty.set(true); + SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()), + HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl); + } finally { + tableCacheLock.readLock().unlock(); + } } @Override - public boolean dropTable(String dbName, String tableName) - throws MetaException, NoSuchObjectException, InvalidObjectException, - InvalidInputException { + public boolean dropTable(String dbName, String tableName) throws MetaException, + NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.dropTable(dbName, tableName); if (succ) { - interruptCacheUpdateMaster(); - SharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tableName)); + // Remove table + try { + // Wait if background table cache update is happening + tableCacheLock.readLock().lock(); + isTableCacheDirty.set(true); + SharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName)); + } finally { + tableCacheLock.readLock().unlock(); + } + // Remove table col stats + try { + // Wait if background table col stats cache update is happening + tableColStatsCacheLock.readLock().lock(); + isTableColStatsCacheDirty.set(true); + SharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName)); + } finally { + tableColStatsCacheLock.readLock().unlock(); + } } return succ; } @@ -496,57 +681,74 @@ public Table getTable(String dbName, String tableName) throws MetaException { } @Override - public boolean addPartition(Partition part) - throws InvalidObjectException, MetaException { + public boolean addPartition(Partition part) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartition(part); if (succ) { - 
interruptCacheUpdateMaster(); - SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()), - HiveStringUtils.normalizeIdentifier(part.getTableName()), part); + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()), + HiveStringUtils.normalizeIdentifier(part.getTableName()), part); + } finally { + partitionCacheLock.readLock().unlock(); + } } return succ; } @Override - public boolean addPartitions(String dbName, String tblName, - List parts) throws InvalidObjectException, MetaException { + public boolean addPartitions(String dbName, String tblName, List parts) + throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(dbName, tblName, parts); if (succ) { - interruptCacheUpdateMaster(); - for (Partition part : parts) { - SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()), - HiveStringUtils.normalizeIdentifier(part.getTableName()), part); + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + for (Partition part : parts) { + SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), part); + } + } finally { + partitionCacheLock.readLock().unlock(); } } return succ; } @Override - public boolean addPartitions(String dbName, String tblName, - PartitionSpecProxy partitionSpec, boolean ifNotExists) - throws InvalidObjectException, MetaException { + public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy partitionSpec, + boolean ifNotExists) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(dbName, tblName, partitionSpec, ifNotExists); if (succ) { - interruptCacheUpdateMaster(); - PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); - while (iterator.hasNext()) { - Partition part = iterator.next(); - SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), part); + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); + while (iterator.hasNext()) { + Partition part = iterator.next(); + SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), part); + } + } finally { + partitionCacheLock.readLock().unlock(); } } return succ; } @Override - public Partition getPartition(String dbName, String tableName, - List part_vals) throws MetaException, NoSuchObjectException { - Partition part = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tableName), part_vals); + public Partition getPartition(String dbName, String tableName, List part_vals) + throws MetaException, NoSuchObjectException { + Partition part = + SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), part_vals); if (part != null) { part.unsetPrivileges(); } else { - throw new NoSuchObjectException(); + throw new NoSuchObjectException("partition values=" + part_vals.toString()); } return part; } @@ -559,14 +761,30 @@ public 
boolean doesPartitionExist(String dbName, String tableName, } @Override - public boolean dropPartition(String dbName, String tableName, - List part_vals) throws MetaException, NoSuchObjectException, - InvalidObjectException, InvalidInputException { + public boolean dropPartition(String dbName, String tableName, List part_vals) + throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.dropPartition(dbName, tableName, part_vals); if (succ) { - interruptCacheUpdateMaster(); - SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tableName), part_vals); + // Remove partition + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), part_vals); + } finally { + partitionCacheLock.readLock().unlock(); + } + // Remove partition col stats + try { + // Wait if background cache update is happening + partitionColStatsCacheLock.readLock().lock(); + isPartitionColStatsCacheDirty.set(true); + SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), part_vals); + } finally { + partitionColStatsCacheLock.readLock().unlock(); + } } return succ; } @@ -588,10 +806,28 @@ public boolean dropPartition(String dbName, String tableName, public void alterTable(String dbName, String tblName, Table newTable) throws InvalidObjectException, MetaException { rawStore.alterTable(dbName, tblName, newTable); - interruptCacheUpdateMaster(); validateTableType(newTable); - SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), newTable); + // Update table cache + try { + // Wait if background cache update is happening + tableCacheLock.readLock().lock(); + isTableCacheDirty.set(true); + SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), newTable); + } finally { + tableCacheLock.readLock().unlock(); + } + // Update partition cache (key might have changed since table name is a + // component of key) + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + SharedCache.alterTableInPartitionCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), newTable); + } finally { + partitionCacheLock.readLock().unlock(); + } } @Override @@ -685,26 +921,62 @@ public void alterTable(String dbName, String tblName, Table newTable) } @Override - public void alterPartition(String dbName, String tblName, - List partVals, Partition newPart) + public void alterPartition(String dbName, String tblName, List partVals, Partition newPart) throws InvalidObjectException, MetaException { rawStore.alterPartition(dbName, tblName, partVals, newPart); - interruptCacheUpdateMaster(); - SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + // Update partition cache + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), + 
HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + } finally { + partitionCacheLock.readLock().unlock(); + } + // Update partition column stats cache + try { + // Wait if background cache update is happening + partitionColStatsCacheLock.readLock().lock(); + isPartitionColStatsCacheDirty.set(true); + SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + } finally { + partitionColStatsCacheLock.readLock().unlock(); + } } @Override - public void alterPartitions(String dbName, String tblName, - List> partValsList, List newParts) - throws InvalidObjectException, MetaException { + public void alterPartitions(String dbName, String tblName, List> partValsList, + List newParts) throws InvalidObjectException, MetaException { rawStore.alterPartitions(dbName, tblName, partValsList, newParts); - interruptCacheUpdateMaster(); - for (int i=0;i partVals = partValsList.get(i); - Partition newPart = newParts.get(i); - SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + // Update partition cache + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + for (int i = 0; i < partValsList.size(); i++) { + List partVals = partValsList.get(i); + Partition newPart = newParts.get(i); + SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + } + } finally { + partitionCacheLock.readLock().unlock(); + } + // Update partition column stats cache + try { + // Wait if background cache update is happening + partitionColStatsCacheLock.readLock().lock(); + isPartitionColStatsCacheDirty.set(true); + for (int i = 0; i < partValsList.size(); i++) { + List partVals = partValsList.get(i); + Partition newPart = newParts.get(i); + SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + } + } finally { + partitionColStatsCacheLock.readLock().unlock(); } } @@ -1095,55 +1367,199 @@ public Partition getPartitionWithAuth(String dbName, String tblName, @Override public boolean updateTableColumnStatistics(ColumnStatistics colStats) - throws NoSuchObjectException, MetaException, InvalidObjectException, - InvalidInputException { + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.updateTableColumnStatistics(colStats); if (succ) { - SharedCache.updateTableColumnStatistics(HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()), - HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), colStats.getStatsObj()); + String dbName = colStats.getStatsDesc().getDbName(); + String tableName = colStats.getStatsDesc().getTableName(); + List statsObjs = colStats.getStatsObj(); + Table tbl = getTable(dbName, tableName); + List colNames = new ArrayList<>(); + for (ColumnStatisticsObj statsObj : statsObjs) { + colNames.add(statsObj.getColName()); + } + StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames); + + // Update table + try { + // Wait if background cache update is happening + tableCacheLock.readLock().lock(); + isTableCacheDirty.set(true); + SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName), + 
HiveStringUtils.normalizeIdentifier(tableName), tbl); + } finally { + tableCacheLock.readLock().unlock(); + } + + // Update table col stats + try { + // Wait if background cache update is happening + tableColStatsCacheLock.readLock().lock(); + isTableColStatsCacheDirty.set(true); + SharedCache.updateTableColStatsInCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), statsObjs); + } finally { + tableColStatsCacheLock.readLock().unlock(); + } } return succ; } @Override - public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, - List partVals) throws NoSuchObjectException, MetaException, - InvalidObjectException, InvalidInputException { - boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals); + public ColumnStatistics getTableColumnStatistics(String dbName, String tableName, + List colNames) throws MetaException, NoSuchObjectException { + ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName); + List colStatObjs = new ArrayList(); + for (String colName : colNames) { + String colStatsCacheKey = + CacheUtils.buildKey(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), colName); + ColumnStatisticsObj colStat = SharedCache.getCachedTableColStats(colStatsCacheKey); + if (colStat != null) { + colStatObjs.add(colStat); + } + } + if (colStatObjs.isEmpty()) { + return null; + } else { + return new ColumnStatistics(csd, colStatObjs); + } + } + + @Override + public boolean deleteTableColumnStatistics(String dbName, String tableName, String colName) + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { + boolean succ = rawStore.deleteTableColumnStatistics(dbName, tableName, colName); if (succ) { - SharedCache.updatePartitionColumnStatistics(HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()), - HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals, colStats.getStatsObj()); + try { + // Wait if background cache update is happening + tableColStatsCacheLock.readLock().lock(); + isTableColStatsCacheDirty.set(true); + SharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), colName); + } finally { + tableColStatsCacheLock.readLock().unlock(); + } } return succ; } @Override - public ColumnStatistics getTableColumnStatistics(String dbName, - String tableName, List colName) - throws MetaException, NoSuchObjectException { - return rawStore.getTableColumnStatistics(dbName, tableName, colName); + public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List partVals) + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { + boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals); + if (succ) { + String dbName = colStats.getStatsDesc().getDbName(); + String tableName = colStats.getStatsDesc().getTableName(); + List statsObjs = colStats.getStatsObj(); + Partition part = getPartition(dbName, tableName, partVals); + List colNames = new ArrayList<>(); + for (ColumnStatisticsObj statsObj : statsObjs) { + colNames.add(statsObj.getColName()); + } + StatsSetupConst.setColumnStatsState(part.getParameters(), colNames); + + // Update partition + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + 
SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), partVals, part); + } finally { + partitionCacheLock.readLock().unlock(); + } + + // Update partition column stats + try { + // Wait if background cache update is happening + partitionColStatsCacheLock.readLock().lock(); + isPartitionColStatsCacheDirty.set(true); + SharedCache.updatePartitionColStatsInCache( + HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()), + HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals, + colStats.getStatsObj()); + } finally { + partitionColStatsCacheLock.readLock().unlock(); + } + } + return succ; } @Override - public List getPartitionColumnStatistics(String dbName, - String tblName, List partNames, List colNames) - throws MetaException, NoSuchObjectException { + // TODO: calculate from cached values. + // Need to see if it makes sense to do this as some col stats maybe out of date/missing on cache. + public List getPartitionColumnStatistics(String dbName, String tblName, + List partNames, List colNames) throws MetaException, NoSuchObjectException { return rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames); } @Override - public boolean deletePartitionColumnStatistics(String dbName, - String tableName, String partName, List partVals, String colName) - throws NoSuchObjectException, MetaException, InvalidObjectException, - InvalidInputException { - return rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName); + public boolean deletePartitionColumnStatistics(String dbName, String tableName, String partName, + List partVals, String colName) throws NoSuchObjectException, MetaException, + InvalidObjectException, InvalidInputException { + boolean succ = + rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName); + if (succ) { + try { + // Wait if background cache update is happening + partitionColStatsCacheLock.readLock().lock(); + isPartitionColStatsCacheDirty.set(true); + SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), partVals, colName); + } finally { + partitionColStatsCacheLock.readLock().unlock(); + } + } + return succ; } @Override - public boolean deleteTableColumnStatistics(String dbName, String tableName, - String colName) throws NoSuchObjectException, MetaException, - InvalidObjectException, InvalidInputException { - return rawStore.deleteTableColumnStatistics(dbName, tableName, colName); + public AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, + List colNames) throws MetaException, NoSuchObjectException { + List colStats = new ArrayList(colNames.size()); + for (String colName : colNames) { + ColumnStatisticsObj colStat = + mergeColStatsForPartitions(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partNames, colName); + if (colStat == null) { + // Stop and fall back to underlying RawStore + colStats = null; + break; + } else { + colStats.add(colStat); + } + } + if (colStats == null) { + return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + } else { + return new AggrStats(colStats, partNames.size()); + } + } + + private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName, + List partNames, String colName) throws MetaException { + ColumnStatisticsObj colStats = null; + for (String 
partName : partNames) { + String colStatsCacheKey = + CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName); + ColumnStatisticsObj colStatsForPart = + SharedCache.getCachedPartitionColStats(colStatsCacheKey); + if (colStatsForPart == null) { + // We don't have stats for all the partitions. + // Extrapolation logic hasn't been added to CachedStore yet, + // so stop now and fall back to the underlying RawStore. + return null; + } + if (colStats == null) { + colStats = colStatsForPart; + } else { + ColumnStatsMerger merger = + ColumnStatsMergerFactory.getColumnStatsMerger(colStats, colStatsForPart); + merger.merge(colStats, colStatsForPart); + } + } + return colStats; } @Override @@ -1209,14 +1625,34 @@ public void setMetaStoreSchemaVersion(String version, String comment) } @Override - public void dropPartitions(String dbName, String tblName, - List partNames) throws MetaException, NoSuchObjectException { + public void dropPartitions(String dbName, String tblName, List partNames) + throws MetaException, NoSuchObjectException { rawStore.dropPartitions(dbName, tblName, partNames); - interruptCacheUpdateMaster(); - for (String partName : partNames) { - List vals = partNameToVals(partName); - SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), vals); + // Remove partitions + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + for (String partName : partNames) { + List vals = partNameToVals(partName); + SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), vals); + } + } finally { + partitionCacheLock.readLock().unlock(); + } + // Remove partition col stats + try { + // Wait if background cache update is happening + partitionColStatsCacheLock.readLock().lock(); + isPartitionColStatsCacheDirty.set(true); + for (String partName : partNames) { + List part_vals = partNameToVals(partName); + SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), part_vals); + } + } finally { + partitionColStatsCacheLock.readLock().unlock(); } } @@ -1326,130 +1762,6 @@ public Function getFunction(String dbName, String funcName) } @Override - public AggrStats get_aggr_stats_for(String dbName, String tblName, - List partNames, List colNames) - throws MetaException, NoSuchObjectException { - List colStats = new ArrayList(colNames.size()); - for (String colName : colNames) { - colStats.add(mergeColStatsForPartitions(HiveStringUtils.normalizeIdentifier(dbName), - HiveStringUtils.normalizeIdentifier(tblName), partNames, colName)); - } - // TODO: revisit the partitions not found case for extrapolation - return new AggrStats(colStats, partNames.size()); - } - - private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName, - List partNames, String colName) throws MetaException { - ColumnStatisticsObj colStats = null; - for (String partName : partNames) { - String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName); - ColumnStatisticsObj colStatsForPart = SharedCache.getCachedPartitionColStats( - colStatsCacheKey); - if (colStats == null) { - colStats = colStatsForPart; - } else { - colStats = mergeColStatsObj(colStats, colStatsForPart); - } - } - return colStats; - } - - private ColumnStatisticsObj mergeColStatsObj(ColumnStatisticsObj 
colStats1, - ColumnStatisticsObj colStats2) throws MetaException { - if ((!colStats1.getColType().equalsIgnoreCase(colStats2.getColType())) - && (!colStats1.getColName().equalsIgnoreCase(colStats2.getColName()))) { - throw new MetaException("Can't merge column stats for two partitions for different columns."); - } - ColumnStatisticsData csd = new ColumnStatisticsData(); - ColumnStatisticsObj cso = new ColumnStatisticsObj(colStats1.getColName(), - colStats1.getColType(), csd); - ColumnStatisticsData csData1 = colStats1.getStatsData(); - ColumnStatisticsData csData2 = colStats2.getStatsData(); - String colType = colStats1.getColType().toLowerCase(); - if (colType.equals("boolean")) { - BooleanColumnStatsData boolStats = new BooleanColumnStatsData(); - boolStats.setNumFalses(csData1.getBooleanStats().getNumFalses() - + csData2.getBooleanStats().getNumFalses()); - boolStats.setNumTrues(csData1.getBooleanStats().getNumTrues() - + csData2.getBooleanStats().getNumTrues()); - boolStats.setNumNulls(csData1.getBooleanStats().getNumNulls() - + csData2.getBooleanStats().getNumNulls()); - csd.setBooleanStats(boolStats); - } else if (colType.equals("string") || colType.startsWith("varchar") - || colType.startsWith("char")) { - StringColumnStatsData stringStats = new StringColumnStatsData(); - stringStats.setNumNulls(csData1.getStringStats().getNumNulls() - + csData2.getStringStats().getNumNulls()); - stringStats.setAvgColLen(Math.max(csData1.getStringStats().getAvgColLen(), csData2 - .getStringStats().getAvgColLen())); - stringStats.setMaxColLen(Math.max(csData1.getStringStats().getMaxColLen(), csData2 - .getStringStats().getMaxColLen())); - stringStats.setNumDVs(Math.max(csData1.getStringStats().getNumDVs(), csData2.getStringStats() - .getNumDVs())); - csd.setStringStats(stringStats); - } else if (colType.equals("binary")) { - BinaryColumnStatsData binaryStats = new BinaryColumnStatsData(); - binaryStats.setNumNulls(csData1.getBinaryStats().getNumNulls() - + csData2.getBinaryStats().getNumNulls()); - binaryStats.setAvgColLen(Math.max(csData1.getBinaryStats().getAvgColLen(), csData2 - .getBinaryStats().getAvgColLen())); - binaryStats.setMaxColLen(Math.max(csData1.getBinaryStats().getMaxColLen(), csData2 - .getBinaryStats().getMaxColLen())); - csd.setBinaryStats(binaryStats); - } else if (colType.equals("bigint") || colType.equals("int") || colType.equals("smallint") - || colType.equals("tinyint") || colType.equals("timestamp")) { - LongColumnStatsData longStats = new LongColumnStatsData(); - longStats.setNumNulls(csData1.getLongStats().getNumNulls() - + csData2.getLongStats().getNumNulls()); - longStats.setHighValue(Math.max(csData1.getLongStats().getHighValue(), csData2.getLongStats() - .getHighValue())); - longStats.setLowValue(Math.min(csData1.getLongStats().getLowValue(), csData2.getLongStats() - .getLowValue())); - longStats.setNumDVs(Math.max(csData1.getLongStats().getNumDVs(), csData2.getLongStats() - .getNumDVs())); - csd.setLongStats(longStats); - } else if (colType.equals("date")) { - DateColumnStatsData dateStats = new DateColumnStatsData(); - dateStats.setNumNulls(csData1.getDateStats().getNumNulls() - + csData2.getDateStats().getNumNulls()); - dateStats.setHighValue(new Date(Math.max(csData1.getDateStats().getHighValue() - .getDaysSinceEpoch(), csData2.getDateStats().getHighValue().getDaysSinceEpoch()))); - dateStats.setHighValue(new Date(Math.min(csData1.getDateStats().getLowValue() - .getDaysSinceEpoch(), csData2.getDateStats().getLowValue().getDaysSinceEpoch()))); - 
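The hand-written per-type merging being removed here is what the new ColumnStatsMerger classes, obtained through ColumnStatsMergerFactory in mergeColStatsForPartitions above, now encapsulate. For intuition, a minimal sketch of the long-column case using only the thrift stats types; it mirrors the min/max/sum/max rules visible in the removed code and is illustrative, not the patch's LongColumnStatsMerger itself:

    import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;

    public class LongStatsMergeSketch {
      public static void main(String[] args) {
        // Column stats from two partitions of the same column
        LongColumnStatsData p1 = new LongColumnStatsData();
        p1.setLowValue(1); p1.setHighValue(50); p1.setNumNulls(2); p1.setNumDVs(10);
        LongColumnStatsData p2 = new LongColumnStatsData();
        p2.setLowValue(5); p2.setHighValue(80); p2.setNumNulls(3); p2.setNumDVs(15);

        // Merged view across the two partitions
        LongColumnStatsData merged = new LongColumnStatsData();
        merged.setLowValue(Math.min(p1.getLowValue(), p2.getLowValue()));    // 1
        merged.setHighValue(Math.max(p1.getHighValue(), p2.getHighValue())); // 80
        merged.setNumNulls(p1.getNumNulls() + p2.getNumNulls());             // 5
        // Without NDV bit vectors the merged NDV is only a lower bound: the max of the two
        merged.setNumDVs(Math.max(p1.getNumDVs(), p2.getNumDVs()));          // 15
        System.out.println(merged);
      }
    }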
dateStats.setNumDVs(Math.max(csData1.getDateStats().getNumDVs(), csData2.getDateStats() - .getNumDVs())); - csd.setDateStats(dateStats); - } else if (colType.equals("double") || colType.equals("float")) { - DoubleColumnStatsData doubleStats = new DoubleColumnStatsData(); - doubleStats.setNumNulls(csData1.getDoubleStats().getNumNulls() - + csData2.getDoubleStats().getNumNulls()); - doubleStats.setHighValue(Math.max(csData1.getDoubleStats().getHighValue(), csData2 - .getDoubleStats().getHighValue())); - doubleStats.setLowValue(Math.min(csData1.getDoubleStats().getLowValue(), csData2 - .getDoubleStats().getLowValue())); - doubleStats.setNumDVs(Math.max(csData1.getDoubleStats().getNumDVs(), csData2.getDoubleStats() - .getNumDVs())); - csd.setDoubleStats(doubleStats); - } else if (colType.startsWith("decimal")) { - DecimalColumnStatsData decimalStats = new DecimalColumnStatsData(); - decimalStats.setNumNulls(csData1.getDecimalStats().getNumNulls() - + csData2.getDecimalStats().getNumNulls()); - Decimal high = (csData1.getDecimalStats().getHighValue() - .compareTo(csData2.getDecimalStats().getHighValue()) > 0) ? csData1.getDecimalStats() - .getHighValue() : csData2.getDecimalStats().getHighValue(); - decimalStats.setHighValue(high); - Decimal low = (csData1.getDecimalStats().getLowValue() - .compareTo(csData2.getDecimalStats().getLowValue()) < 0) ? csData1.getDecimalStats() - .getLowValue() : csData2.getDecimalStats().getLowValue(); - decimalStats.setLowValue(low); - decimalStats.setNumDVs(Math.max(csData1.getDecimalStats().getNumDVs(), csData2 - .getDecimalStats().getNumDVs())); - csd.setDecimalStats(decimalStats); - } - return cso; - } - - @Override public NotificationEventResponse getNextNotification( NotificationEventRequest rqst) { return rawStore.getNextNotification(rqst); @@ -1565,10 +1877,9 @@ public void addForeignKeys(List fks) } @Override - public Map getAggrColStatsForTablePartitions( - String dbName, String tableName) - throws MetaException, NoSuchObjectException { - return rawStore.getAggrColStatsForTablePartitions(dbName, tableName); + public Map> getColStatsForTablePartitions(String dbName, + String tableName) throws MetaException, NoSuchObjectException { + return rawStore.getColStatsForTablePartitions(dbName, tableName); } public RawStore getRawStore() { diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index 7beee42..425164f 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -21,14 +21,18 @@ import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.TreeMap; -import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.metastore.StatObjectConverter; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; @@ -38,17 +42,26 @@ import 
org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper; import org.apache.hadoop.hive.metastore.hbase.HBaseUtils; import org.apache.hive.common.util.HiveStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; public class SharedCache { private static Map databaseCache = new TreeMap(); private static Map tableCache = new TreeMap(); - private static Map partitionCache = new TreeMap(); - private static Map partitionColStatsCache = new TreeMap(); - private static Map sdCache = new HashMap(); + private static Map partitionCache = + new TreeMap(); + private static Map partitionColStatsCache = + new TreeMap(); + private static Map tableColStatsCache = + new TreeMap(); + private static Map sdCache = + new HashMap(); private static MessageDigest md; + static final private Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName()); + static { try { md = MessageDigest.getInstance("MD5"); @@ -97,11 +110,13 @@ public static synchronized void addTableToCache(String dbName, String tblName, T Table tblCopy = tbl.deepCopy(); tblCopy.setDbName(HiveStringUtils.normalizeIdentifier(dbName)); tblCopy.setTableName(HiveStringUtils.normalizeIdentifier(tblName)); - for (FieldSchema fs : tblCopy.getPartitionKeys()) { - fs.setName(HiveStringUtils.normalizeIdentifier(fs.getName())); + if (tblCopy.getPartitionKeys() != null) { + for (FieldSchema fs : tblCopy.getPartitionKeys()) { + fs.setName(HiveStringUtils.normalizeIdentifier(fs.getName())); + } } TableWrapper wrapper; - if (tbl.getSd()!=null) { + if (tbl.getSd() != null) { byte[] sdHash = HBaseUtils.hashStorageDescriptor(tbl.getSd(), md); StorageDescriptor sd = tbl.getSd(); increSd(sd, sdHash); @@ -121,10 +136,54 @@ public static synchronized void removeTableFromCache(String dbName, String tblNa } } + public static synchronized ColumnStatisticsObj getCachedTableColStats(String colStatsCacheKey) { + return tableColStatsCache.get(colStatsCacheKey); + } + + public static synchronized void removeTableColStatsFromCache(String dbName, String tblName) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator> iterator = + tableColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + } + + public static synchronized void removeTableColStatsFromCache(String dbName, String tblName, + String colName) { + tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName)); + } + + public static synchronized void updateTableColStatsInCache(String dbName, String tableName, + List colStatsForTable) { + for (ColumnStatisticsObj colStatObj : colStatsForTable) { + // Get old stats object if present + String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName()); + ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key); + if (oldStatsObj != null) { + LOG.debug("CachedStore: updating table column stats for column: " + colStatObj.getColName() + + ", of table: " + tableName + " and database: " + dbName); + // Update existing stat object's field + StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj); + } else { + // No stats exist for this key; add a new object to the cache + tableColStatsCache.put(key, colStatObj); + } + } + } + public static synchronized void alterTableInCache(String dbName, String tblName, Table newTable) { removeTableFromCache(dbName, tblName); 
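The removal and rename helpers above all lean on SharedCache's flat key scheme: each cached stats object is stored under a string key built from the normalized database, table, optional partition values, and column name, so whole families of entries can be dropped or rewritten with a prefix scan. A small sketch of that usage, assuming only the CacheUtils calls shown in this patch (the concrete delimiter, normalization, and CacheUtils visibility live in CacheUtils itself; the db/table/column names below are made up):

    import java.util.Arrays;
    import org.apache.hadoop.hive.metastore.cache.CacheUtils;

    public class CacheKeySketch {
      public static void main(String[] args) {
        // Full keys for a table-level and a partition-level column stats entry
        String tableColKey = CacheUtils.buildKey("default", "sales", "col1");
        String partColKey = CacheUtils.buildKey("default", "sales", Arrays.asList("2017", "05"), "col1");

        // buildKeyWithDelimit() yields the same prefix terminated by the delimiter, so a
        // single startsWith() pass over the cache can drop every entry for the table
        String tablePrefix = CacheUtils.buildKeyWithDelimit("default", "sales");
        System.out.println(tableColKey.startsWith(tablePrefix)); // true
        System.out.println(partColKey.startsWith(tablePrefix));  // true
      }
    }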
addTableToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()), HiveStringUtils.normalizeIdentifier(newTable.getTableName()), newTable); + } + + public static synchronized void alterTableInPartitionCache(String dbName, String tblName, + Table newTable) { if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { List partitions = listCachedPartitions(dbName, tblName, -1); for (Partition part : partitions) { @@ -137,6 +196,51 @@ public static synchronized void alterTableInCache(String dbName, String tblName, } } + public static synchronized void alterTableInTableColStatsCache(String dbName, String tblName, + Table newTable) { + if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { + String oldPartialTableStatsKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator> iterator = + tableColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + String key = entry.getKey(); + ColumnStatisticsObj colStatObj = entry.getValue(); + if (key.toLowerCase().startsWith(oldPartialTableStatsKey.toLowerCase())) { + String[] decomposedKey = CacheUtils.splitTableColStats(key); + String newKey = CacheUtils.buildKey(decomposedKey[0], decomposedKey[1], decomposedKey[2]); + tableColStatsCache.put(newKey, colStatObj); + iterator.remove(); + } + } + } + } + + public static synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName, + Table newTable) { + if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { + List partitions = listCachedPartitions(dbName, tblName, -1); + for (Partition part : partitions) { + String oldPartialPartitionKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, part.getValues()); + Iterator> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + String key = entry.getKey(); + ColumnStatisticsObj colStatObj = entry.getValue(); + if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) { + Object[] decomposedKey = CacheUtils.splitPartitionColStats(key); + String newKey = + CacheUtils.buildKey((String) decomposedKey[0], (String) decomposedKey[1], + (List) decomposedKey[2], (String) decomposedKey[3]); + iterator.remove(); + partitionColStatsCache.put(newKey, colStatObj); + } + } + } + } + } + public static synchronized int getCachedTableCount() { return tableCache.size(); } @@ -151,18 +255,6 @@ public static synchronized int getCachedTableCount() { return tables; } - public static synchronized void updateTableColumnStatistics(String dbName, String tableName, - List statsObjs) { - Table tbl = getTableFromCache(dbName, tableName); - tbl.getSd().getParameters(); - List colNames = new ArrayList<>(); - for (ColumnStatisticsObj statsObj:statsObjs) { - colNames.add(statsObj.getColName()); - } - StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames); - alterTableInCache(dbName, tableName, tbl); - } - public static synchronized List getTableMeta(String dbNames, String tableNames, List tableTypes) { List tableMetas = new ArrayList(); for (String dbName : listCachedDatabases()) { @@ -214,14 +306,51 @@ public static synchronized boolean existPartitionFromCache(String dbName, String return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals)); } - public static synchronized Partition removePartitionFromCache(String dbName, String tblName, List part_vals) { - PartitionWrapper wrapper = 
partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals)); - if (wrapper.getSdHash()!=null) { + public static synchronized Partition removePartitionFromCache(String dbName, String tblName, + List part_vals) { + PartitionWrapper wrapper = + partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals)); + if (wrapper.getSdHash() != null) { decrSd(wrapper.getSdHash()); } return wrapper.getPartition(); } + // Remove cached column stats for all partitions of a table + public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + } + + // Remove cached column stats for a particular partition of a table + public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName, + List partVals) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals); + Iterator> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + } + + // Remove cached column stats for a particular partition and a particular column of a table + public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName, + List partVals, String colName) { + partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName)); + } + public static synchronized List listCachedPartitions(String dbName, String tblName, int max) { List partitions = new ArrayList(); int count = 0; @@ -236,22 +365,50 @@ public static synchronized Partition removePartitionFromCache(String dbName, Str return partitions; } - public static synchronized void alterPartitionInCache(String dbName, String tblName, List partVals, Partition newPart) { + public static synchronized void alterPartitionInCache(String dbName, String tblName, + List partVals, Partition newPart) { removePartitionFromCache(dbName, tblName, partVals); addPartitionToCache(HiveStringUtils.normalizeIdentifier(newPart.getDbName()), HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart); } - public static synchronized void updatePartitionColumnStatistics(String dbName, String tableName, - List partVals, List statsObjs) { - Partition part = getPartitionFromCache(dbName, tableName, partVals); - part.getSd().getParameters(); - List colNames = new ArrayList<>(); - for (ColumnStatisticsObj statsObj:statsObjs) { - colNames.add(statsObj.getColName()); + public static synchronized void alterPartitionInColStatsCache(String dbName, String tblName, + List partVals, Partition newPart) { + String oldPartialPartitionKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals); + Iterator> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + String key = entry.getKey(); + ColumnStatisticsObj colStatObj = entry.getValue(); + if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) { + Object[] decomposedKey = CacheUtils.splitPartitionColStats(key); + String newKey = + CacheUtils.buildKey(HiveStringUtils.normalizeIdentifier(newPart.getDbName()), + 
HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart.getValues(), + (String) decomposedKey[3]); + partitionColStatsCache.put(newKey, colStatObj); + iterator.remove(); + } + } + } + + public static synchronized void updatePartitionColStatsInCache(String dbName, String tableName, + List partVals, List colStatsObjs) { + for (ColumnStatisticsObj colStatObj : colStatsObjs) { + // Get old stats object if present + String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName()); + ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key); + if (oldStatsObj != null) { + // Update existing stat object's field + LOG.debug("CachedStore: updating partition column stats for column: " + + colStatObj.getColName() + ", of table: " + tableName + " and database: " + dbName); + StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj); + } else { + // No stats exist for this key; add a new object to the cache + partitionColStatsCache.put(key, colStatObj); + } } - StatsSetupConst.setColumnStatsState(part.getParameters(), colNames); - alterPartitionInCache(dbName, tableName, partVals, part); } public static synchronized int getCachedPartitionCount() { @@ -262,10 +419,47 @@ public static synchronized ColumnStatisticsObj getCachedPartitionColStats(String return partitionColStatsCache.get(key); } - public static synchronized void addPartitionColStatsToCache(Map aggrStatsPerPartition) { - partitionColStatsCache.putAll(aggrStatsPerPartition); + public static synchronized void addPartitionColStatsToCache(String dbName, String tableName, + Map> colStatsPerPartition) { + for (Map.Entry> entry : colStatsPerPartition.entrySet()) { + String partName = entry.getKey(); + try { + List partVals = Warehouse.getPartValuesFromPartName(partName); + for (ColumnStatisticsObj colStatObj : entry.getValue()) { + String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName()); + partitionColStatsCache.put(key, colStatObj); + } + } catch (MetaException e) { + LOG.info("Unable to add partition: " + partName + " to SharedCache", e); + } + } + } + + public static synchronized void refreshPartitionColStats(String dbName, String tableName, + Map> newColStatsPerPartition) { + LOG.debug("CachedStore: updating cached partition column stats objects for database: " + dbName + + " and table: " + tableName); + removePartitionColStatsFromCache(dbName, tableName); + addPartitionColStatsToCache(dbName, tableName, newColStatsPerPartition); } + public static synchronized void addTableColStatsToCache(String dbName, String tableName, + List colStatsForTable) { + for (ColumnStatisticsObj colStatObj : colStatsForTable) { + String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName()); + tableColStatsCache.put(key, colStatObj); + } + } + + public static synchronized void refreshTableColStats(String dbName, String tableName, + List colStatsForTable) { + LOG.debug("CachedStore: updating cached table column stats objects for database: " + dbName + + " and table: " + tableName); + // Remove all old cache entries for this table + removeTableColStatsFromCache(dbName, tableName); + // Add new entries to cache + addTableColStatsToCache(dbName, tableName, colStatsForTable); + } public static void increSd(StorageDescriptor sd, byte[] sdHash) { ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash); @@ -295,6 +489,7 @@ public static StorageDescriptor getSdFromCache(byte[] sdHash) { // Replace databases in databaseCache with the new list public static synchronized void 
refreshDatabases(List databases) { + LOG.debug("CachedStore: updating cached database objects"); for (String dbName : listCachedDatabases()) { removeDatabaseFromCache(dbName); } @@ -305,6 +500,7 @@ public static synchronized void refreshDatabases(List databases) { // Replace tables in tableCache with the new list public static synchronized void refreshTables(String dbName, List
tables) { + LOG.debug("CachedStore: updating cached table objects for database: " + dbName); for (Table tbl : listCachedTables(dbName)) { removeTableFromCache(dbName, tbl.getTableName()); } @@ -313,17 +509,18 @@ public static synchronized void refreshTables(String dbName, List
tables) } } - public static void refreshPartitions(String dbName, String tblName, List partitions) { - List keysToRemove = new ArrayList(); - for (Map.Entry entry : partitionCache.entrySet()) { - if (entry.getValue().getPartition().getDbName().equals(dbName) - && entry.getValue().getPartition().getTableName().equals(tblName)) { - keysToRemove.add(entry.getKey()); + public static synchronized void refreshPartitions(String dbName, String tblName, + List partitions) { + LOG.debug("CachedStore: updating cached partition objects for database: " + dbName + + " and table: " + tblName); + Iterator> iterator = partitionCache.entrySet().iterator(); + while (iterator.hasNext()) { + PartitionWrapper partitionWrapper = iterator.next().getValue(); + if (partitionWrapper.getPartition().getDbName().equals(dbName) + && partitionWrapper.getPartition().getTableName().equals(tblName)) { + iterator.remove(); } } - for (String key : keysToRemove) { - partitionCache.remove(key); - } for (Partition part : partitions) { addPartitionToCache(dbName, tblName, part); } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index 0c7d8bb..a7681dd 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -2854,7 +2854,7 @@ public void addForeignKeys(List fks) throws InvalidObjectExceptio } @Override - public Map getAggrColStatsForTablePartitions(String dbName, + public Map> getColStatsForTablePartitions(String dbName, String tableName) throws MetaException, NoSuchObjectException { // TODO: see if it makes sense to implement this here return null; diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java index da6cd46..fe890e4 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.DateColumnStatsData; import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; @@ -35,7 +36,7 @@ private ColumnStatsMergerFactory() { } - + // we depend on the toString() method for javolution.util.FastCollection. private static int countNumBitVectors(String s) { if (s != null) { @@ -88,8 +89,15 @@ public static ColumnStatsMerger getColumnStatsMerger(ColumnStatisticsObj statsOb numBitVectors = nbvNew == nbvOld ? nbvNew : 0; break; } + case DATE_STATS: { + agg = new DateColumnStatsMerger(); + int nbvNew = countNumBitVectors(statsObjNew.getStatsData().getDateStats().getBitVectors()); + int nbvOld = countNumBitVectors(statsObjOld.getStatsData().getDateStats().getBitVectors()); + numBitVectors = nbvNew == nbvOld ? nbvNew : 0; + break; + } default: - throw new RuntimeException("Woh, bad. 
Unknown stats type " + typeNew.toString()); + throw new IllegalArgumentException("Unknown stats type " + typeNew.toString()); } if (numBitVectors > 0) { agg.ndvEstimator = new NumDistinctValueEstimator(numBitVectors); @@ -127,8 +135,12 @@ public static ColumnStatisticsObj newColumnStaticsObj(String colName, String col csd.setDecimalStats(new DecimalColumnStatsData()); break; + case DATE_STATS: + csd.setDateStats(new DateColumnStatsData()); + break; + default: - throw new RuntimeException("Woh, bad. Unknown stats type!"); + throw new IllegalArgumentException("Unknown stats type"); } cso.setStatsData(csd); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java new file mode 100644 index 0000000..3179b23 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.hbase.stats.merge; + +import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.Date; +import org.apache.hadoop.hive.metastore.api.DateColumnStatsData; + +public class DateColumnStatsMerger extends ColumnStatsMerger { + @Override + public void merge(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) { + DateColumnStatsData aggregateData = aggregateColStats.getStatsData().getDateStats(); + DateColumnStatsData newData = newColStats.getStatsData().getDateStats(); + Date lowValue = + aggregateData.getLowValue().compareTo(newData.getLowValue()) < 0 ? aggregateData + .getLowValue() : newData.getLowValue(); + aggregateData.setLowValue(lowValue); + Date highValue = + aggregateData.getHighValue().compareTo(newData.getHighValue()) >= 0 ? 
aggregateData + .getHighValue() : newData.getHighValue(); + aggregateData.setHighValue(highValue); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) { + aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); + } else { + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(aggregateData.getBitVectors(), + ndvEstimator.getnumBitVectors())); + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(), + ndvEstimator.getnumBitVectors())); + long ndv = ndvEstimator.estimateNumDistinctValues(); + LOG.debug("Use bitvector to merge column " + aggregateColStats.getColName() + "'s ndvs of " + + aggregateData.getNumDVs() + " and " + newData.getNumDVs() + " to be " + ndv); + aggregateData.setNumDVs(ndv); + aggregateData.setBitVectors(ndvEstimator.serialize().toString()); + } + } +} diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java index f613c30..f53944f 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java @@ -872,7 +872,7 @@ public void addForeignKeys(List fks) } @Override - public Map getAggrColStatsForTablePartitions(String dbName, + public Map> getColStatsForTablePartitions(String dbName, String tableName) throws MetaException, NoSuchObjectException { // TODO Auto-generated method stub return null; diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java index 1720e37..e0f5cdb 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java @@ -888,7 +888,7 @@ public void addForeignKeys(List fks) } @Override - public Map getAggrColStatsForTablePartitions(String dbName, + public Map> getColStatsForTablePartitions(String dbName, String tableName) throws MetaException, NoSuchObjectException { // TODO Auto-generated method stub return null; diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java index 0ab20d6..7a3ec09 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java @@ -25,11 +25,22 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.TestObjectStore.MockPartitionExpressionProxy; +import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; +import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PrincipalType; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.metastore.api.Table; import org.junit.Assert; import org.junit.Before; @@ -37,18 +48,24 @@ public class TestCachedStore { - private CachedStore cachedStore = new CachedStore(); + private ObjectStore objectStore; + private CachedStore cachedStore; @Before public void setUp() throws Exception { HiveConf conf = new HiveConf(); - conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS, MockPartitionExpressionProxy.class.getName()); - - ObjectStore objectStore = new ObjectStore(); + conf.setBoolean(HiveConf.ConfVars.HIVE_IN_TEST.varname, true); + conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS, + MockPartitionExpressionProxy.class.getName()); + objectStore = new ObjectStore(); objectStore.setConf(conf); + cachedStore = new CachedStore(); + cachedStore.setConf(conf); + // Stop the CachedStore cache update service. We'll start it explicitly to control the test + cachedStore.stopCacheUpdateService(1); - cachedStore.setRawStore(objectStore); - SharedCache.getDatabaseCache().clear(); SharedCache.getTableCache().clear(); SharedCache.getPartitionCache().clear(); @@ -56,6 +73,426 @@ public void setUp() throws Exception { SharedCache.getPartitionColStatsCache().clear(); } + /********************************************************************************************** + * Methods that test CachedStore + *********************************************************************************************/ + + @Test + public void testDatabaseOps() throws Exception { + // Add a db via ObjectStore + String dbName = "testDatabaseOps"; + String dbDescription = "testDatabaseOps"; + String dbLocation = "file:/tmp"; + Map dbParams = new HashMap(); + String dbOwner = "user1"; + Database db = new Database(dbName, dbDescription, dbLocation, dbParams); + db.setOwnerName(dbOwner); + db.setOwnerType(PrincipalType.USER); + objectStore.createDatabase(db); + db = objectStore.getDatabase(dbName); + // Prewarm CachedStore + cachedStore.prewarm(); + + // Read database via CachedStore + Database dbNew = cachedStore.getDatabase(dbName); + Assert.assertEquals(db, dbNew); + + // Add another db via CachedStore + final String dbName1 = "testDatabaseOps1"; + final String dbDescription1 = "testDatabaseOps1"; + Database db1 = new Database(dbName1, dbDescription1, dbLocation, dbParams); + db1.setOwnerName(dbOwner); + db1.setOwnerType(PrincipalType.USER); + cachedStore.createDatabase(db1); + db1 = cachedStore.getDatabase(dbName1); + + // Read db via ObjectStore + dbNew = objectStore.getDatabase(dbName1); + Assert.assertEquals(db1, dbNew); + + // Alter the db via CachedStore (can only alter owner or parameters) + db = new Database(dbName, dbDescription, dbLocation, dbParams); + dbOwner = "user2"; + db.setOwnerName(dbOwner); + db.setOwnerType(PrincipalType.USER); + cachedStore.alterDatabase(dbName, db); + db = cachedStore.getDatabase(dbName); + + // Read db via ObjectStore + dbNew = objectStore.getDatabase(dbName); + Assert.assertEquals(db, dbNew); + + // Add another db via ObjectStore + final String 
dbName2 = "testDatabaseOps2"; + final String dbDescription2 = "testDatabaseOps2"; + Database db2 = new Database(dbName2, dbDescription2, dbLocation, dbParams); + db2.setOwnerName(dbOwner); + db2.setOwnerType(PrincipalType.USER); + objectStore.createDatabase(db2); + db2 = objectStore.getDatabase(dbName2); + + // Alter db "testDatabaseOps" via ObjectStore + dbOwner = "user1"; + db = new Database(dbName, dbDescription, dbLocation, dbParams); + db.setOwnerName(dbOwner); + db.setOwnerType(PrincipalType.USER); + objectStore.alterDatabase(dbName, db); + db = objectStore.getDatabase(dbName); + + // Drop db "testDatabaseOps1" via ObjectStore + objectStore.dropDatabase(dbName1); + + // We update twice to accurately detect if cache is dirty or not + updateCache(cachedStore, 100, 500, 100); + updateCache(cachedStore, 100, 500, 100); + + // Read the newly added db via CachedStore + dbNew = cachedStore.getDatabase(dbName2); + Assert.assertEquals(db2, dbNew); + + // Read the altered db via CachedStore (altered user from "user2" to "user1") + dbNew = cachedStore.getDatabase(dbName); + Assert.assertEquals(db, dbNew); + + // Try to read the dropped db after cache update + try { + dbNew = cachedStore.getDatabase(dbName1); + Assert.fail("The database: " + dbName1 + + " should have been removed from the cache after running the update service"); + } catch (NoSuchObjectException e) { + // Expected + } + + // Clean up + objectStore.dropDatabase(dbName); + objectStore.dropDatabase(dbName2); + } + + @Test + public void testTableOps() throws Exception { + // Add a db via ObjectStore + String dbName = "testTableOps"; + String dbDescription = "testTableOps"; + String dbLocation = "file:/tmp"; + Map dbParams = new HashMap(); + String dbOwner = "user1"; + Database db = new Database(dbName, dbDescription, dbLocation, dbParams); + db.setOwnerName(dbOwner); + db.setOwnerType(PrincipalType.USER); + objectStore.createDatabase(db); + db = objectStore.getDatabase(dbName); + + // Add a table via ObjectStore + String tblName = "tbl"; + String tblOwner = "user1"; + String serdeLocation = "file:/tmp"; + FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); + FieldSchema col2 = new FieldSchema("col2", "string", "string column"); + List cols = new ArrayList(); + cols.add(col1); + cols.add(col2); + Map serdeParams = new HashMap(); + Map tblParams = new HashMap(); + SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap()); + StorageDescriptor sd = + new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null, + null, serdeParams); + sd.setStoredAsSubDirectories(false); + Table tbl = + new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList(), tblParams, + null, null, TableType.MANAGED_TABLE.toString()); + objectStore.createTable(tbl); + tbl = objectStore.getTable(dbName, tblName); + + // Prewarm CachedStore + cachedStore.prewarm(); + + // Read database, table via CachedStore + Database dbNew = cachedStore.getDatabase(dbName); + Assert.assertEquals(db, dbNew); + Table tblNew = cachedStore.getTable(dbName, tblName); + Assert.assertEquals(tbl, tblNew); + + // Add a new table via CachedStore + String tblName1 = "tbl1"; + Table tbl1 = + new Table(tblName1, dbName, tblOwner, 0, 0, 0, sd, new ArrayList(), tblParams, + null, null, TableType.MANAGED_TABLE.toString()); + cachedStore.createTable(tbl1); + tbl1 = cachedStore.getTable(dbName, tblName1); + + // Read via object store + tblNew = objectStore.getTable(dbName, tblName1); + Assert.assertEquals(tbl1, tblNew); + + // Add a 
new table via ObjectStore + String tblName2 = "tbl2"; + Table tbl2 = + new Table(tblName2, dbName, tblOwner, 0, 0, 0, sd, new ArrayList(), tblParams, + null, null, TableType.MANAGED_TABLE.toString()); + objectStore.createTable(tbl2); + tbl2 = objectStore.getTable(dbName, tblName2); + + // Alter table "tbl" via ObjectStore + tblOwner = "user2"; + tbl = + new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList(), tblParams, + null, null, TableType.MANAGED_TABLE.toString()); + objectStore.alterTable(dbName, tblName, tbl); + tbl = objectStore.getTable(dbName, tblName); + + // Drop table "tbl1" via ObjectStore + objectStore.dropTable(dbName, tblName1); + + // We update twice to accurately detect if cache is dirty or not + updateCache(cachedStore, 100, 500, 100); + updateCache(cachedStore, 100, 500, 100); + + // Read "tbl2" via CachedStore + tblNew = cachedStore.getTable(dbName, tblName2); + Assert.assertEquals(tbl2, tblNew); + + // Read the altered "tbl" via CachedStore + tblNew = cachedStore.getTable(dbName, tblName); + Assert.assertEquals(tbl, tblNew); + + // Try to read the dropped "tbl1" via CachedStore (should throw exception) + tblNew = cachedStore.getTable(dbName, tblName1); + Assert.assertNull(tblNew); + + // Should return "tbl" and "tbl2" + List tblNames = cachedStore.getTables(dbName, "*"); + Assert.assertTrue(tblNames.contains(tblName)); + Assert.assertTrue(!tblNames.contains(tblName1)); + Assert.assertTrue(tblNames.contains(tblName2)); + + // Clean up + objectStore.dropTable(dbName, tblName); + objectStore.dropTable(dbName, tblName2); + objectStore.dropDatabase(dbName); + } + + @Test + public void testPartitionOps() throws Exception { + // Add a db via ObjectStore + String dbName = "testPartitionOps"; + String dbDescription = "testPartitionOps"; + String dbLocation = "file:/tmp"; + Map dbParams = new HashMap(); + String dbOwner = "user1"; + Database db = new Database(dbName, dbDescription, dbLocation, dbParams); + db.setOwnerName(dbOwner); + db.setOwnerType(PrincipalType.USER); + objectStore.createDatabase(db); + db = objectStore.getDatabase(dbName); + + // Add a table via ObjectStore + String tblName = "tbl"; + String tblOwner = "user1"; + String serdeLocation = "file:/tmp"; + FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); + FieldSchema col2 = new FieldSchema("col2", "string", "string column"); + List cols = new ArrayList(); + cols.add(col1); + cols.add(col2); + Map serdeParams = new HashMap(); + Map tblParams = new HashMap(); + SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = + new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null, + null, serdeParams); + FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column"); + List ptnCols = new ArrayList(); + ptnCols.add(ptnCol1); + Table tbl = + new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null, + TableType.MANAGED_TABLE.toString()); + objectStore.createTable(tbl); + tbl = objectStore.getTable(dbName, tblName); + final String ptnColVal1 = "aaa"; + Map partParams = new HashMap(); + Partition ptn1 = + new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, sd, partParams); + objectStore.addPartition(ptn1); + ptn1 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1)); + final String ptnColVal2 = "bbb"; + Partition ptn2 = + new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, sd, partParams); + objectStore.addPartition(ptn2); + ptn2 = 
objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2)); + + // Prewarm CachedStore + cachedStore.prewarm(); + + // Read database, table, partition via CachedStore + Database dbNew = cachedStore.getDatabase(dbName); + Assert.assertEquals(db, dbNew); + Table tblNew = cachedStore.getTable(dbName, tblName); + Assert.assertEquals(tbl, tblNew); + Partition newPtn1 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1)); + Assert.assertEquals(ptn1, newPtn1); + Partition newPtn2 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2)); + Assert.assertEquals(ptn2, newPtn2); + + // Add a new partition via ObjectStore + final String ptnColVal3 = "ccc"; + Partition ptn3 = + new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, sd, partParams); + objectStore.addPartition(ptn3); + ptn3 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3)); + + // Alter an existing partition ("aaa") via ObjectStore + final String ptnColVal1Alt = "aaaAlt"; + Partition ptn1Atl = + new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, sd, partParams); + objectStore.alterPartition(dbName, tblName, Arrays.asList(ptnColVal1), ptn1Atl); + ptn1Atl = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt)); + + // Drop an existing partition ("bbb") via ObjectStore + objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal2)); + + // We update twice to accurately detect if cache is dirty or not + updateCache(cachedStore, 100, 500, 100); + updateCache(cachedStore, 100, 500, 100); + + // Read the newly added partition via CachedStore + Partition newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3)); + Assert.assertEquals(ptn3, newPtn); + + // Read the altered partition via CachedStore + newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt)); + Assert.assertEquals(ptn1Atl, newPtn); + + // Try to read the dropped partition via CachedStore + try { + newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2)); + Assert.fail("The partition: " + ptnColVal2 + + " should have been removed from the cache after running the update service"); + } catch (NoSuchObjectException e) { + // Expected + } + } + + //@Test + public void testTableColStatsOps() throws Exception { + // Add a db via ObjectStore + String dbName = "testTableColStatsOps"; + String dbDescription = "testTableColStatsOps"; + String dbLocation = "file:/tmp"; + Map dbParams = new HashMap(); + String dbOwner = "user1"; + Database db = new Database(dbName, dbDescription, dbLocation, dbParams); + db.setOwnerName(dbOwner); + db.setOwnerType(PrincipalType.USER); + objectStore.createDatabase(db); + db = objectStore.getDatabase(dbName); + + // Add a table via ObjectStore + final String tblName = "tbl"; + final String tblOwner = "user1"; + final String serdeLocation = "file:/tmp"; + final FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); + // Stats values for col1 + long col1LowVal = 5; + long col1HighVal = 500; + long col1Nulls = 10; + long col1DV = 20; + final FieldSchema col2 = new FieldSchema("col2", "string", "string column"); + // Stats values for col2 + long col2MaxColLen = 100; + double col2AvgColLen = 45.5; + long col2Nulls = 5; + long col2DV = 40; + final FieldSchema col3 = new FieldSchema("col3", "boolean", "boolean column"); + // Stats values for col3 + long col3NumTrues = 100; + long col3NumFalses = 30; + long col3Nulls = 10; + final List cols = new ArrayList(); + cols.add(col1); + 
cols.add(col2); + cols.add(col3); + Map serdeParams = new HashMap(); + Map tblParams = new HashMap(); + final SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = + new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null, + null, serdeParams); + Table tbl = + new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList(), tblParams, + null, null, TableType.MANAGED_TABLE.toString()); + objectStore.createTable(tbl); + tbl = objectStore.getTable(dbName, tblName); + + // Add ColumnStatistics for tbl to metastore DB via ObjectStore + ColumnStatistics stats = new ColumnStatistics(); + ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName); + List colStatObjs = new ArrayList(); + + // Col1 + ColumnStatisticsData data1 = new ColumnStatisticsData(); + ColumnStatisticsObj col1Stats = new ColumnStatisticsObj(col1.getName(), col1.getType(), data1); + LongColumnStatsData longStats = new LongColumnStatsData(); + longStats.setLowValue(col1LowVal); + longStats.setHighValue(col1HighVal); + longStats.setNumNulls(col1Nulls); + longStats.setNumDVs(col1DV); + data1.setLongStats(longStats); + colStatObjs.add(col1Stats); + + // Col2 + ColumnStatisticsData data2 = new ColumnStatisticsData(); + ColumnStatisticsObj col2Stats = new ColumnStatisticsObj(col2.getName(), col2.getType(), data2); + StringColumnStatsData stringStats = new StringColumnStatsData(); + stringStats.setMaxColLen(col2MaxColLen); + stringStats.setAvgColLen(col2AvgColLen); + stringStats.setNumNulls(col2Nulls); + stringStats.setNumDVs(col2DV); + data2.setStringStats(stringStats); + colStatObjs.add(col2Stats); + + // Col3 + ColumnStatisticsData data3 = new ColumnStatisticsData(); + ColumnStatisticsObj col3Stats = new ColumnStatisticsObj(col3.getName(), col3.getType(), data3); + BooleanColumnStatsData boolStats = new BooleanColumnStatsData(); + boolStats.setNumTrues(col3NumTrues); + boolStats.setNumFalses(col3NumFalses); + boolStats.setNumNulls(col3Nulls); + data3.setBooleanStats(boolStats); + colStatObjs.add(col3Stats); + + stats.setStatsDesc(statsDesc); + stats.setStatsObj(colStatObjs); + + // Save to DB + objectStore.updateTableColumnStatistics(stats); + + // Prewarm CachedStore + cachedStore.prewarm(); + + // Read table stats via CachedStore + ColumnStatistics newStats = + cachedStore.getTableColumnStatistics(dbName, tblName, + Arrays.asList(col1.getName(), col2.getName(), col3.getName())); + Assert.assertEquals(stats, newStats); + } + + private void updateCache(CachedStore cachedStore, long frequency, long sleepTime, + long shutdownTimeout) throws InterruptedException { + // Set the cache refresh period to the requested frequency + cachedStore.setCacheRefreshPeriod(frequency); + // Start the CachedStore update service + cachedStore.startCacheUpdateService(); + // Sleep so that the cache update can complete + Thread.sleep(sleepTime); + // Stop the cache update service + cachedStore.stopCacheUpdateService(shutdownTimeout); + } + + /********************************************************************************************** + * Methods that test SharedCache + *********************************************************************************************/ + @Test public void testSharedStoreDb() { Database db1 = new Database(); @@ -160,6 +597,7 @@ public void testSharedStoreTable() { Assert.assertEquals(SharedCache.getSdCache().size(), 2); } + @Test public void testSharedStorePartition() { Partition part1 = new Partition();
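The CachedStore tests above drive the background refresh by hand rather than letting it run on a timer: they mutate metadata directly through the ObjectStore, then bounce the update service and assert that CachedStore converges. Read as a recipe, using the cachedStore, objectStore, and updateCache members defined in this test class (the database name below is made up for illustration):

    // Inside TestCachedStore: the refresh-and-verify idiom used by the tests above.
    @Test
    public void refreshIdiomSketch() throws Exception {
      // 1. Load the cache once from the backing ObjectStore
      cachedStore.prewarm();
      // 2. Change metadata behind CachedStore's back
      objectStore.dropDatabase("some_db");
      // 3. Run the refresh twice; in the tests' own words, "We update twice to
      //    accurately detect if cache is dirty or not"
      updateCache(cachedStore, 100, 500, 100);
      updateCache(cachedStore, 100, 500, 100);
      // 4. The cache should now reflect the drop
      try {
        cachedStore.getDatabase("some_db");
        Assert.fail("database should have been evicted by the refresh");
      } catch (NoSuchObjectException e) {
        // Expected
      }
    }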