diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index d3ea824..ca1ec03 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -899,6 +899,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore", "Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. \n" + "This class is used to store and retrieval of raw metadata objects such as table, database"), + METASTORE_CACHED_RAW_STORE_IMPL("hive.metastore.cached.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore", + "Name of the wrapped RawStore class"), + METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY( + "hive.metastore.cached.rawstore.cache.update.frequency", "60", new TimeValidator( + TimeUnit.SECONDS), + "The time after which metastore cache is updated from metastore DB."), METASTORE_TXN_STORE_IMPL("hive.metastore.txn.store.impl", "org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler", "Name of class that implements org.apache.hadoop.hive.metastore.txn.TxnStore. This " + diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java index 5282a5a..88b9faf 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; @@ -911,4 +912,10 @@ public void addPrimaryKeys(List pks) public void addForeignKeys(List fks) throws InvalidObjectException, MetaException { } + + @Override + public Map getAggrColStatsForTablePartitions(String dbName, + String tableName) throws MetaException, NoSuchObjectException { + return objectStore.getAggrColStatsForTablePartitions(dbName, tableName); + } } \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index 22c1a33..b96c27e 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -61,6 +61,8 @@ import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.cache.CacheUtils; +import org.apache.hadoop.hive.metastore.cache.CachedStore; import org.apache.hadoop.hive.metastore.model.MConstraint; import org.apache.hadoop.hive.metastore.model.MDatabase; import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics; @@ -79,6 +81,7 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; /** * 
This class contains the optimizations for MetaStore that rely on direct SQL access to @@ -1338,6 +1341,63 @@ private long partsFoundForPartitions(final String dbName, final String tableName }); } + // Get aggregated column stats for a table per partition for all columns in the partition + // This is primarily used to populate stats object when using CachedStore (Check CachedStore#prewarm) + public Map getAggrColStatsForTablePartitions(String dbName, + String tblName, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { + String queryText = "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", " + + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), " + + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), " + + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), " + + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), " + // The following data is used to compute a partitioned table's NDV based + // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be + // accurately derived from partition NDVs, because the domain of column value two partitions + // can overlap. If there is no overlap then global NDV is just the sum + // of partition NDVs (UpperBound). But if there is some overlay then + // global NDV can be anywhere between sum of partition NDVs (no overlap) + // and same as one of the partition NDV (domain of column value in all other + // partitions is subset of the domain value in one of the partition) + // (LowerBound).But under uniform distribution, we can roughly estimate the global + // NDV by leveraging the min/max values. + // And, we also guarantee that the estimation makes sense by comparing it to the + // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")") + // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")") + + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," + + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," + + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," + + "sum(\"NUM_DISTINCTS\") from \"PART_COL_STATS\"" + + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? group by \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\""; + long start = 0; + long end = 0; + Query query = null; + boolean doTrace = LOG.isDebugEnabled(); + Object qResult = null; + ForwardQueryResult fqr = null; + start = doTrace ? System.nanoTime() : 0; + query = pm.newQuery("javax.jdo.query.SQL", queryText); + qResult = executeWithArray(query, + prepareParams(dbName, tblName, new ArrayList(), new ArrayList()), queryText); + if (qResult == null) { + query.closeAll(); + return Maps.newHashMap(); + } + end = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, end); + List list = ensureList(qResult); + Map partColStatsMap = new HashMap(); + for (Object[] row : list) { + String partName = (String) row[0]; + String colName = (String) row[1]; + partColStatsMap.put( + CacheUtils.buildKey(dbName, tblName, CachedStore.partNameToVals(partName), colName), + prepareCSObjWithAdjustedNDV(row, 1, useDensityFunctionForNDVEstimation, ndvTuner)); + Deadline.checkTimeout(); + } + query.closeAll(); + return partColStatsMap; + } + /** Should be called with the list short enough to not trip up Oracle/etc. 
*/ private List columnStatisticsObjForPartitionsBatch(String dbName, String tableName, List partNames, List colNames, boolean areAllPartsFound, diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 8e79e4f..3d07963 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -7370,6 +7370,38 @@ protected String describeResult() { } @Override + public Map getAggrColStatsForTablePartitions(String dbName, + String tableName) throws MetaException, NoSuchObjectException { + final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(), + HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION); + final double ndvTuner = HiveConf.getFloatVar(getConf(), + HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER); + return new GetHelper>(dbName, tableName, true, false) { + @Override + protected Map getSqlResult( + GetHelper> ctx) throws MetaException { + return directSql.getAggrColStatsForTablePartitions(dbName, tblName, + useDensityFunctionForNDVEstimation, ndvTuner); + } + + @Override + protected Map getJdoResult( + GetHelper> ctx) throws MetaException, + NoSuchObjectException { + // This is fast path for query optimizations, if we can find this info + // quickly using directSql, do it. No point in failing back to slow path + // here. + throw new MetaException("Jdo path is not implemented for stats aggr."); + } + + @Override + protected String describeResult() { + return null; + } + }.run(true); + } + + @Override public void flushCache() { // NOP as there's no caching } @@ -8782,5 +8814,4 @@ public void dropConstraint(String dbName, String tableName, } } } - } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java index 6f4f031..c22a1db 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; @@ -578,6 +579,17 @@ public AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, List colNames) throws MetaException, NoSuchObjectException; /** + * Get all partition column statistics for a table + * @param dbName + * @param tableName + * @return Map of partition column statistics + * @throws MetaException + * @throws NoSuchObjectException + */ + public Map getAggrColStatsForTablePartitions(String dbName, + String tableName) throws MetaException, NoSuchObjectException; + + /** * Get the next notification event. * @param rqst Request containing information on the last processed notification. 
* @return list of notifications, sorted by eventId diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/ByteArrayWrapper.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/ByteArrayWrapper.java new file mode 100644 index 0000000..45ed1e7 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/ByteArrayWrapper.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.cache; + +import java.util.Arrays; + +/** + * byte array with comparator + */ +public class ByteArrayWrapper { + byte[] wrapped; + + ByteArrayWrapper(byte[] b) { + wrapped = b; + } + + @Override + public boolean equals(Object other) { + if (other instanceof ByteArrayWrapper) { + return Arrays.equals(((ByteArrayWrapper)other).wrapped, wrapped); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Arrays.hashCode(wrapped); + } +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java new file mode 100644 index 0000000..b438479 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.cache; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SkewedInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper; +import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper; +import org.apache.hive.common.util.HiveStringUtils; + +public class CacheUtils { + private static final String delimit = "\u0001"; + + public static String buildKey(String dbName, String tableName) { + return dbName + delimit + tableName; + } + + public static String buildKey(String dbName, String tableName, List partVals) { + String key = buildKey(dbName, tableName); + if (partVals == null || partVals.size() == 0) { + return key; + } + for (int i = 0; i < partVals.size(); i++) { + key += partVals.get(i); + if (i != partVals.size() - 1) { + key += delimit; + } + } + return key; + } + + public static String buildKey(String dbName, String tableName, List partVals, String colName) { + String key = buildKey(dbName, tableName, partVals); + return key + delimit + colName; + } + + public static Table assemble(TableWrapper wrapper) { + Table t = wrapper.getTable().deepCopy(); + if (wrapper.getSdHash()!=null) { + StorageDescriptor sdCopy = SharedCache.getSdFromCache(wrapper.getSdHash()).deepCopy(); + if (sdCopy.getBucketCols()==null) { + sdCopy.setBucketCols(new ArrayList()); + } + if (sdCopy.getSortCols()==null) { + sdCopy.setSortCols(new ArrayList()); + } + if (sdCopy.getSkewedInfo()==null) { + sdCopy.setSkewedInfo(new SkewedInfo(new ArrayList(), + new ArrayList>(), new HashMap,String>())); + } + sdCopy.setLocation(wrapper.getLocation()); + sdCopy.setParameters(wrapper.getParameters()); + t.setSd(sdCopy); + } + return t; + } + + public static Partition assemble(PartitionWrapper wrapper) { + Partition p = wrapper.getPartition().deepCopy(); + if (wrapper.getSdHash()!=null) { + StorageDescriptor sdCopy = SharedCache.getSdFromCache(wrapper.getSdHash()).deepCopy(); + if (sdCopy.getBucketCols()==null) { + sdCopy.setBucketCols(new ArrayList()); + } + if (sdCopy.getSortCols()==null) { + sdCopy.setSortCols(new ArrayList()); + } + if (sdCopy.getSkewedInfo()==null) { + sdCopy.setSkewedInfo(new SkewedInfo(new ArrayList(), + new ArrayList>(), new HashMap,String>())); + } + sdCopy.setLocation(wrapper.getLocation()); + sdCopy.setParameters(wrapper.getParameters()); + p.setSd(sdCopy); + } + return p; + } + + public static boolean matches(String name, String pattern) { + String[] subpatterns = pattern.trim().split("\\|"); + for (String subpattern : subpatterns) { + subpattern = "(?i)" + subpattern.replaceAll("\\?", ".{1}").replaceAll("\\*", ".*") + .replaceAll("\\^", "\\\\^").replaceAll("\\$", "\\\\$");; + if (Pattern.matches(subpattern, HiveStringUtils.normalizeIdentifier(name))) { + return true; + } + } + return false; + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java new file mode 100644 index 0000000..c24ccff --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -0,0 +1,1581 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.cache; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.FileMetadataHandler; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.metastore.PartFilterExprUtil; +import org.apache.hadoop.hive.metastore.PartitionExpressionProxy; +import org.apache.hadoop.hive.metastore.RawStore; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.AggrStats; +import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; +import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Date; +import org.apache.hadoop.hive.metastore.api.DateColumnStatsData; +import org.apache.hadoop.hive.metastore.api.Decimal; +import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; +import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.api.InvalidInputException; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; +import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; +import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import 
org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionEventType; +import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; +import org.apache.hadoop.hive.metastore.api.PrincipalType; +import org.apache.hadoop.hive.metastore.api.PrivilegeBag; +import org.apache.hadoop.hive.metastore.api.Role; +import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableMeta; +import org.apache.hadoop.hive.metastore.api.Type; +import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; +import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hive.common.util.HiveStringUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +// TODO filter->expr +// TODO functionCache +// TODO constraintCache +// TODO need sd nested copy? +// TODO String intern +// TODO restructure HBaseStore +// TODO monitor event queue +// TODO initial load slow? +// TODO size estimation +// TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation +// TODO factor in NDV estimation (density based estimation) logic when merging NDVs from 2 colStats object +// TODO refactor to use same common code with StatObjectConverter (for merging 2 col stats objects) + +public class CachedStore implements RawStore, Configurable { + private static ScheduledExecutorService cacheUpdateMaster = null; + private static AtomicReference runningMasterThread = null; + RawStore rawStore; + Configuration conf; + private PartitionExpressionProxy expressionProxy = null; + static boolean firstTime = true; + + static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName()); + + static class TableWrapper { + Table t; + String location; + Map parameters; + byte[] sdHash; + TableWrapper(Table t, byte[] sdHash, String location, Map parameters) { + this.t = t; + this.sdHash = sdHash; + this.location = location; + this.parameters = parameters; + } + public Table getTable() { + return t; + } + public byte[] getSdHash() { + return sdHash; + } + public String getLocation() { + return location; + } + public Map getParameters() { + return parameters; + } + } + + static class PartitionWrapper { + Partition p; + String location; + Map parameters; + byte[] sdHash; + PartitionWrapper(Partition p, byte[] sdHash, String location, Map parameters) { + this.p = p; + this.sdHash = sdHash; + this.location = location; + this.parameters = parameters; + } + public Partition getPartition() { + return p; + } + public byte[] getSdHash() { + return sdHash; + } + public String getLocation() { + return location; + } + public Map getParameters() { + return parameters; + } + } + + static class StorageDescriptorWrapper { + StorageDescriptor sd; + int refCount = 0; + StorageDescriptorWrapper(StorageDescriptor sd, int 
refCount) { + this.sd = sd; + this.refCount = refCount; + } + public StorageDescriptor getSd() { + return sd; + } + public int getRefCount() { + return refCount; + } + } + + public CachedStore() { + } + + @Override + public void setConf(Configuration conf) { + String rawStoreClassName = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_IMPL, + ObjectStore.class.getName()); + try { + rawStore = ((Class) MetaStoreUtils.getClass( + rawStoreClassName)).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e); + } + rawStore.setConf(conf); + Configuration oldConf = this.conf; + this.conf = conf; + if (expressionProxy != null && conf != oldConf) { + LOG.warn("Unexpected setConf when we were already configured"); + } + if (expressionProxy == null || conf != oldConf) { + expressionProxy = PartFilterExprUtil.createExpressionProxy(conf); + } + if (firstTime) { + try { + LOG.info("Prewarming CachedStore"); + prewarm(); + LOG.info("CachedStore initialized"); + } catch (Exception e) { + throw new RuntimeException(e); + } + firstTime = false; + } + } + + private void prewarm() throws Exception { + List dbNames = rawStore.getAllDatabases(); + for (String dbName : dbNames) { + Database db = rawStore.getDatabase(dbName); + SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(dbName), db); + List tblNames = rawStore.getAllTables(dbName); + for (String tblName : tblNames) { + Table table = rawStore.getTable(dbName, tblName); + SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), table); + List partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); + for (Partition partition : partitions) { + SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partition); + } + Map aggrStatsPerPartition = rawStore + .getAggrColStatsForTablePartitions(dbName, tblName); + SharedCache.addPartitionColStatsToCache(aggrStatsPerPartition); + } + } + // Start the cache update master-worker threads + startCacheUpdateService(); + } + + private synchronized void startCacheUpdateService() { + if (cacheUpdateMaster == null) { + cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + return t; + } + }); + cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), 0, HiveConf + .getTimeVar(conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, + TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + } + } + + static class CacheUpdateMasterWork implements Runnable { + + private CachedStore cachedStore; + + public CacheUpdateMasterWork(CachedStore cachedStore) { + this.cachedStore = cachedStore; + } + + @Override + public void run() { + runningMasterThread.set(Thread.currentThread()); + RawStore rawStore = cachedStore.getRawStore(); + try { + List dbNames = rawStore.getAllDatabases(); + // Update the database in cache + if (!updateDatabases(rawStore, dbNames)) { + return; + } + // Update the tables and their partitions in cache + if (!updateTables(rawStore, dbNames)) { + return; + } + } catch (MetaException e) { + LOG.error("Updating CachedStore: error getting database names", e); + } finally { + runningMasterThread.set(null); + } + } + + private boolean updateDatabases(RawStore rawStore, List dbNames) { + if (dbNames != null) { 
+ List databases = new ArrayList(); + for (String dbName : dbNames) { + // If a preemption of this thread was requested, simply return before proceeding + if (Thread.interrupted()) { + return false; + } + Database db; + try { + db = rawStore.getDatabase(dbName); + databases.add(db); + } catch (NoSuchObjectException e) { + LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e); + } + } + // Update the cached database objects + SharedCache.refreshDatabases(databases); + } + return true; + } + + private boolean updateTables(RawStore rawStore, List dbNames) { + if (dbNames != null) { + List tables = new ArrayList
(); + for (String dbName : dbNames) { + try { + List tblNames = rawStore.getAllTables(dbName); + for (String tblName : tblNames) { + // If a preemption of this thread was requested, simply return before proceeding + if (Thread.interrupted()) { + return false; + } + Table table = rawStore.getTable(dbName, tblName); + tables.add(table); + } + // Update the cached database objects + SharedCache.refreshTables(dbName, tables); + for (String tblName : tblNames) { + // If a preemption of this thread was requested, simply return before proceeding + if (Thread.interrupted()) { + return false; + } + List partitions = + rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); + SharedCache.refreshPartitions(dbName, tblName, partitions); + } + } catch (MetaException | NoSuchObjectException e) { + LOG.error("Updating CachedStore: unable to read table", e); + return false; + } + } + } + return true; + } + } + + private void waitForCacheUpdateMaster() { + if (runningMasterThread.get() != null) { + runningMasterThread.get().interrupt(); + while (runningMasterThread.get() != null) { + ; + } + } + } + + @Override + public Configuration getConf() { + return rawStore.getConf(); + } + + @Override + public void shutdown() { + rawStore.shutdown(); + } + + @Override + public boolean openTransaction() { + return rawStore.openTransaction(); + } + + @Override + public boolean commitTransaction() { + return rawStore.commitTransaction(); + } + + @Override + public void rollbackTransaction() { + rawStore.rollbackTransaction(); + } + + @Override + public void createDatabase(Database db) + throws InvalidObjectException, MetaException { + rawStore.createDatabase(db); + waitForCacheUpdateMaster(); + SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(db.getName()), db.deepCopy()); + } + + @Override + public Database getDatabase(String dbName) throws NoSuchObjectException { + Database db = SharedCache.getDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName)); + if (db == null) { + throw new NoSuchObjectException(); + } + return SharedCache.getDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName)); + } + + @Override + public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException { + boolean succ = rawStore.dropDatabase(dbname); + if (succ) { + waitForCacheUpdateMaster(); + SharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname)); + } + return succ; + } + + @Override + public boolean alterDatabase(String dbName, Database db) + throws NoSuchObjectException, MetaException { + boolean succ = rawStore.alterDatabase(dbName, db); + if (succ) { + waitForCacheUpdateMaster(); + SharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db); + } + return succ; + } + + @Override + public List getDatabases(String pattern) throws MetaException { + List results = new ArrayList(); + for (String dbName : SharedCache.listCachedDatabases()) { + dbName = HiveStringUtils.normalizeIdentifier(dbName); + if (CacheUtils.matches(dbName, pattern)) { + results.add(dbName); + } + } + return results; + } + + @Override + public List getAllDatabases() throws MetaException { + return SharedCache.listCachedDatabases(); + } + + @Override + public boolean createType(Type type) { + return rawStore.createType(type); + } + + @Override + public Type getType(String typeName) { + return rawStore.getType(typeName); + } + + @Override + public boolean dropType(String typeName) { + return rawStore.dropType(typeName); + } + + private void validateTableType(Table tbl) { + 
// If the table has property EXTERNAL set, update table type + // accordingly + String tableType = tbl.getTableType(); + boolean isExternal = "TRUE".equals(tbl.getParameters().get("EXTERNAL")); + if (TableType.MANAGED_TABLE.toString().equals(tableType)) { + if (isExternal) { + tableType = TableType.EXTERNAL_TABLE.toString(); + } + } + if (TableType.EXTERNAL_TABLE.toString().equals(tableType)) { + if (!isExternal) { + tableType = TableType.MANAGED_TABLE.toString(); + } + } + tbl.setTableType(tableType); + } + + @Override + public void createTable(Table tbl) + throws InvalidObjectException, MetaException { + rawStore.createTable(tbl); + waitForCacheUpdateMaster(); + validateTableType(tbl); + SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()), + HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl); + } + + @Override + public boolean dropTable(String dbName, String tableName) + throws MetaException, NoSuchObjectException, InvalidObjectException, + InvalidInputException { + boolean succ = rawStore.dropTable(dbName, tableName); + if (succ) { + waitForCacheUpdateMaster(); + SharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName)); + } + return succ; + } + + @Override + public Table getTable(String dbName, String tableName) throws MetaException { + Table tbl = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName)); + if (tbl != null) { + tbl.unsetPrivileges(); + tbl.setRewriteEnabled(tbl.isRewriteEnabled()); + } + return tbl; + } + + @Override + public boolean addPartition(Partition part) + throws InvalidObjectException, MetaException { + boolean succ = rawStore.addPartition(part); + if (succ) { + waitForCacheUpdateMaster(); + SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()), + HiveStringUtils.normalizeIdentifier(part.getTableName()), part); + } + return succ; + } + + @Override + public boolean addPartitions(String dbName, String tblName, + List parts) throws InvalidObjectException, MetaException { + boolean succ = rawStore.addPartitions(dbName, tblName, parts); + if (succ) { + waitForCacheUpdateMaster(); + for (Partition part : parts) { + SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), part); + } + } + return succ; + } + + @Override + public boolean addPartitions(String dbName, String tblName, + PartitionSpecProxy partitionSpec, boolean ifNotExists) + throws InvalidObjectException, MetaException { + boolean succ = rawStore.addPartitions(dbName, tblName, partitionSpec, ifNotExists); + if (succ) { + waitForCacheUpdateMaster(); + PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); + while (iterator.hasNext()) { + Partition part = iterator.next(); + SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), part); + } + } + return succ; + } + + @Override + public Partition getPartition(String dbName, String tableName, + List part_vals) throws MetaException, NoSuchObjectException { + Partition part = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), part_vals); + if (part != null) { + part.unsetPrivileges(); + } + return part; + } + + @Override + public boolean doesPartitionExist(String dbName, String tableName, + List part_vals) throws 
MetaException, NoSuchObjectException { + return SharedCache.existPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), part_vals); + } + + @Override + public boolean dropPartition(String dbName, String tableName, + List part_vals) throws MetaException, NoSuchObjectException, + InvalidObjectException, InvalidInputException { + boolean succ = rawStore.dropPartition(dbName, tableName, part_vals); + if (succ) { + waitForCacheUpdateMaster(); + SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), part_vals); + } + return succ; + } + + @Override + public List getPartitions(String dbName, String tableName, int max) + throws MetaException, NoSuchObjectException { + List parts = SharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName), max); + if (parts != null) { + for (Partition part : parts) { + part.unsetPrivileges(); + } + } + return parts; + } + + @Override + public void alterTable(String dbName, String tblName, Table newTable) + throws InvalidObjectException, MetaException { + rawStore.alterTable(dbName, tblName, newTable); + waitForCacheUpdateMaster(); + validateTableType(newTable); + SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), newTable); + } + + @Override + public List getTables(String dbName, String pattern) + throws MetaException { + List tableNames = new ArrayList(); + for (Table table : SharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) { + if (CacheUtils.matches(table.getTableName(), pattern)) { + tableNames.add(table.getTableName()); + } + } + return tableNames; + } + + @Override + public List getTables(String dbName, String pattern, + TableType tableType) throws MetaException { + List tableNames = new ArrayList(); + for (Table table : SharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) { + if (CacheUtils.matches(table.getTableName(), pattern) && + table.getTableType().equals(tableType.toString())) { + tableNames.add(table.getTableName()); + } + } + return tableNames; + } + + @Override + public List getTableMeta(String dbNames, String tableNames, + List tableTypes) throws MetaException { + return SharedCache.getTableMeta(HiveStringUtils.normalizeIdentifier(dbNames), + HiveStringUtils.normalizeIdentifier(tableNames), tableTypes); + } + + @Override + public List
<Table> getTableObjectsByName(String dbName, + List<String> tblNames) throws MetaException, UnknownDBException { + List<Table> tables = new ArrayList<Table>
(); + for (String tblName : tblNames) { + tables.add(SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName))); + } + return tables; + } + + @Override + public List getAllTables(String dbName) throws MetaException { + List tblNames = new ArrayList(); + for (Table tbl : SharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) { + tblNames.add(HiveStringUtils.normalizeIdentifier(tbl.getTableName())); + } + return tblNames; + } + + @Override + public List listTableNamesByFilter(String dbName, String filter, + short max_tables) throws MetaException, UnknownDBException { + List tableNames = new ArrayList(); + int count = 0; + for (Table table : SharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) { + if (CacheUtils.matches(table.getTableName(), filter) + && (max_tables == -1 || count < max_tables)) { + tableNames.add(table.getTableName()); + count++; + } + } + return tableNames; + } + + @Override + public List listPartitionNames(String dbName, String tblName, + short max_parts) throws MetaException { + List partitionNames = new ArrayList(); + Table t = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName)); + int count = 0; + for (Partition part : SharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), max_parts)) { + if (max_parts == -1 || count < max_parts) { + partitionNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues())); + } + } + return partitionNames; + } + + @Override + public List listPartitionNamesByFilter(String db_name, + String tbl_name, String filter, short max_parts) throws MetaException { + // TODO Translate filter -> expr + return null; + } + + @Override + public void alterPartition(String dbName, String tblName, + List partVals, Partition newPart) + throws InvalidObjectException, MetaException { + rawStore.alterPartition(dbName, tblName, partVals, newPart); + waitForCacheUpdateMaster(); + SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + } + + @Override + public void alterPartitions(String dbName, String tblName, + List> partValsList, List newParts) + throws InvalidObjectException, MetaException { + rawStore.alterPartitions(dbName, tblName, partValsList, newParts); + waitForCacheUpdateMaster(); + for (int i=0;i partVals = partValsList.get(i); + Partition newPart = newParts.get(i); + SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart); + } + } + + @Override + public boolean addIndex(Index index) + throws InvalidObjectException, MetaException { + return rawStore.addIndex(index); + } + + @Override + public Index getIndex(String dbName, String origTableName, String indexName) + throws MetaException { + return rawStore.getIndex(dbName, origTableName, indexName); + } + + @Override + public boolean dropIndex(String dbName, String origTableName, + String indexName) throws MetaException { + return rawStore.dropIndex(dbName, origTableName, indexName); + } + + @Override + public List getIndexes(String dbName, String origTableName, int max) + throws MetaException { + return rawStore.getIndexes(dbName, origTableName, max); + } + + @Override + public List listIndexNames(String dbName, String origTableName, + short max) throws MetaException { 
+ return rawStore.listIndexNames(dbName, origTableName, max); + } + + @Override + public void alterIndex(String dbname, String baseTblName, String name, + Index newIndex) throws InvalidObjectException, MetaException { + rawStore.alterIndex(dbname, baseTblName, name, newIndex); + } + + private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr, + String defaultPartName, short maxParts, List result) throws MetaException, NoSuchObjectException { + List parts = SharedCache.listCachedPartitions( + HiveStringUtils.normalizeIdentifier(table.getDbName()), + HiveStringUtils.normalizeIdentifier(table.getTableName()), maxParts); + for (Partition part : parts) { + result.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues())); + } + List columnNames = new ArrayList(); + List typeInfos = new ArrayList(); + for (FieldSchema fs : table.getPartitionKeys()) { + columnNames.add(fs.getName()); + typeInfos.add(TypeInfoFactory.getPrimitiveTypeInfo(fs.getType())); + } + if (defaultPartName == null || defaultPartName.isEmpty()) { + defaultPartName = HiveConf.getVar(getConf(), HiveConf.ConfVars.DEFAULTPARTITIONNAME); + } + return expressionProxy.filterPartitionsByExpr( + columnNames, typeInfos, expr, defaultPartName, result); + } + + @Override + public List getPartitionsByFilter(String dbName, String tblName, + String filter, short maxParts) + throws MetaException, NoSuchObjectException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean getPartitionsByExpr(String dbName, String tblName, byte[] expr, + String defaultPartitionName, short maxParts, List result) + throws TException { + List partNames = new LinkedList(); + Table table = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName)); + boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn( + table, expr, defaultPartitionName, maxParts, partNames); + for (String partName : partNames) { + Partition part = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partNameToVals(partName)); + result.add(part); + } + return hasUnknownPartitions; + } + + @Override + public int getNumPartitionsByFilter(String dbName, String tblName, + String filter) throws MetaException, NoSuchObjectException { + Table table = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName)); + // TODO filter -> expr + return 0; + } + + @Override + public int getNumPartitionsByExpr(String dbName, String tblName, byte[] expr) + throws MetaException, NoSuchObjectException { + String defaultPartName = HiveConf.getVar(getConf(), HiveConf.ConfVars.DEFAULTPARTITIONNAME); + List partNames = new LinkedList(); + Table table = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName)); + getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames); + return partNames.size(); + } + + public static List partNameToVals(String name) { + if (name == null) return null; + List vals = new ArrayList(); + String[] kvp = name.split("/"); + for (String kv : kvp) { + vals.add(FileUtils.unescapePathName(kv.substring(kv.indexOf('=') + 1))); + } + return vals; + } + + @Override + public List getPartitionsByNames(String dbName, String tblName, + List partNames) throws MetaException, NoSuchObjectException { + List partitions = new ArrayList(); + 
for (String partName : partNames) { + Partition part = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partNameToVals(partName)); + if (part!=null) { + partitions.add(part); + } + } + return partitions; + } + + @Override + public Table markPartitionForEvent(String dbName, String tblName, + Map partVals, PartitionEventType evtType) + throws MetaException, UnknownTableException, InvalidPartitionException, + UnknownPartitionException { + return rawStore.markPartitionForEvent(dbName, tblName, partVals, evtType); + } + + @Override + public boolean isPartitionMarkedForEvent(String dbName, String tblName, + Map partName, PartitionEventType evtType) + throws MetaException, UnknownTableException, InvalidPartitionException, + UnknownPartitionException { + return rawStore.isPartitionMarkedForEvent(dbName, tblName, partName, evtType); + } + + @Override + public boolean addRole(String rowName, String ownerName) + throws InvalidObjectException, MetaException, NoSuchObjectException { + return rawStore.addRole(rowName, ownerName); + } + + @Override + public boolean removeRole(String roleName) + throws MetaException, NoSuchObjectException { + return rawStore.removeRole(roleName); + } + + @Override + public boolean grantRole(Role role, String userName, + PrincipalType principalType, String grantor, PrincipalType grantorType, + boolean grantOption) + throws MetaException, NoSuchObjectException, InvalidObjectException { + return rawStore.grantRole(role, userName, principalType, grantor, grantorType, grantOption); + } + + @Override + public boolean revokeRole(Role role, String userName, + PrincipalType principalType, boolean grantOption) + throws MetaException, NoSuchObjectException { + return rawStore.revokeRole(role, userName, principalType, grantOption); + } + + @Override + public PrincipalPrivilegeSet getUserPrivilegeSet(String userName, + List groupNames) throws InvalidObjectException, MetaException { + return rawStore.getUserPrivilegeSet(userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getDBPrivilegeSet(String dbName, String userName, + List groupNames) throws InvalidObjectException, MetaException { + return rawStore.getDBPrivilegeSet(dbName, userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getTablePrivilegeSet(String dbName, + String tableName, String userName, List groupNames) + throws InvalidObjectException, MetaException { + return rawStore.getTablePrivilegeSet(dbName, tableName, userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getPartitionPrivilegeSet(String dbName, + String tableName, String partition, String userName, + List groupNames) throws InvalidObjectException, MetaException { + return rawStore.getPartitionPrivilegeSet(dbName, tableName, partition, userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getColumnPrivilegeSet(String dbName, + String tableName, String partitionName, String columnName, + String userName, List groupNames) + throws InvalidObjectException, MetaException { + return rawStore.getColumnPrivilegeSet(dbName, tableName, partitionName, columnName, userName, groupNames); + } + + @Override + public List listPrincipalGlobalGrants( + String principalName, PrincipalType principalType) { + return rawStore.listPrincipalGlobalGrants(principalName, principalType); + } + + @Override + public List listPrincipalDBGrants(String principalName, + PrincipalType principalType, String dbName) { + return 
rawStore.listPrincipalDBGrants(principalName, principalType, dbName); + } + + @Override + public List listAllTableGrants(String principalName, + PrincipalType principalType, String dbName, String tableName) { + return rawStore.listAllTableGrants(principalName, principalType, dbName, tableName); + } + + @Override + public List listPrincipalPartitionGrants( + String principalName, PrincipalType principalType, String dbName, + String tableName, List partValues, String partName) { + return rawStore.listPrincipalPartitionGrants(principalName, principalType, dbName, tableName, partValues, partName); + } + + @Override + public List listPrincipalTableColumnGrants( + String principalName, PrincipalType principalType, String dbName, + String tableName, String columnName) { + return rawStore.listPrincipalTableColumnGrants(principalName, principalType, dbName, tableName, columnName); + } + + @Override + public List listPrincipalPartitionColumnGrants( + String principalName, PrincipalType principalType, String dbName, + String tableName, List partValues, String partName, + String columnName) { + return rawStore.listPrincipalPartitionColumnGrants(principalName, principalType, dbName, tableName, partValues, partName, columnName); + } + + @Override + public boolean grantPrivileges(PrivilegeBag privileges) + throws InvalidObjectException, MetaException, NoSuchObjectException { + return rawStore.grantPrivileges(privileges); + } + + @Override + public boolean revokePrivileges(PrivilegeBag privileges, boolean grantOption) + throws InvalidObjectException, MetaException, NoSuchObjectException { + return rawStore.revokePrivileges(privileges, grantOption); + } + + @Override + public Role getRole(String roleName) throws NoSuchObjectException { + return rawStore.getRole(roleName); + } + + @Override + public List listRoleNames() { + return rawStore.listRoleNames(); + } + + @Override + public List listRoles(String principalName, + PrincipalType principalType) { + return rawStore.listRoles(principalName, principalType); + } + + @Override + public List listRolesWithGrants(String principalName, + PrincipalType principalType) { + return rawStore.listRolesWithGrants(principalName, principalType); + } + + @Override + public List listRoleMembers(String roleName) { + return rawStore.listRoleMembers(roleName); + } + + @Override + public Partition getPartitionWithAuth(String dbName, String tblName, + List partVals, String userName, List groupNames) + throws MetaException, NoSuchObjectException, InvalidObjectException { + Partition p = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partVals); + if (p!=null) { + Table t = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName)); + String partName = Warehouse.makePartName(t.getPartitionKeys(), partVals); + PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, + userName, groupNames); + p.setPrivileges(privs); + } + return p; + } + + @Override + public List getPartitionsWithAuth(String dbName, String tblName, + short maxParts, String userName, List groupNames) + throws MetaException, NoSuchObjectException, InvalidObjectException { + Table t = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName)); + List partitions = new ArrayList(); + int count = 0; + for (Partition part : 
SharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), maxParts)) { + if (maxParts == -1 || count < maxParts) { + String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues()); + PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, + userName, groupNames); + part.setPrivileges(privs); + partitions.add(part); + count++; + } + } + return partitions; + } + + @Override + public List listPartitionNamesPs(String dbName, String tblName, + List partVals, short maxParts) + throws MetaException, NoSuchObjectException { + List partNames = new ArrayList(); + int count = 0; + Table t = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName)); + for (Partition part : SharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), maxParts)) { + boolean psMatch = true; + for (int i=0;i listPartitionsPsWithAuth(String dbName, + String tblName, List partVals, short maxParts, String userName, + List groupNames) + throws MetaException, InvalidObjectException, NoSuchObjectException { + List partitions = new ArrayList(); + Table t = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName)); + int count = 0; + for (Partition part : SharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), maxParts)) { + boolean psMatch = true; + for (int i=0;i partVals) throws NoSuchObjectException, MetaException, + InvalidObjectException, InvalidInputException { + boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals); + if (succ) { + SharedCache.updatePartitionColumnStatistics(HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()), + HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals, colStats.getStatsObj()); + } + return succ; + } + + @Override + public ColumnStatistics getTableColumnStatistics(String dbName, + String tableName, List colName) + throws MetaException, NoSuchObjectException { + return rawStore.getTableColumnStatistics(dbName, tableName, colName); + } + + @Override + public List getPartitionColumnStatistics(String dbName, + String tblName, List partNames, List colNames) + throws MetaException, NoSuchObjectException { + return rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames); + } + + @Override + public boolean deletePartitionColumnStatistics(String dbName, + String tableName, String partName, List partVals, String colName) + throws NoSuchObjectException, MetaException, InvalidObjectException, + InvalidInputException { + return rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName); + } + + @Override + public boolean deleteTableColumnStatistics(String dbName, String tableName, + String colName) throws NoSuchObjectException, MetaException, + InvalidObjectException, InvalidInputException { + return rawStore.deleteTableColumnStatistics(dbName, tableName, colName); + } + + @Override + public long cleanupEvents() { + return rawStore.cleanupEvents(); + } + + @Override + public boolean addToken(String tokenIdentifier, String delegationToken) { + return rawStore.addToken(tokenIdentifier, delegationToken); + } + + @Override + public boolean removeToken(String tokenIdentifier) { + return 
rawStore.removeToken(tokenIdentifier); + } + + @Override + public String getToken(String tokenIdentifier) { + return rawStore.getToken(tokenIdentifier); + } + + @Override + public List getAllTokenIdentifiers() { + return rawStore.getAllTokenIdentifiers(); + } + + @Override + public int addMasterKey(String key) throws MetaException { + return rawStore.addMasterKey(key); + } + + @Override + public void updateMasterKey(Integer seqNo, String key) + throws NoSuchObjectException, MetaException { + rawStore.updateMasterKey(seqNo, key); + } + + @Override + public boolean removeMasterKey(Integer keySeq) { + return rawStore.removeMasterKey(keySeq); + } + + @Override + public String[] getMasterKeys() { + return rawStore.getMasterKeys(); + } + + @Override + public void verifySchema() throws MetaException { + rawStore.verifySchema(); + } + + @Override + public String getMetaStoreSchemaVersion() throws MetaException { + return rawStore.getMetaStoreSchemaVersion(); + } + + @Override + public void setMetaStoreSchemaVersion(String version, String comment) + throws MetaException { + rawStore.setMetaStoreSchemaVersion(version, comment); + } + + @Override + public void dropPartitions(String dbName, String tblName, + List partNames) throws MetaException, NoSuchObjectException { + rawStore.dropPartitions(dbName, tblName, partNames); + waitForCacheUpdateMaster(); + for (String partName : partNames) { + List vals = partNameToVals(partName); + SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), vals); + } + } + + @Override + public List listPrincipalDBGrantsAll( + String principalName, PrincipalType principalType) { + return rawStore.listPrincipalDBGrantsAll(principalName, principalType); + } + + @Override + public List listPrincipalTableGrantsAll( + String principalName, PrincipalType principalType) { + return rawStore.listPrincipalTableGrantsAll(principalName, principalType); + } + + @Override + public List listPrincipalPartitionGrantsAll( + String principalName, PrincipalType principalType) { + return rawStore.listPrincipalPartitionGrantsAll(principalName, principalType); + } + + @Override + public List listPrincipalTableColumnGrantsAll( + String principalName, PrincipalType principalType) { + return rawStore.listPrincipalTableColumnGrantsAll(principalName, principalType); + } + + @Override + public List listPrincipalPartitionColumnGrantsAll( + String principalName, PrincipalType principalType) { + return rawStore.listPrincipalPartitionColumnGrantsAll(principalName, principalType); + } + + @Override + public List listGlobalGrantsAll() { + return rawStore.listGlobalGrantsAll(); + } + + @Override + public List listDBGrantsAll(String dbName) { + return rawStore.listDBGrantsAll(dbName); + } + + @Override + public List listPartitionColumnGrantsAll(String dbName, + String tableName, String partitionName, String columnName) { + return rawStore.listPartitionColumnGrantsAll(dbName, tableName, partitionName, columnName); + } + + @Override + public List listTableGrantsAll(String dbName, + String tableName) { + return rawStore.listTableGrantsAll(dbName, tableName); + } + + @Override + public List listPartitionGrantsAll(String dbName, + String tableName, String partitionName) { + return rawStore.listPartitionGrantsAll(dbName, tableName, partitionName); + } + + @Override + public List listTableColumnGrantsAll(String dbName, + String tableName, String columnName) { + return rawStore.listTableColumnGrantsAll(dbName, tableName, columnName); + } + + 
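Aside, not part of the patch: none of the CachedStore code above runs unless the metastore is actually pointed at it. Below is a minimal, hedged sketch of the wiring, using the two HiveConf entries introduced at the top of this diff; the class names are the ones this patch defines, and the refresh interval shown is only an example value, not a recommendation.

// Illustrative configuration sketch only; not part of this patch.
HiveConf conf = new HiveConf();
// The metastore instantiates whatever hive.metastore.rawstore.impl names, so point it at CachedStore ...
conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
    "org.apache.hadoop.hive.metastore.cache.CachedStore");
// ... and CachedStore.setConf() instantiates the store it wraps from the new setting:
conf.setVar(HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_IMPL,
    "org.apache.hadoop.hive.metastore.ObjectStore");
// How often the background CacheUpdateMasterWork refreshes the cache (example value).
conf.set("hive.metastore.cached.rawstore.cache.update.frequency", "60s");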
@Override + public void createFunction(Function func) + throws InvalidObjectException, MetaException { + // TODO functionCache + rawStore.createFunction(func); + } + + @Override + public void alterFunction(String dbName, String funcName, + Function newFunction) throws InvalidObjectException, MetaException { + // TODO functionCache + rawStore.alterFunction(dbName, funcName, newFunction); + } + + @Override + public void dropFunction(String dbName, String funcName) throws MetaException, + NoSuchObjectException, InvalidObjectException, InvalidInputException { + // TODO functionCache + rawStore.dropFunction(dbName, funcName); + } + + @Override + public Function getFunction(String dbName, String funcName) + throws MetaException { + // TODO functionCache + return rawStore.getFunction(dbName, funcName); + } + + @Override + public List getAllFunctions() throws MetaException { + // TODO functionCache + return rawStore.getAllFunctions(); + } + + @Override + public List getFunctions(String dbName, String pattern) + throws MetaException { + // TODO functionCache + return rawStore.getFunctions(dbName, pattern); + } + + @Override + public AggrStats get_aggr_stats_for(String dbName, String tblName, + List partNames, List colNames) + throws MetaException, NoSuchObjectException { + List colStats = new ArrayList(colNames.size()); + for (String colName : colNames) { + colStats.add(mergeColStatsForPartitions(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partNames, colName)); + } + // TODO: revisit the partitions not found case for extrapolation + return new AggrStats(colStats, partNames.size()); + } + + private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName, + List partNames, String colName) throws MetaException { + ColumnStatisticsObj colStats = null; + for (String partName : partNames) { + String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName); + ColumnStatisticsObj colStatsForPart = SharedCache.getCachedPartitionColStats( + colStatsCacheKey); + if (colStats == null) { + colStats = colStatsForPart; + } else { + colStats = mergeColStatsObj(colStats, colStatsForPart); + } + } + return colStats; + } + + private ColumnStatisticsObj mergeColStatsObj(ColumnStatisticsObj colStats1, + ColumnStatisticsObj colStats2) throws MetaException { + if ((!colStats1.getColType().equalsIgnoreCase(colStats2.getColType())) + || (!colStats1.getColName().equalsIgnoreCase(colStats2.getColName()))) { + throw new MetaException("Can't merge column stats for two partitions for different columns."); + } + ColumnStatisticsData csd = new ColumnStatisticsData(); + ColumnStatisticsObj cso = new ColumnStatisticsObj(colStats1.getColName(), + colStats1.getColType(), csd); + ColumnStatisticsData csData1 = colStats1.getStatsData(); + ColumnStatisticsData csData2 = colStats2.getStatsData(); + String colType = colStats1.getColType().toLowerCase(); + if (colType.equals("boolean")) { + BooleanColumnStatsData boolStats = new BooleanColumnStatsData(); + boolStats.setNumFalses(csData1.getBooleanStats().getNumFalses() + + csData2.getBooleanStats().getNumFalses()); + boolStats.setNumTrues(csData1.getBooleanStats().getNumTrues() + + csData2.getBooleanStats().getNumTrues()); + boolStats.setNumNulls(csData1.getBooleanStats().getNumNulls() + + csData2.getBooleanStats().getNumNulls()); + csd.setBooleanStats(boolStats); + } else if (colType.equals("string") || colType.startsWith("varchar") + || colType.startsWith("char")) { + 
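+ // String (varchar/char) stats merge: null counts are summed across the two partitions, while
+ // average/maximum column length and the distinct-value count take the larger of the two values;
+ // the max of per-partition NDVs is only a lower bound on the true combined NDV.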
StringColumnStatsData stringStats = new StringColumnStatsData(); + stringStats.setNumNulls(csData1.getStringStats().getNumNulls() + + csData2.getStringStats().getNumNulls()); + stringStats.setAvgColLen(Math.max(csData1.getStringStats().getAvgColLen(), csData2 + .getStringStats().getAvgColLen())); + stringStats.setMaxColLen(Math.max(csData1.getStringStats().getMaxColLen(), csData2 + .getStringStats().getMaxColLen())); + stringStats.setNumDVs(Math.max(csData1.getStringStats().getNumDVs(), csData2.getStringStats() + .getNumDVs())); + csd.setStringStats(stringStats); + } else if (colType.equals("binary")) { + BinaryColumnStatsData binaryStats = new BinaryColumnStatsData(); + binaryStats.setNumNulls(csData1.getBinaryStats().getNumNulls() + + csData2.getBinaryStats().getNumNulls()); + binaryStats.setAvgColLen(Math.max(csData1.getBinaryStats().getAvgColLen(), csData2 + .getBinaryStats().getAvgColLen())); + binaryStats.setMaxColLen(Math.max(csData1.getBinaryStats().getMaxColLen(), csData2 + .getBinaryStats().getMaxColLen())); + csd.setBinaryStats(binaryStats); + } else if (colType.equals("bigint") || colType.equals("int") || colType.equals("smallint") + || colType.equals("tinyint") || colType.equals("timestamp")) { + LongColumnStatsData longStats = new LongColumnStatsData(); + longStats.setNumNulls(csData1.getLongStats().getNumNulls() + + csData2.getLongStats().getNumNulls()); + longStats.setHighValue(Math.max(csData1.getLongStats().getHighValue(), csData2.getLongStats() + .getHighValue())); + longStats.setLowValue(Math.min(csData1.getLongStats().getLowValue(), csData2.getLongStats() + .getLowValue())); + longStats.setNumDVs(Math.max(csData1.getLongStats().getNumDVs(), csData2.getLongStats() + .getNumDVs())); + csd.setLongStats(longStats); + } else if (colType.equals("date")) { + DateColumnStatsData dateStats = new DateColumnStatsData(); + dateStats.setNumNulls(csData1.getDateStats().getNumNulls() + + csData2.getDateStats().getNumNulls()); + dateStats.setHighValue(new Date(Math.max(csData1.getDateStats().getHighValue() + .getDaysSinceEpoch(), csData2.getDateStats().getHighValue().getDaysSinceEpoch()))); + dateStats.setLowValue(new Date(Math.min(csData1.getDateStats().getLowValue() + .getDaysSinceEpoch(), csData2.getDateStats().getLowValue().getDaysSinceEpoch()))); + dateStats.setNumDVs(Math.max(csData1.getDateStats().getNumDVs(), csData2.getDateStats() + .getNumDVs())); + csd.setDateStats(dateStats); + } else if (colType.equals("double") || colType.equals("float")) { + DoubleColumnStatsData doubleStats = new DoubleColumnStatsData(); + doubleStats.setNumNulls(csData1.getDoubleStats().getNumNulls() + + csData2.getDoubleStats().getNumNulls()); + doubleStats.setHighValue(Math.max(csData1.getDoubleStats().getHighValue(), csData2 + .getDoubleStats().getHighValue())); + doubleStats.setLowValue(Math.min(csData1.getDoubleStats().getLowValue(), csData2 + .getDoubleStats().getLowValue())); + doubleStats.setNumDVs(Math.max(csData1.getDoubleStats().getNumDVs(), csData2.getDoubleStats() + .getNumDVs())); + csd.setDoubleStats(doubleStats); + } else if (colType.startsWith("decimal")) { + DecimalColumnStatsData decimalStats = new DecimalColumnStatsData(); + decimalStats.setNumNulls(csData1.getDecimalStats().getNumNulls() + + csData2.getDecimalStats().getNumNulls()); + Decimal high = (csData1.getDecimalStats().getHighValue() + .compareTo(csData2.getDecimalStats().getHighValue()) > 0) ? 
csData1.getDecimalStats() + .getHighValue() : csData2.getDecimalStats().getHighValue(); + decimalStats.setHighValue(high); + Decimal low = (csData1.getDecimalStats().getLowValue() + .compareTo(csData2.getDecimalStats().getLowValue()) < 0) ? csData1.getDecimalStats() + .getLowValue() : csData2.getDecimalStats().getLowValue(); + decimalStats.setLowValue(low); + decimalStats.setNumDVs(Math.max(csData1.getDecimalStats().getNumDVs(), csData2 + .getDecimalStats().getNumDVs())); + csd.setDecimalStats(decimalStats); + } + return cso; + } + + @Override + public NotificationEventResponse getNextNotification( + NotificationEventRequest rqst) { + return rawStore.getNextNotification(rqst); + } + + @Override + public void addNotificationEvent(NotificationEvent event) { + rawStore.addNotificationEvent(event); + } + + @Override + public void cleanNotificationEvents(int olderThan) { + rawStore.cleanNotificationEvents(olderThan); + } + + @Override + public CurrentNotificationEventId getCurrentNotificationEventId() { + return rawStore.getCurrentNotificationEventId(); + } + + @Override + public void flushCache() { + rawStore.flushCache(); + } + + @Override + public ByteBuffer[] getFileMetadata(List fileIds) throws MetaException { + return rawStore.getFileMetadata(fileIds); + } + + @Override + public void putFileMetadata(List fileIds, List metadata, + FileMetadataExprType type) throws MetaException { + rawStore.putFileMetadata(fileIds, metadata, type); + } + + @Override + public boolean isFileMetadataSupported() { + return rawStore.isFileMetadataSupported(); + } + + @Override + public void getFileMetadataByExpr(List fileIds, + FileMetadataExprType type, byte[] expr, ByteBuffer[] metadatas, + ByteBuffer[] exprResults, boolean[] eliminated) throws MetaException { + rawStore.getFileMetadataByExpr(fileIds, type, expr, metadatas, exprResults, eliminated); + } + + @Override + public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) { + return rawStore.getFileMetadataHandler(type); + } + + @Override + public int getTableCount() throws MetaException { + return SharedCache.getCachedTableCount(); + } + + @Override + public int getPartitionCount() throws MetaException { + return SharedCache.getCachedPartitionCount(); + } + + @Override + public int getDatabaseCount() throws MetaException { + return SharedCache.getCachedDatabaseCount(); + } + + @Override + public List getPrimaryKeys(String db_name, String tbl_name) + throws MetaException { + // TODO constraintCache + return rawStore.getPrimaryKeys(db_name, tbl_name); + } + + @Override + public List getForeignKeys(String parent_db_name, + String parent_tbl_name, String foreign_db_name, String foreign_tbl_name) + throws MetaException { + // TODO constraintCache + return rawStore.getForeignKeys(parent_db_name, parent_tbl_name, foreign_db_name, foreign_tbl_name); + } + + @Override + public void createTableWithConstraints(Table tbl, + List primaryKeys, List foreignKeys) + throws InvalidObjectException, MetaException { + // TODO constraintCache + rawStore.createTableWithConstraints(tbl, primaryKeys, foreignKeys); + SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()), + HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl); + } + + @Override + public void dropConstraint(String dbName, String tableName, + String constraintName) throws NoSuchObjectException { + // TODO constraintCache + rawStore.dropConstraint(dbName, tableName, constraintName); + } + + @Override + public void addPrimaryKeys(List pks) + throws 
InvalidObjectException, MetaException { + // TODO constraintCache + rawStore.addPrimaryKeys(pks); + } + + @Override + public void addForeignKeys(List fks) + throws InvalidObjectException, MetaException { + // TODO constraintCache + rawStore.addForeignKeys(fks); + } + + @Override + public Map getAggrColStatsForTablePartitions( + String dbName, String tableName) + throws MetaException, NoSuchObjectException { + return rawStore.getAggrColStatsForTablePartitions(dbName, tableName); + } + + public RawStore getRawStore() { + return rawStore; + } + + @VisibleForTesting + public void setRawStore(RawStore rawStore) { + this.rawStore = rawStore; + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java new file mode 100644 index 0000000..7beee42 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.cache; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableMeta; +import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper; +import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper; +import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper; +import org.apache.hadoop.hive.metastore.hbase.HBaseUtils; +import org.apache.hive.common.util.HiveStringUtils; + +import com.google.common.annotations.VisibleForTesting; + +public class SharedCache { + private static Map databaseCache = new TreeMap(); + private static Map tableCache = new TreeMap(); + private static Map partitionCache = new TreeMap(); + private static Map partitionColStatsCache = new TreeMap(); + private static Map sdCache = new HashMap(); + private static MessageDigest md; + + static { + try { + md = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("should not happen", e); + } + } + + public static synchronized Database getDatabaseFromCache(String name) { + return databaseCache.get(name)!=null?databaseCache.get(name).deepCopy():null; + } + + public static synchronized void addDatabaseToCache(String dbName, Database db) { + Database dbCopy = db.deepCopy(); + dbCopy.setName(HiveStringUtils.normalizeIdentifier(dbName)); + databaseCache.put(dbName, dbCopy); + } + + public static synchronized void removeDatabaseFromCache(String dbName) { + databaseCache.remove(dbName); + } + + public static synchronized List listCachedDatabases() { + return new ArrayList(databaseCache.keySet()); + } + + public static synchronized void alterDatabaseInCache(String dbName, Database newDb) { + removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName)); + addDatabaseToCache(HiveStringUtils.normalizeIdentifier(newDb.getName()), newDb.deepCopy()); + } + + public static synchronized int getCachedDatabaseCount() { + return databaseCache.size(); + } + + public static synchronized Table getTableFromCache(String dbName, String tableName) { + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName)); + if (tblWrapper == null) { + return null; + } + Table t = CacheUtils.assemble(tblWrapper); + return t; + } + + public static synchronized void addTableToCache(String dbName, String tblName, Table tbl) { + Table tblCopy = tbl.deepCopy(); + tblCopy.setDbName(HiveStringUtils.normalizeIdentifier(dbName)); + tblCopy.setTableName(HiveStringUtils.normalizeIdentifier(tblName)); + for (FieldSchema fs : tblCopy.getPartitionKeys()) { + fs.setName(HiveStringUtils.normalizeIdentifier(fs.getName())); + } + TableWrapper wrapper; + if (tbl.getSd()!=null) { + byte[] sdHash = HBaseUtils.hashStorageDescriptor(tbl.getSd(), md); + StorageDescriptor sd = tbl.getSd(); + increSd(sd, sdHash); + tblCopy.setSd(null); + wrapper = new TableWrapper(tblCopy, sdHash, sd.getLocation(), 
sd.getParameters()); + } else { + wrapper = new TableWrapper(tblCopy, null, null, null); + } + tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper); + } + + public static synchronized void removeTableFromCache(String dbName, String tblName) { + TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName)); + byte[] sdHash = tblWrapper.getSdHash(); + if (sdHash!=null) { + decrSd(sdHash); + } + } + + public static synchronized void alterTableInCache(String dbName, String tblName, Table newTable) { + removeTableFromCache(dbName, tblName); + addTableToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()), + HiveStringUtils.normalizeIdentifier(newTable.getTableName()), newTable); + if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { + List partitions = listCachedPartitions(dbName, tblName, -1); + for (Partition part : partitions) { + removePartitionFromCache(part.getDbName(), part.getTableName(), part.getValues()); + part.setDbName(HiveStringUtils.normalizeIdentifier(newTable.getDbName())); + part.setTableName(HiveStringUtils.normalizeIdentifier(newTable.getTableName())); + addPartitionToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()), + HiveStringUtils.normalizeIdentifier(newTable.getTableName()), part); + } + } + } + + public static synchronized int getCachedTableCount() { + return tableCache.size(); + } + + public static synchronized List
<Table> listCachedTables(String dbName) { + List<Table> tables = new ArrayList<Table>
(); + for (TableWrapper wrapper : tableCache.values()) { + if (wrapper.getTable().getDbName().equals(dbName)) { + tables.add(CacheUtils.assemble(wrapper)); + } + } + return tables; + } + + public static synchronized void updateTableColumnStatistics(String dbName, String tableName, + List statsObjs) { + Table tbl = getTableFromCache(dbName, tableName); + tbl.getSd().getParameters(); + List colNames = new ArrayList<>(); + for (ColumnStatisticsObj statsObj:statsObjs) { + colNames.add(statsObj.getColName()); + } + StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames); + alterTableInCache(dbName, tableName, tbl); + } + + public static synchronized List getTableMeta(String dbNames, String tableNames, List tableTypes) { + List tableMetas = new ArrayList(); + for (String dbName : listCachedDatabases()) { + if (CacheUtils.matches(dbName, dbNames)) { + for (Table table : listCachedTables(dbName)) { + if (CacheUtils.matches(table.getTableName(), tableNames)) { + if (tableTypes==null || tableTypes.contains(table.getTableType())) { + TableMeta metaData = new TableMeta( + dbName, table.getTableName(), table.getTableType()); + metaData.setComments(table.getParameters().get("comment")); + tableMetas.add(metaData); + } + } + } + } + } + return tableMetas; + } + + public static synchronized void addPartitionToCache(String dbName, String tblName, Partition part) { + Partition partCopy = part.deepCopy(); + PartitionWrapper wrapper; + if (part.getSd()!=null) { + byte[] sdHash = HBaseUtils.hashStorageDescriptor(part.getSd(), md); + StorageDescriptor sd = part.getSd(); + increSd(sd, sdHash); + partCopy.setSd(null); + wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters()); + } else { + wrapper = new PartitionWrapper(partCopy, null, null, null); + } + partitionCache.put(CacheUtils.buildKey(dbName, tblName, part.getValues()), wrapper); + } + + public static synchronized Partition getPartitionFromCache(String key) { + PartitionWrapper wrapper = partitionCache.get(key); + if (wrapper == null) { + return null; + } + Partition p = CacheUtils.assemble(wrapper); + return p; + } + + public static synchronized Partition getPartitionFromCache(String dbName, String tblName, List part_vals) { + return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, part_vals)); + } + + public static synchronized boolean existPartitionFromCache(String dbName, String tblName, List part_vals) { + return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals)); + } + + public static synchronized Partition removePartitionFromCache(String dbName, String tblName, List part_vals) { + PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals)); + if (wrapper.getSdHash()!=null) { + decrSd(wrapper.getSdHash()); + } + return wrapper.getPartition(); + } + + public static synchronized List listCachedPartitions(String dbName, String tblName, int max) { + List partitions = new ArrayList(); + int count = 0; + for (PartitionWrapper wrapper : partitionCache.values()) { + if (wrapper.getPartition().getDbName().equals(dbName) + && wrapper.getPartition().getTableName().equals(tblName) + && (max == -1 || count < max)) { + partitions.add(CacheUtils.assemble(wrapper)); + count++; + } + } + return partitions; + } + + public static synchronized void alterPartitionInCache(String dbName, String tblName, List partVals, Partition newPart) { + removePartitionFromCache(dbName, tblName, partVals); + 
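+ // Re-add below under a key built from the new partition's db/table/values, so renames and
+ // partition-value changes are reflected in the cache key.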
addPartitionToCache(HiveStringUtils.normalizeIdentifier(newPart.getDbName()), + HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart); + } + + public static synchronized void updatePartitionColumnStatistics(String dbName, String tableName, + List partVals, List statsObjs) { + Partition part = getPartitionFromCache(dbName, tableName, partVals); + part.getSd().getParameters(); + List colNames = new ArrayList<>(); + for (ColumnStatisticsObj statsObj:statsObjs) { + colNames.add(statsObj.getColName()); + } + StatsSetupConst.setColumnStatsState(part.getParameters(), colNames); + alterPartitionInCache(dbName, tableName, partVals, part); + } + + public static synchronized int getCachedPartitionCount() { + return partitionCache.size(); + } + + public static synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) { + return partitionColStatsCache.get(key); + } + + public static synchronized void addPartitionColStatsToCache(Map aggrStatsPerPartition) { + partitionColStatsCache.putAll(aggrStatsPerPartition); + } + + + public static void increSd(StorageDescriptor sd, byte[] sdHash) { + ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash); + if (sdCache.containsKey(byteArray)) { + sdCache.get(byteArray).refCount++; + } else { + StorageDescriptor sdToCache = sd.deepCopy(); + sdToCache.setLocation(null); + sdToCache.setParameters(null); + sdCache.put(byteArray, new StorageDescriptorWrapper(sdToCache, 1)); + } + } + + public static void decrSd(byte[] sdHash) { + ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash); + StorageDescriptorWrapper sdWrapper = sdCache.get(byteArray); + sdWrapper.refCount--; + if (sdWrapper.getRefCount() == 0) { + sdCache.remove(byteArray); + } + } + + public static StorageDescriptor getSdFromCache(byte[] sdHash) { + StorageDescriptorWrapper sdWrapper = sdCache.get(new ByteArrayWrapper(sdHash)); + return sdWrapper.getSd(); + } + + // Replace databases in databaseCache with the new list + public static synchronized void refreshDatabases(List databases) { + for (String dbName : listCachedDatabases()) { + removeDatabaseFromCache(dbName); + } + for (Database db : databases) { + addDatabaseToCache(db.getName(), db); + } + } + + // Replace tables in tableCache with the new list + public static synchronized void refreshTables(String dbName, List
tables) { + for (Table tbl : listCachedTables(dbName)) { + removeTableFromCache(dbName, tbl.getTableName()); + } + for (Table tbl : tables) { + addTableToCache(dbName, tbl.getTableName(), tbl); + } + } + + public static void refreshPartitions(String dbName, String tblName, List partitions) { + List keysToRemove = new ArrayList(); + for (Map.Entry entry : partitionCache.entrySet()) { + if (entry.getValue().getPartition().getDbName().equals(dbName) + && entry.getValue().getPartition().getTableName().equals(tblName)) { + keysToRemove.add(entry.getKey()); + } + } + for (String key : keysToRemove) { + partitionCache.remove(key); + } + for (Partition part : partitions) { + addPartitionToCache(dbName, tblName, part); + } + } + + @VisibleForTesting + static Map getDatabaseCache() { + return databaseCache; + } + + @VisibleForTesting + static Map getTableCache() { + return tableCache; + } + + @VisibleForTesting + static Map getPartitionCache() { + return partitionCache; + } + + @VisibleForTesting + static Map getSdCache() { + return sdCache; + } + + @VisibleForTesting + static Map getPartitionColStatsCache() { + return partitionColStatsCache; + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index f9619e5..f6420f5 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -2852,4 +2852,11 @@ public void addForeignKeys(List fks) throws InvalidObjectExceptio commitOrRoleBack(commit); } } + + @Override + public Map getAggrColStatsForTablePartitions(String dbName, + String tableName) throws MetaException, NoSuchObjectException { + // TODO: see if it makes sense to implement this here + return null; + } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java index 94087b1..3172f92 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java @@ -619,7 +619,7 @@ private static ResourceType convertResourceTypes( * @param md message descriptor to use to generate the hash * @return the hash as a byte array */ - static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md) { + public static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md) { // Note all maps and lists have to be absolutely sorted. Otherwise we'll produce different // results for hashes based on the OS or JVM being used. 
md.reset(); diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java index f64b08d..3e3fd20 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; @@ -869,4 +870,11 @@ public void addForeignKeys(List fks) throws InvalidObjectException, MetaException { // TODO Auto-generated method stub } + + @Override + public Map getAggrColStatsForTablePartitions(String dbName, + String tableName) throws MetaException, NoSuchObjectException { + // TODO Auto-generated method stub + return null; + } } diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java index 2682886..91d8c2a 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; @@ -885,6 +886,13 @@ public void addForeignKeys(List fks) throws InvalidObjectException, MetaException { // TODO Auto-generated method stub } + + @Override + public Map getAggrColStatsForTablePartitions(String dbName, + String tableName) throws MetaException, NoSuchObjectException { + // TODO Auto-generated method stub + return null; + } } diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java index 9acf9d7..a8c7ac3 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.thrift.TException; -class VerifyingObjectStore extends ObjectStore { +public class VerifyingObjectStore extends ObjectStore { private static final Logger LOG = LoggerFactory.getLogger(VerifyingObjectStore.class); public VerifyingObjectStore() { diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java new file mode 100644 index 0000000..253d2a2 --- /dev/null +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.cache; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.metastore.TestObjectStore.MockPartitionExpressionProxy; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestCachedStore { + + private CachedStore cachedStore = new CachedStore(); + + @Before + public void setUp() throws Exception { + HiveConf conf = new HiveConf(); + conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS, MockPartitionExpressionProxy.class.getName()); + + ObjectStore objectStore = new ObjectStore(); + objectStore.setConf(conf); + + cachedStore.setRawStore(objectStore); + + SharedCache.getDatabaseCache().clear(); + SharedCache.getTableCache().clear(); + SharedCache.getPartitionCache().clear(); + SharedCache.getSdCache().clear(); + SharedCache.getPartitionColStatsCache().clear(); + } + + @Test + public void testSharedStoreDb() { + Database db1 = new Database(); + Database db2 = new Database(); + Database db3 = new Database(); + Database newDb1 = new Database(); + + SharedCache.addDatabaseToCache("db1", db1); + SharedCache.addDatabaseToCache("db2", db2); + SharedCache.addDatabaseToCache("db3", db3); + + Assert.assertEquals(SharedCache.getCachedDatabaseCount(), 3); + + SharedCache.alterDatabaseInCache("db1", newDb1); + + Assert.assertEquals(SharedCache.getCachedDatabaseCount(), 3); + + SharedCache.removeDatabaseFromCache("db2"); + + Assert.assertEquals(SharedCache.getCachedDatabaseCount(), 2); + + List dbs = SharedCache.listCachedDatabases(); + Assert.assertEquals(dbs.size(), 2); + Assert.assertTrue(dbs.contains("db1")); + Assert.assertTrue(dbs.contains("db3")); + } + + @Test + public void testSharedStoreTable() { + Table tbl1 = new Table(); + StorageDescriptor sd1 = new StorageDescriptor(); + List cols1 = new ArrayList(); + cols1.add(new FieldSchema("col1", "int", "")); + Map params1 = new HashMap(); + params1.put("key", "value"); + sd1.setCols(cols1); + sd1.setParameters(params1); + sd1.setLocation("loc1"); + tbl1.setSd(sd1); + tbl1.setPartitionKeys(new ArrayList()); + + Table tbl2 = new Table(); + StorageDescriptor sd2 = new StorageDescriptor(); + List cols2 = new ArrayList(); + cols2.add(new FieldSchema("col1", "int", "")); + Map params2 = new HashMap(); + params2.put("key", "value"); + sd2.setCols(cols2); + sd2.setParameters(params2); + 
sd2.setLocation("loc2"); + tbl2.setSd(sd2); + tbl2.setPartitionKeys(new ArrayList()); + + Table tbl3 = new Table(); + StorageDescriptor sd3 = new StorageDescriptor(); + List cols3 = new ArrayList(); + cols3.add(new FieldSchema("col3", "int", "")); + Map params3 = new HashMap(); + params3.put("key2", "value2"); + sd3.setCols(cols3); + sd3.setParameters(params3); + sd3.setLocation("loc3"); + tbl3.setSd(sd3); + tbl3.setPartitionKeys(new ArrayList()); + + Table newTbl1 = new Table(); + StorageDescriptor newSd1 = new StorageDescriptor(); + List newCols1 = new ArrayList(); + newCols1.add(new FieldSchema("newcol1", "int", "")); + Map newParams1 = new HashMap(); + newParams1.put("key", "value"); + newSd1.setCols(newCols1); + newSd1.setParameters(params1); + newSd1.setLocation("loc1"); + newTbl1.setSd(newSd1); + newTbl1.setPartitionKeys(new ArrayList()); + + SharedCache.addTableToCache("db1", "tbl1", tbl1); + SharedCache.addTableToCache("db1", "tbl2", tbl2); + SharedCache.addTableToCache("db1", "tbl3", tbl3); + SharedCache.addTableToCache("db2", "tbl1", tbl1); + + Assert.assertEquals(SharedCache.getCachedTableCount(), 4); + Assert.assertEquals(SharedCache.getSdCache().size(), 2); + + Table t = SharedCache.getTableFromCache("db1", "tbl1"); + Assert.assertEquals(t.getSd().getLocation(), "loc1"); + + SharedCache.removeTableFromCache("db1", "tbl1"); + Assert.assertEquals(SharedCache.getCachedTableCount(), 3); + Assert.assertEquals(SharedCache.getSdCache().size(), 2); + + SharedCache.alterTableInCache("db2", "tbl1", newTbl1); + Assert.assertEquals(SharedCache.getCachedTableCount(), 3); + Assert.assertEquals(SharedCache.getSdCache().size(), 3); + + SharedCache.removeTableFromCache("db1", "tbl2"); + Assert.assertEquals(SharedCache.getCachedTableCount(), 2); + Assert.assertEquals(SharedCache.getSdCache().size(), 2); + } + + @Test + public void testSharedStorePartition() { + Partition part1 = new Partition(); + StorageDescriptor sd1 = new StorageDescriptor(); + List cols1 = new ArrayList(); + cols1.add(new FieldSchema("col1", "int", "")); + Map params1 = new HashMap(); + params1.put("key", "value"); + sd1.setCols(cols1); + sd1.setParameters(params1); + sd1.setLocation("loc1"); + part1.setSd(sd1); + part1.setValues(Arrays.asList("201701")); + + Partition part2 = new Partition(); + StorageDescriptor sd2 = new StorageDescriptor(); + List cols2 = new ArrayList(); + cols2.add(new FieldSchema("col1", "int", "")); + Map params2 = new HashMap(); + params2.put("key", "value"); + sd2.setCols(cols2); + sd2.setParameters(params2); + sd2.setLocation("loc2"); + part2.setSd(sd2); + part2.setValues(Arrays.asList("201702")); + + Partition part3 = new Partition(); + StorageDescriptor sd3 = new StorageDescriptor(); + List cols3 = new ArrayList(); + cols3.add(new FieldSchema("col3", "int", "")); + Map params3 = new HashMap(); + params3.put("key2", "value2"); + sd3.setCols(cols3); + sd3.setParameters(params3); + sd3.setLocation("loc3"); + part3.setSd(sd3); + part3.setValues(Arrays.asList("201703")); + + Partition newPart1 = new Partition(); + StorageDescriptor newSd1 = new StorageDescriptor(); + List newCols1 = new ArrayList(); + newCols1.add(new FieldSchema("newcol1", "int", "")); + Map newParams1 = new HashMap(); + newParams1.put("key", "value"); + newSd1.setCols(newCols1); + newSd1.setParameters(params1); + newSd1.setLocation("loc1"); + newPart1.setSd(newSd1); + newPart1.setValues(Arrays.asList("201701")); + + SharedCache.addPartitionToCache("db1", "tbl1", part1); + SharedCache.addPartitionToCache("db1", "tbl1", part2); + 
SharedCache.addPartitionToCache("db1", "tbl1", part3); + SharedCache.addPartitionToCache("db1", "tbl2", part1); + + Assert.assertEquals(SharedCache.getCachedPartitionCount(), 4); + Assert.assertEquals(SharedCache.getSdCache().size(), 2); + + Partition t = SharedCache.getPartitionFromCache("db1", "tbl1", Arrays.asList("201701")); + Assert.assertEquals(t.getSd().getLocation(), "loc1"); + + SharedCache.removePartitionFromCache("db1", "tbl2", Arrays.asList("201701")); + Assert.assertEquals(SharedCache.getCachedPartitionCount(), 3); + Assert.assertEquals(SharedCache.getSdCache().size(), 2); + + SharedCache.alterPartitionInCache("db1", "tbl1", Arrays.asList("201701"), newPart1); + Assert.assertEquals(SharedCache.getCachedPartitionCount(), 3); + Assert.assertEquals(SharedCache.getSdCache().size(), 3); + + SharedCache.removePartitionFromCache("db1", "tbl1", Arrays.asList("201702")); + Assert.assertEquals(SharedCache.getCachedPartitionCount(), 2); + Assert.assertEquals(SharedCache.getSdCache().size(), 2); + } +}