diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8e5a9aa..ece8b44 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -896,9 +896,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Default property values for newly created tables"),
     DDL_CTL_PARAMETERS_WHITELIST("hive.ddl.createtablelike.properties.whitelist", "",
         "Table Properties to copy over when executing a Create Table Like."),
-    METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore",
+    METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.cache.CachedStore",
         "Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. \n" +
         "This class is used to store and retrieval of raw metadata objects such as table, database"),
+    METASTORE_CACHED_RAW_STORE_IMPL("hive.metastore.cached.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore",
+        "Name of the wrapped RawStore class"),
     METASTORE_TXN_STORE_IMPL("hive.metastore.txn.store.impl",
         "org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler",
         "Name of class that implements org.apache.hadoop.hive.metastore.txn.TxnStore. This " +
diff --git a/conf/hive-site.xml b/conf/hive-site.xml
index dab494e..5df5a4f 100644
--- a/conf/hive-site.xml
+++ b/conf/hive-site.xml
@@ -19,4 +19,101 @@
+  <property>
+    <name>hive.metastore.execute.setugi</name>
+    <value>true</value>
+    <description>In unsecure mode, setting this property to true will cause the metastore to execute DFS operations using the client's reported user and group permissions. Note that this property must be set on both the client and server sides. Further note that it is best effort: if the client sets it to true and the server sets it to false, the client setting will be ignored.</description>
+  </property>
+
+  <property>
+    <name>hive.metastore.local</name>
+    <value>false</value>
+    <description>Controls whether to connect to a remote metastore server or open a new metastore server in the Hive Client JVM.</description>
+  </property>
+
+  <property>
+    <name>javax.jdo.option.ConnectionURL</name>
+    <value>jdbc:mysql://localhost/hivemetastoredb?createDatabaseIfNotExist=true</value>
+    <description>JDBC connect string for a JDBC metastore</description>
+  </property>
+
+  <property>
+    <name>javax.jdo.option.ConnectionDriverName</name>
+    <value>com.mysql.jdbc.Driver</value>
+    <description>Driver class name for a JDBC metastore</description>
+  </property>
+
+  <property>
+    <name>javax.jdo.option.ConnectionUserName</name>
+    <value>hive</value>
+    <description>Username to use against metastore database</description>
+  </property>
+
+  <property>
+    <name>javax.jdo.option.ConnectionPassword</name>
+    <value>hive</value>
+    <description>Password to use against metastore database</description>
+  </property>
+
+  <property>
+    <name>hive.metastore.warehouse.dir</name>
+    <value>/user/hive/warehouse</value>
+    <description>Location of the default database for the warehouse</description>
+  </property>
+
+  <property>
+    <name>hive.metastore.sasl.enabled</name>
+    <value>false</value>
+    <description>If true, the metastore thrift interface will be secured with SASL. Clients must authenticate with Kerberos.</description>
+  </property>
+
+  <property>
+    <name>hive.metastore.kerberos.keytab.file</name>
+    <value>KEYTAB_PATH</value>
+    <description>The path to the Kerberos Keytab file containing the metastore thrift server's service principal.</description>
+  </property>
+
+  <property>
+    <name>hive.metastore.kerberos.principal</name>
+    <value>KERBEROS_PRINCIPAL</value>
+    <description>The service principal for the metastore thrift server. The special string _HOST will be replaced automatically with the correct host name.</description>
+  </property>
+
+  <property>
+    <name>hive.metastore.cache.pinobjtypes</name>
+    <value>Table,Database,Type,FieldSchema,Order</value>
+    <description>List of comma separated metastore object types that should be pinned in the cache</description>
+  </property>
+
+  <property>
+    <name>hadoop.clientside.fs.operations</name>
+    <value>true</value>
+    <description>FS operations are owned by client</description>
+  </property>
+
+  <property>
+    <name>hive.metastore.client.socket.timeout</name>
+    <value>60</value>
+    <description>MetaStore Client socket timeout in seconds</description>
+  </property>
+
+  <property>
+    <name>hive.metastore.schema.verification</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>hive.metastore.rawstore.impl</name>
+    <value>org.apache.hadoop.hive.metastore.cache.CachedStore</value>
+  </property>
+
+  <property>
+    <name>hive.metastore.cached.rawstore.impl</name>
+    <value>org.apache.hadoop.hive.metastore.ObjectStore</value>
+  </property>
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 5282a5a..8b0455e 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -911,4 +912,11 @@ public void addPrimaryKeys(List<SQLPrimaryKey> pks)
   public void addForeignKeys(List<SQLForeignKey> fks)
       throws InvalidObjectException, MetaException {
   }
+
+  @Override
+  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+      String tableName) throws MetaException, NoSuchObjectException {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }
\ No newline at end of file
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 22c1a33..d2196d1 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.cache.CachedStore;
 import org.apache.hadoop.hive.metastore.model.MConstraint;
 import org.apache.hadoop.hive.metastore.model.MDatabase;
 import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
@@ -79,6 +80,7 @@ import org.slf4j.LoggerFactory;

 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;

 /**
  * This class contains the optimizations for MetaStore that rely on direct SQL access to
@@ -1338,6 +1340,63 @@ private long partsFoundForPartitions(final String dbName, final String tableName
     });
   }

+  // Get aggregated column stats for a table, per partition, for all columns in the partition.
+  // This is primarily used to populate the stats objects when using CachedStore (see CachedStore#prewarm).
+  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+      String tblName, boolean useDensityFunctionForNDVEstimation) throws MetaException {
+    String queryText = "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", "
+        + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
+        + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as 
decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), " + + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), " + + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), " + // The following data is used to compute a partitioned table's NDV based + // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be + // accurately derived from partition NDVs, because the domain of column value two partitions + // can overlap. If there is no overlap then global NDV is just the sum + // of partition NDVs (UpperBound). But if there is some overlay then + // global NDV can be anywhere between sum of partition NDVs (no overlap) + // and same as one of the partition NDV (domain of column value in all other + // partitions is subset of the domain value in one of the partition) + // (LowerBound).But under uniform distribution, we can roughly estimate the global + // NDV by leveraging the min/max values. + // And, we also guarantee that the estimation makes sense by comparing it to the + // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")") + // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")") + + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," + + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," + + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," + + "sum(\"NUM_DISTINCTS\") from \"PART_COL_STATS\"" + + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? group by \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\""; + long start = 0; + long end = 0; + Query query = null; + boolean doTrace = LOG.isDebugEnabled(); + Object qResult = null; + ForwardQueryResult fqr = null; + start = doTrace ? System.nanoTime() : 0; + query = pm.newQuery("javax.jdo.query.SQL", queryText); + qResult = executeWithArray(query, + prepareParams(dbName, tblName, new ArrayList(), new ArrayList()), queryText); + if (qResult == null) { + query.closeAll(); + return Maps.newHashMap(); + } + end = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, end); + List list = ensureList(qResult); + Map partColStatsMap = new HashMap(); + for (Object[] row : list) { + String partName = (String) row[0]; + String colName = (String) row[1]; + partColStatsMap.put( + CachedStore.buildKey(dbName, tblName, CachedStore.partNameToVals(partName), colName), + prepareCSObjWithAdjustedNDV(row, 1, useDensityFunctionForNDVEstimation)); + Deadline.checkTimeout(); + } + query.closeAll(); + return partColStatsMap; + } + /** Should be called with the list short enough to not trip up Oracle/etc. 
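 * For example, Oracle rejects IN lists with more than 1,000 elements (ORA-01795),
 * so callers are expected to batch the partition name list before invoking this method.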
*/ private List columnStatisticsObjForPartitionsBatch(String dbName, String tableName, List partNames, List colNames, boolean areAllPartsFound, diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 8e79e4f..e8a1778 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -7370,6 +7370,36 @@ protected String describeResult() { } @Override + public Map getAggrColStatsForTablePartitions(String dbName, + String tableName) throws MetaException, NoSuchObjectException { + final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(), + HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION); + return new GetHelper>(dbName, tableName, true, false) { + @Override + protected Map getSqlResult( + GetHelper> ctx) throws MetaException { + return directSql.getAggrColStatsForTablePartitions(dbName, tblName, + useDensityFunctionForNDVEstimation); + } + + @Override + protected Map getJdoResult( + GetHelper> ctx) throws MetaException, + NoSuchObjectException { + // This is fast path for query optimizations, if we can find this info + // quickly using directSql, do it. No point in failing back to slow path + // here. + throw new MetaException("Jdo path is not implemented for stats aggr."); + } + + @Override + protected String describeResult() { + return null; + } + }.run(true); + } + + @Override public void flushCache() { // NOP as there's no caching } @@ -8782,5 +8812,4 @@ public void dropConstraint(String dbName, String tableName, } } } - } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java index 6f4f031..e417e3f 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; @@ -577,6 +578,9 @@ public void dropFunction(String dbName, String funcName) public AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, List colNames) throws MetaException, NoSuchObjectException; + public Map getAggrColStatsForTablePartitions(String dbName, + String tableName) throws MetaException, NoSuchObjectException; + /** * Get the next notification event. * @param rqst Request containing information on the last processed notification. diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/ByteArrayWrapper.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/ByteArrayWrapper.java new file mode 100644 index 0000000..3d3abb5 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/ByteArrayWrapper.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.cache; + +import java.util.Arrays; + +public class ByteArrayWrapper { + byte[] wrapped; + + ByteArrayWrapper(byte[] b) { + wrapped = b; + } + + @Override + public boolean equals(Object other) { + if (other instanceof ByteArrayWrapper) { + return Arrays.equals(((ByteArrayWrapper)other).wrapped, wrapped); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Arrays.hashCode(wrapped); + } +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java new file mode 100644 index 0000000..65797e2 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.cache; + +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper; +import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper; + +public class CacheUtils { + private static final String delimit = "\t"; + + public static String buildKey(String dbName, String tableName) { + return dbName + delimit + tableName; + } + + public static String buildKey(String dbName, String tableName, List vals) { + String key = buildKey(dbName, tableName); + if (vals == null || vals.size() == 0) { + return key; + } + for (int i=0;iexpr +// TODO functionCache +// TODO constraintCache +// TODO case sensitivity +// TODO pattern match +// TODO need sd nested copy? +// TODO String intern +// TODO NPE check +// TODO restructure HBaseStore +// TODO monitor event queue +// TODO initial load slow? 
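+// (prewarm below walks every database, table and partition through the wrapped
+// RawStore in a single pass, so initial load time grows with the amount of metadata)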
+// TODO size estimation +// TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation +// TODO factor in NDV estimation (density based estimation) logic when merging NDVs from 2 colStats object +// TODO refactor to use same common code with StatObjectConverter (for merging 2 col stats objects) + +public class CachedStore implements RawStore, Configurable { + RawStore rawStore; + Configuration conf; + MessageDigest md; + private PartitionExpressionProxy expressionProxy = null; + static boolean firstTime = true; + + static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName()); + + static class TableWrapper { + Table t; + String location; + Map parameters; + byte[] sdHash; + TableWrapper(Table t, byte[] sdHash, String location, Map parameters) { + this.t = t; + this.sdHash = sdHash; + this.location = location; + this.parameters = parameters; + } + public Table getTable() { + return t; + } + public byte[] getSdHash() { + return sdHash; + } + public String getLocation() { + return location; + } + public Map getParameters() { + return parameters; + } + } + + static class PartitionWrapper { + Partition p; + String location; + Map parameters; + byte[] sdHash; + PartitionWrapper(Partition p, byte[] sdHash, String location, Map parameters) { + this.p = p; + this.sdHash = sdHash; + this.location = location; + this.parameters = parameters; + } + public Partition getPartition() { + return p; + } + public byte[] getSdHash() { + return sdHash; + } + public String getLocation() { + return location; + } + public Map getParameters() { + return parameters; + } + } + + static class StorageDescriptorWrapper { + StorageDescriptor sd; + int refCount = 0; + StorageDescriptorWrapper(StorageDescriptor sd, int refCount) { + this.sd = sd; + this.refCount = refCount; + } + public StorageDescriptor getSd() { + return sd; + } + public int getRefCount() { + return refCount; + } + } + + public CachedStore() { + try { + md = MessageDigest.getInstance("MD5"); + } catch (Exception e) { + throw new RuntimeException("should not happen", e); + } + } + + @Override + public void setConf(Configuration conf) { + String rawStoreClassName = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_IMPL, + ObjectStore.class.getName()); + try { + rawStore = ((Class) MetaStoreUtils.getClass( + rawStoreClassName)).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e); + } + rawStore.setConf(conf); + Configuration oldConf = this.conf; + this.conf = conf; + if (expressionProxy != null && conf != oldConf) { + LOG.warn("Unexpected setConf when we were already configured"); + } + if (expressionProxy == null || conf != oldConf) { + expressionProxy = PartFilterExprUtil.createExpressionProxy(conf); + } + if (firstTime) { + try { + LOG.info("Prewarming CachedStore"); + prewarm(); + LOG.info("CachedStore initialized"); + } catch (Exception e) { + throw new RuntimeException(e); + } + firstTime = false; + } + } + + private void prewarm() throws Exception { + List dbNames = rawStore.getAllDatabases(); + for (String dbName : dbNames) { + Database db = rawStore.getDatabase(dbName); + SharedCache.addDatabaseToCache(db); + List tableNames = rawStore.getAllTables(dbName); + for (String tableName : tableNames) { + Table table = rawStore.getTable(dbName, tableName); + SharedCache.addTableToCache(table); + List partitions = rawStore.getPartitions(dbName, tableName, Integer.MAX_VALUE); + for (Partition partition : partitions) 
{ + SharedCache.addPartitionToCache(partition); + } + } + } + } + + private static String buildKey(String dbName, String tableName) { + return dbName + "\t" + tableName; + } + + private static String buildKey(String dbName, String tableName, List vals) { + String key = buildKey(dbName, tableName); + if (vals == null || vals.size() == 0) { + return key; + } + for (int i=0;i vals, String colName) { + String key = buildKey(dbName, tableName, vals); + return key + "\t" + colName; + } + + @Override + public Configuration getConf() { + return rawStore.getConf(); + } + + @Override + public void shutdown() { + rawStore.shutdown(); + } + + @Override + public boolean openTransaction() { + return rawStore.openTransaction(); + } + + @Override + public boolean commitTransaction() { + return rawStore.commitTransaction(); + } + + @Override + public void rollbackTransaction() { + rawStore.rollbackTransaction(); + } + + @Override + public void createDatabase(Database db) + throws InvalidObjectException, MetaException { + rawStore.createDatabase(db); + SharedCache.addDatabaseToCache(db.deepCopy()); + } + + @Override + public Database getDatabase(String name) throws NoSuchObjectException { + return SharedCache.getDatabaseFromCache(name); + } + + @Override + public boolean dropDatabase(String dbname) + throws NoSuchObjectException, MetaException { + boolean succ = rawStore.dropDatabase(dbname); + if (succ) { + SharedCache.removeDatabaseFromCache(dbname); + } + return succ; + } + + @Override + public boolean alterDatabase(String dbname, Database db) + throws NoSuchObjectException, MetaException { + boolean succ = rawStore.alterDatabase(dbname, db); + if (succ) { + SharedCache.addDatabaseToCache(db.deepCopy()); + } + return succ; + } + + @Override + public List getDatabases(String pattern) throws MetaException { + List results = new ArrayList(); + for (String dbName : SharedCache.listCachedDatabases()) { + if (CacheUtils.matches(dbName, pattern)) { + results.add(dbName); + } + } + return results; + } + + @Override + public List getAllDatabases() throws MetaException { + return SharedCache.listCachedDatabases(); + } + + @Override + public boolean createType(Type type) { + return rawStore.createType(type); + } + + @Override + public Type getType(String typeName) { + return rawStore.getType(typeName); + } + + @Override + public boolean dropType(String typeName) { + return rawStore.dropType(typeName); + } + + @Override + public void createTable(Table tbl) + throws InvalidObjectException, MetaException { + rawStore.createTable(tbl); + SharedCache.addTableToCache(tbl); + } + + @Override + public boolean dropTable(String dbName, String tableName) + throws MetaException, NoSuchObjectException, InvalidObjectException, + InvalidInputException { + boolean succ = rawStore.dropTable(dbName, tableName); + if (succ) { + SharedCache.removeTableFromCache(dbName, tableName); + } + return succ; + } + + @Override + public Table getTable(String dbName, String tableName) throws MetaException { + return SharedCache.getTableFromCache(dbName, tableName); + } + + @Override + public boolean addPartition(Partition part) + throws InvalidObjectException, MetaException { + boolean succ = rawStore.addPartition(part); + if (succ) { + SharedCache.addPartitionToCache(part); + } + return succ; + } + + @Override + public boolean addPartitions(String dbName, String tblName, + List parts) throws InvalidObjectException, MetaException { + boolean succ = rawStore.addPartitions(dbName, tblName, parts); + if (succ) { + for (Partition part : parts) { + 
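+        // Write-through: the wrapped store has already committed these partitions,
+        // so mirror each one into the shared cache.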
SharedCache.addPartitionToCache(part); + } + } + return succ; + } + + @Override + public boolean addPartitions(String dbName, String tblName, + PartitionSpecProxy partitionSpec, boolean ifNotExists) + throws InvalidObjectException, MetaException { + boolean succ = rawStore.addPartitions(dbName, tblName, partitionSpec, ifNotExists); + if (succ) { + PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); + while (iterator.hasNext()) { + Partition part = iterator.next(); + SharedCache.addPartitionToCache(part); + } + } + return succ; + } + + @Override + public Partition getPartition(String dbName, String tableName, + List part_vals) throws MetaException, NoSuchObjectException { + return SharedCache.getPartitionFromCache(dbName, tableName, part_vals); + } + + @Override + public boolean doesPartitionExist(String dbName, String tableName, + List part_vals) throws MetaException, NoSuchObjectException { + return SharedCache.existPartitionFromCache(dbName, tableName, part_vals); + } + + @Override + public boolean dropPartition(String dbName, String tableName, + List part_vals) throws MetaException, NoSuchObjectException, + InvalidObjectException, InvalidInputException { + boolean succ = rawStore.dropPartition(dbName, tableName, part_vals); + if (succ) { + SharedCache.removePatitionFromCache(dbName, tableName, part_vals); + } + return succ; + } + + @Override + public List getPartitions(String dbName, String tableName, int max) + throws MetaException, NoSuchObjectException { + return SharedCache.listCachedPartitions(dbName, tableName, max); + } + + @Override + public void alterTable(String dbName, String tblName, Table newTable) + throws InvalidObjectException, MetaException { + rawStore.alterTable(dbName, tblName, newTable); + SharedCache.alterTableInCache(dbName, tblName, newTable); + } + + @Override + public List getTables(String dbName, String pattern) + throws MetaException { + List tableNames = new ArrayList(); + for (Table table : SharedCache.listCachedTables(dbName)) { + if (CacheUtils.matches(table.getTableName(), pattern)) { + tableNames.add(table.getTableName()); + } + } + return tableNames; + } + + @Override + public List getTables(String dbName, String pattern, + TableType tableType) throws MetaException { + List tableNames = new ArrayList(); + for (Table table : SharedCache.listCachedTables(dbName)) { + if (CacheUtils.matches(table.getTableName(), pattern) && + table.getTableType().equals(tableType.toString())) { + tableNames.add(table.getTableName()); + } + } + return tableNames; + } + + @Override + public List getTableMeta(String dbNames, String tableNames, + List tableTypes) throws MetaException { + return SharedCache.getTableMeta(dbNames, tableNames, tableTypes); + } + + @Override + public List getTableObjectsByName(String dbName, + List tableNames) throws MetaException, UnknownDBException { + List
<Table> tables = new ArrayList<Table>
(); + for (String tableName : tableNames) { + tables.add(SharedCache.getTableFromCache(dbName, tableName)); + } + return tables; + } + + @Override + public List getAllTables(String dbName) throws MetaException { + List tableNames = new ArrayList(); + for (Table table : SharedCache.listCachedTables(dbName)) { + tableNames.add(table.getTableName()); + } + return tableNames; + } + + @Override + public List listTableNamesByFilter(String dbName, String filter, + short max_tables) throws MetaException, UnknownDBException { + List tableNames = new ArrayList(); + int count = 0; + for (Table table : SharedCache.listCachedTables(dbName)) { + if (CacheUtils.matches(table.getTableName(), filter) + && (max_tables == -1 || count < max_tables)) { + tableNames.add(table.getTableName()); + count++; + } + } + return tableNames; + } + + @Override + public List listPartitionNames(String dbName, String tblName, + short max_parts) throws MetaException { + List partitionNames = new ArrayList(); + Table t = SharedCache.getTableFromCache(dbName, tblName); + int count = 0; + for (Partition part : SharedCache.listCachedPartitions(dbName, tblName, max_parts)) { + if (part.getTableName().equals(tblName) + && (max_parts == -1 || count < max_parts)) { + partitionNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues())); + } + } + return partitionNames; + } + + @Override + public List listPartitionNamesByFilter(String db_name, + String tbl_name, String filter, short max_parts) throws MetaException { + // TODO Translate filter -> expr + return null; + } + + @Override + public void alterPartition(String dbName, String tblName, + List partVals, Partition newPart) + throws InvalidObjectException, MetaException { + rawStore.alterPartition(dbName, tblName, partVals, newPart); + SharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); + } + + @Override + public void alterPartitions(String dbName, String tblName, + List> partValsList, List newParts) + throws InvalidObjectException, MetaException { + rawStore.alterPartitions(dbName, tblName, partValsList, newParts); + for (int i=0;i partVals = partValsList.get(i); + Partition newPart = newParts.get(i); + SharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); + } + } + + @Override + public boolean addIndex(Index index) + throws InvalidObjectException, MetaException { + return rawStore.addIndex(index); + } + + @Override + public Index getIndex(String dbName, String origTableName, String indexName) + throws MetaException { + return rawStore.getIndex(dbName, origTableName, indexName); + } + + @Override + public boolean dropIndex(String dbName, String origTableName, + String indexName) throws MetaException { + return rawStore.dropIndex(dbName, origTableName, indexName); + } + + @Override + public List getIndexes(String dbName, String origTableName, int max) + throws MetaException { + return rawStore.getIndexes(dbName, origTableName, max); + } + + @Override + public List listIndexNames(String dbName, String origTableName, + short max) throws MetaException { + return rawStore.listIndexNames(dbName, origTableName, max); + } + + @Override + public void alterIndex(String dbname, String baseTblName, String name, + Index newIndex) throws InvalidObjectException, MetaException { + rawStore.alterIndex(dbname, baseTblName, name, newIndex); + } + + private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr, + String defaultPartName, short maxParts, List result) throws MetaException, NoSuchObjectException { + List parts = 
SharedCache.listCachedPartitions( + table.getDbName(), table.getTableName(), maxParts); + for (Partition part : parts) { + result.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues())); + } + List columnNames = new ArrayList(); + List typeInfos = new ArrayList(); + for (FieldSchema fs : table.getPartitionKeys()) { + columnNames.add(fs.getName()); + typeInfos.add(TypeInfoFactory.getPrimitiveTypeInfo(fs.getType())); + } + if (defaultPartName == null || defaultPartName.isEmpty()) { + defaultPartName = HiveConf.getVar(getConf(), HiveConf.ConfVars.DEFAULTPARTITIONNAME); + } + return expressionProxy.filterPartitionsByExpr( + columnNames, typeInfos, expr, defaultPartName, result); + } + + @Override + public List getPartitionsByFilter(String dbName, String tblName, + String filter, short maxParts) + throws MetaException, NoSuchObjectException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean getPartitionsByExpr(String dbName, String tblName, byte[] expr, + String defaultPartitionName, short maxParts, List result) + throws TException { + List partNames = new LinkedList(); + Table table = SharedCache.getTableFromCache(dbName, tblName); + boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn( + table, expr, defaultPartitionName, maxParts, partNames); + for (String partName : partNames) { + Partition part = SharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName)); + result.add(part); + } + return hasUnknownPartitions; + } + + @Override + public int getNumPartitionsByFilter(String dbName, String tblName, + String filter) throws MetaException, NoSuchObjectException { + Table table = SharedCache.getTableFromCache(dbName, tblName); + // TODO filter -> expr + return 0; + } + + @Override + public int getNumPartitionsByExpr(String dbName, String tblName, byte[] expr) + throws MetaException, NoSuchObjectException { + String defaultPartName = HiveConf.getVar(getConf(), HiveConf.ConfVars.DEFAULTPARTITIONNAME); + List partNames = new LinkedList(); + Table table = SharedCache.getTableFromCache(dbName, tblName); + getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames); + return partNames.size(); + } + + public static List partNameToVals(String name) { + if (name == null) return null; + List vals = new ArrayList(); + String[] kvp = name.split("/"); + for (String kv : kvp) { + vals.add(FileUtils.unescapePathName(kv.substring(kv.indexOf('=') + 1))); + } + return vals; + } + + @Override + public List getPartitionsByNames(String dbName, String tblName, + List partNames) throws MetaException, NoSuchObjectException { + List partitions = new ArrayList(); + for (String partName : partNames) { + Partition part = SharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName)); + if (part!=null) { + partitions.add(part); + } + } + return partitions; + } + + @Override + public Table markPartitionForEvent(String dbName, String tblName, + Map partVals, PartitionEventType evtType) + throws MetaException, UnknownTableException, InvalidPartitionException, + UnknownPartitionException { + return rawStore.markPartitionForEvent(dbName, tblName, partVals, evtType); + } + + @Override + public boolean isPartitionMarkedForEvent(String dbName, String tblName, + Map partName, PartitionEventType evtType) + throws MetaException, UnknownTableException, InvalidPartitionException, + UnknownPartitionException { + return rawStore.isPartitionMarkedForEvent(dbName, tblName, partName, evtType); + } + + @Override + 
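+  // Roles, grants and privileges are not cached; the calls below delegate
+  // directly to the wrapped RawStore.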
public boolean addRole(String rowName, String ownerName) + throws InvalidObjectException, MetaException, NoSuchObjectException { + return rawStore.addRole(rowName, ownerName); + } + + @Override + public boolean removeRole(String roleName) + throws MetaException, NoSuchObjectException { + return rawStore.removeRole(roleName); + } + + @Override + public boolean grantRole(Role role, String userName, + PrincipalType principalType, String grantor, PrincipalType grantorType, + boolean grantOption) + throws MetaException, NoSuchObjectException, InvalidObjectException { + return rawStore.grantRole(role, userName, principalType, grantor, grantorType, grantOption); + } + + @Override + public boolean revokeRole(Role role, String userName, + PrincipalType principalType, boolean grantOption) + throws MetaException, NoSuchObjectException { + return rawStore.revokeRole(role, userName, principalType, grantOption); + } + + @Override + public PrincipalPrivilegeSet getUserPrivilegeSet(String userName, + List groupNames) throws InvalidObjectException, MetaException { + return rawStore.getUserPrivilegeSet(userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getDBPrivilegeSet(String dbName, String userName, + List groupNames) throws InvalidObjectException, MetaException { + return rawStore.getDBPrivilegeSet(dbName, userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getTablePrivilegeSet(String dbName, + String tableName, String userName, List groupNames) + throws InvalidObjectException, MetaException { + return rawStore.getTablePrivilegeSet(dbName, tableName, userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getPartitionPrivilegeSet(String dbName, + String tableName, String partition, String userName, + List groupNames) throws InvalidObjectException, MetaException { + return rawStore.getPartitionPrivilegeSet(dbName, tableName, partition, userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getColumnPrivilegeSet(String dbName, + String tableName, String partitionName, String columnName, + String userName, List groupNames) + throws InvalidObjectException, MetaException { + return rawStore.getColumnPrivilegeSet(dbName, tableName, partitionName, columnName, userName, groupNames); + } + + @Override + public List listPrincipalGlobalGrants( + String principalName, PrincipalType principalType) { + return rawStore.listPrincipalGlobalGrants(principalName, principalType); + } + + @Override + public List listPrincipalDBGrants(String principalName, + PrincipalType principalType, String dbName) { + return rawStore.listPrincipalDBGrants(principalName, principalType, dbName); + } + + @Override + public List listAllTableGrants(String principalName, + PrincipalType principalType, String dbName, String tableName) { + return rawStore.listAllTableGrants(principalName, principalType, dbName, tableName); + } + + @Override + public List listPrincipalPartitionGrants( + String principalName, PrincipalType principalType, String dbName, + String tableName, List partValues, String partName) { + return rawStore.listPrincipalPartitionGrants(principalName, principalType, dbName, tableName, partValues, partName); + } + + @Override + public List listPrincipalTableColumnGrants( + String principalName, PrincipalType principalType, String dbName, + String tableName, String columnName) { + return rawStore.listPrincipalTableColumnGrants(principalName, principalType, dbName, tableName, columnName); + } + + @Override + public List listPrincipalPartitionColumnGrants( + 
String principalName, PrincipalType principalType, String dbName, + String tableName, List partValues, String partName, + String columnName) { + return rawStore.listPrincipalPartitionColumnGrants(principalName, principalType, dbName, tableName, partValues, partName, columnName); + } + + @Override + public boolean grantPrivileges(PrivilegeBag privileges) + throws InvalidObjectException, MetaException, NoSuchObjectException { + return rawStore.grantPrivileges(privileges); + } + + @Override + public boolean revokePrivileges(PrivilegeBag privileges, boolean grantOption) + throws InvalidObjectException, MetaException, NoSuchObjectException { + return rawStore.revokePrivileges(privileges, grantOption); + } + + @Override + public Role getRole(String roleName) throws NoSuchObjectException { + return rawStore.getRole(roleName); + } + + @Override + public List listRoleNames() { + return rawStore.listRoleNames(); + } + + @Override + public List listRoles(String principalName, + PrincipalType principalType) { + return rawStore.listRoles(principalName, principalType); + } + + @Override + public List listRolesWithGrants(String principalName, + PrincipalType principalType) { + return rawStore.listRolesWithGrants(principalName, principalType); + } + + @Override + public List listRoleMembers(String roleName) { + return rawStore.listRoleMembers(roleName); + } + + @Override + public Partition getPartitionWithAuth(String dbName, String tblName, + List partVals, String userName, List groupNames) + throws MetaException, NoSuchObjectException, InvalidObjectException { + Partition p = SharedCache.getPartitionFromCache(dbName, tblName, partVals); + Table t = SharedCache.getTableFromCache(dbName, tblName); + String partName = Warehouse.makePartName(t.getPartitionKeys(), partVals); + PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, + userName, groupNames); + p.setPrivileges(privs); + return p; + } + + @Override + public List getPartitionsWithAuth(String dbName, String tblName, + short maxParts, String userName, List groupNames) + throws MetaException, NoSuchObjectException, InvalidObjectException { + Table t = SharedCache.getTableFromCache(dbName, tblName); + List partitions = new ArrayList(); + int count = 0; + for (Partition part : SharedCache.listCachedPartitions(dbName, tblName, maxParts)) { + if (maxParts == -1 || count < maxParts) { + String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues()); + PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, + userName, groupNames); + part.setPrivileges(privs); + partitions.add(part); + count++; + } + } + return partitions; + } + + @Override + public List listPartitionNamesPs(String dbName, String tblName, + List partVals, short maxParts) + throws MetaException, NoSuchObjectException { + List partNames = new ArrayList(); + int count = 0; + Table t = SharedCache.getTableFromCache(dbName, tblName); + for (Partition part : SharedCache.listCachedPartitions(dbName, tblName, maxParts)) { + boolean psMatch = true; + for (int i=0;i listPartitionsPsWithAuth(String dbName, + String tblName, List partVals, short maxParts, String userName, + List groupNames) + throws MetaException, InvalidObjectException, NoSuchObjectException { + List partitions = new ArrayList(); + Table t = SharedCache.getTableFromCache(dbName, tblName); + int count = 0; + for (Partition part : SharedCache.listCachedPartitions(dbName, tblName, maxParts)) { + boolean psMatch = true; + for (int i=0;i partVals) throws 
NoSuchObjectException, MetaException, + InvalidObjectException, InvalidInputException { + return rawStore.updatePartitionColumnStatistics(statsObj, partVals); + } + + @Override + public ColumnStatistics getTableColumnStatistics(String dbName, + String tableName, List colName) + throws MetaException, NoSuchObjectException { + return rawStore.getTableColumnStatistics(dbName, tableName, colName); + } + + @Override + public List getPartitionColumnStatistics(String dbName, + String tblName, List partNames, List colNames) + throws MetaException, NoSuchObjectException { + return rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames); + } + + @Override + public boolean deletePartitionColumnStatistics(String dbName, + String tableName, String partName, List partVals, String colName) + throws NoSuchObjectException, MetaException, InvalidObjectException, + InvalidInputException { + return rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName); + } + + @Override + public boolean deleteTableColumnStatistics(String dbName, String tableName, + String colName) throws NoSuchObjectException, MetaException, + InvalidObjectException, InvalidInputException { + return rawStore.deleteTableColumnStatistics(dbName, tableName, colName); + } + + @Override + public long cleanupEvents() { + return rawStore.cleanupEvents(); + } + + @Override + public boolean addToken(String tokenIdentifier, String delegationToken) { + return rawStore.addToken(tokenIdentifier, delegationToken); + } + + @Override + public boolean removeToken(String tokenIdentifier) { + return rawStore.removeToken(tokenIdentifier); + } + + @Override + public String getToken(String tokenIdentifier) { + return rawStore.getToken(tokenIdentifier); + } + + @Override + public List getAllTokenIdentifiers() { + return rawStore.getAllTokenIdentifiers(); + } + + @Override + public int addMasterKey(String key) throws MetaException { + return rawStore.addMasterKey(key); + } + + @Override + public void updateMasterKey(Integer seqNo, String key) + throws NoSuchObjectException, MetaException { + rawStore.updateMasterKey(seqNo, key); + } + + @Override + public boolean removeMasterKey(Integer keySeq) { + return rawStore.removeMasterKey(keySeq); + } + + @Override + public String[] getMasterKeys() { + return rawStore.getMasterKeys(); + } + + @Override + public void verifySchema() throws MetaException { + rawStore.verifySchema(); + } + + @Override + public String getMetaStoreSchemaVersion() throws MetaException { + return rawStore.getMetaStoreSchemaVersion(); + } + + @Override + public void setMetaStoreSchemaVersion(String version, String comment) + throws MetaException { + rawStore.setMetaStoreSchemaVersion(version, comment); + } + + @Override + public void dropPartitions(String dbName, String tblName, + List partNames) throws MetaException, NoSuchObjectException { + rawStore.dropPartitions(dbName, tblName, partNames); + for (String partName : partNames) { + List vals = partNameToVals(partName); + Partition part = SharedCache.removePatitionFromCache(dbName, tblName, vals); + } + } + + @Override + public List listPrincipalDBGrantsAll( + String principalName, PrincipalType principalType) { + return rawStore.listPrincipalDBGrantsAll(principalName, principalType); + } + + @Override + public List listPrincipalTableGrantsAll( + String principalName, PrincipalType principalType) { + // TODO Auto-generated method stub + return null; + } + + @Override + public List listPrincipalPartitionGrantsAll( + String principalName, 
PrincipalType principalType) { + return rawStore.listPrincipalPartitionGrantsAll(principalName, principalType); + } + + @Override + public List listPrincipalTableColumnGrantsAll( + String principalName, PrincipalType principalType) { + return rawStore.listPrincipalTableColumnGrantsAll(principalName, principalType); + } + + @Override + public List listPrincipalPartitionColumnGrantsAll( + String principalName, PrincipalType principalType) { + return rawStore.listPrincipalPartitionColumnGrantsAll(principalName, principalType); + } + + @Override + public List listGlobalGrantsAll() { + return rawStore.listGlobalGrantsAll(); + } + + @Override + public List listDBGrantsAll(String dbName) { + return rawStore.listDBGrantsAll(dbName); + } + + @Override + public List listPartitionColumnGrantsAll(String dbName, + String tableName, String partitionName, String columnName) { + return rawStore.listPartitionColumnGrantsAll(dbName, tableName, partitionName, columnName); + } + + @Override + public List listTableGrantsAll(String dbName, + String tableName) { + return rawStore.listTableGrantsAll(dbName, tableName); + } + + @Override + public List listPartitionGrantsAll(String dbName, + String tableName, String partitionName) { + return rawStore.listPartitionGrantsAll(dbName, tableName, partitionName); + } + + @Override + public List listTableColumnGrantsAll(String dbName, + String tableName, String columnName) { + return rawStore.listTableColumnGrantsAll(dbName, tableName, columnName); + } + + @Override + public void createFunction(Function func) + throws InvalidObjectException, MetaException { + // TODO fucntionCache + rawStore.createFunction(func); + } + + @Override + public void alterFunction(String dbName, String funcName, + Function newFunction) throws InvalidObjectException, MetaException { + // TODO fucntionCache + rawStore.alterFunction(dbName, funcName, newFunction); + } + + @Override + public void dropFunction(String dbName, String funcName) throws MetaException, + NoSuchObjectException, InvalidObjectException, InvalidInputException { + // TODO fucntionCache + rawStore.dropFunction(dbName, funcName); + } + + @Override + public Function getFunction(String dbName, String funcName) + throws MetaException { + // TODO fucntionCache + return rawStore.getFunction(dbName, funcName); + } + + @Override + public List getAllFunctions() throws MetaException { + // TODO fucntionCache + return rawStore.getAllFunctions(); + } + + @Override + public List getFunctions(String dbName, String pattern) + throws MetaException { + // TODO fucntionCache + return rawStore.getFunctions(dbName, pattern); + } + + @Override + public AggrStats get_aggr_stats_for(String dbName, String tblName, + List partNames, List colNames) + throws MetaException, NoSuchObjectException { + List colStats = new ArrayList(colNames.size()); + for (String colName : colNames) { + colStats.add(mergeColStatsForPartitions(dbName, tblName, partNames, colName)); + } + // TODO: revisit the partitions not found case for extrapolation + return new AggrStats(colStats, partNames.size()); + } + + private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName, + List partNames, String colName) throws MetaException { + ColumnStatisticsObj colStats = null; + for (String partName : partNames) { + String colStatsCacheKey = buildKey(dbName, tblName, partNameToVals(partName), colName); + ColumnStatisticsObj colStatsForPart = SharedCache.getCachedPartitionColStats( + colStatsCacheKey); + if (colStats == null) { + colStats = colStatsForPart; + } 
else {
+        colStats = mergeColStatsObj(colStats, colStatsForPart);
+      }
+    }
+    return colStats;
+  }
+
+  private ColumnStatisticsObj mergeColStatsObj(ColumnStatisticsObj colStats1,
+      ColumnStatisticsObj colStats2) throws MetaException {
+    if ((!colStats1.getColType().equalsIgnoreCase(colStats2.getColType()))
+        || (!colStats1.getColName().equalsIgnoreCase(colStats2.getColName()))) {
+      throw new MetaException("Can't merge column stats for two partitions for different columns.");
+    }
+    ColumnStatisticsData csd = new ColumnStatisticsData();
+    ColumnStatisticsObj cso = new ColumnStatisticsObj(colStats1.getColName(),
+        colStats1.getColType(), csd);
+    ColumnStatisticsData csData1 = colStats1.getStatsData();
+    ColumnStatisticsData csData2 = colStats2.getStatsData();
+    String colType = colStats1.getColType().toLowerCase();
+    if (colType.equals("boolean")) {
+      BooleanColumnStatsData boolStats = new BooleanColumnStatsData();
+      boolStats.setNumFalses(csData1.getBooleanStats().getNumFalses()
+          + csData2.getBooleanStats().getNumFalses());
+      boolStats.setNumTrues(csData1.getBooleanStats().getNumTrues()
+          + csData2.getBooleanStats().getNumTrues());
+      boolStats.setNumNulls(csData1.getBooleanStats().getNumNulls()
+          + csData2.getBooleanStats().getNumNulls());
+      csd.setBooleanStats(boolStats);
+    } else if (colType.equals("string") || colType.startsWith("varchar")
+        || colType.startsWith("char")) {
+      StringColumnStatsData stringStats = new StringColumnStatsData();
+      stringStats.setNumNulls(csData1.getStringStats().getNumNulls()
+          + csData2.getStringStats().getNumNulls());
+      stringStats.setAvgColLen(Math.max(csData1.getStringStats().getAvgColLen(), csData2
+          .getStringStats().getAvgColLen()));
+      stringStats.setMaxColLen(Math.max(csData1.getStringStats().getMaxColLen(), csData2
+          .getStringStats().getMaxColLen()));
+      stringStats.setNumDVs(Math.max(csData1.getStringStats().getNumDVs(), csData2.getStringStats()
+          .getNumDVs()));
+      csd.setStringStats(stringStats);
+    } else if (colType.equals("binary")) {
+      BinaryColumnStatsData binaryStats = new BinaryColumnStatsData();
+      binaryStats.setNumNulls(csData1.getBinaryStats().getNumNulls()
+          + csData2.getBinaryStats().getNumNulls());
+      binaryStats.setAvgColLen(Math.max(csData1.getBinaryStats().getAvgColLen(), csData2
+          .getBinaryStats().getAvgColLen()));
+      binaryStats.setMaxColLen(Math.max(csData1.getBinaryStats().getMaxColLen(), csData2
+          .getBinaryStats().getMaxColLen()));
+      csd.setBinaryStats(binaryStats);
+    } else if (colType.equals("bigint") || colType.equals("int") || colType.equals("smallint")
+        || colType.equals("tinyint") || colType.equals("timestamp")) {
+      LongColumnStatsData longStats = new LongColumnStatsData();
+      longStats.setNumNulls(csData1.getLongStats().getNumNulls()
+          + csData2.getLongStats().getNumNulls());
+      longStats.setHighValue(Math.max(csData1.getLongStats().getHighValue(), csData2.getLongStats()
+          .getHighValue()));
+      longStats.setLowValue(Math.min(csData1.getLongStats().getLowValue(), csData2.getLongStats()
+          .getLowValue()));
+      longStats.setNumDVs(Math.max(csData1.getLongStats().getNumDVs(), csData2.getLongStats()
+          .getNumDVs()));
+      csd.setLongStats(longStats);
+    } else if (colType.equals("date")) {
+      DateColumnStatsData dateStats = new DateColumnStatsData();
+      dateStats.setNumNulls(csData1.getDateStats().getNumNulls()
+          + csData2.getDateStats().getNumNulls());
+      dateStats.setHighValue(new Date(Math.max(csData1.getDateStats().getHighValue()
+          .getDaysSinceEpoch(), csData2.getDateStats().getHighValue().getDaysSinceEpoch())));
+      dateStats.setLowValue(new Date(Math.min(csData1.getDateStats().getLowValue()
+          .getDaysSinceEpoch(), csData2.getDateStats().getLowValue().getDaysSinceEpoch())));
+      dateStats.setNumDVs(Math.max(csData1.getDateStats().getNumDVs(), csData2.getDateStats()
+          .getNumDVs()));
+      csd.setDateStats(dateStats);
+    } else if (colType.equals("double") || colType.equals("float")) {
+      DoubleColumnStatsData doubleStats = new DoubleColumnStatsData();
+      doubleStats.setNumNulls(csData1.getDoubleStats().getNumNulls()
+          + csData2.getDoubleStats().getNumNulls());
+      doubleStats.setHighValue(Math.max(csData1.getDoubleStats().getHighValue(), csData2
+          .getDoubleStats().getHighValue()));
+      doubleStats.setLowValue(Math.min(csData1.getDoubleStats().getLowValue(), csData2
+          .getDoubleStats().getLowValue()));
+      doubleStats.setNumDVs(Math.max(csData1.getDoubleStats().getNumDVs(), csData2.getDoubleStats()
+          .getNumDVs()));
+      csd.setDoubleStats(doubleStats);
+    } else if (colType.startsWith("decimal")) {
+      DecimalColumnStatsData decimalStats = new DecimalColumnStatsData();
+      decimalStats.setNumNulls(csData1.getDecimalStats().getNumNulls()
+          + csData2.getDecimalStats().getNumNulls());
+      Decimal high = (csData1.getDecimalStats().getHighValue()
+          .compareTo(csData2.getDecimalStats().getHighValue()) > 0) ? csData1.getDecimalStats()
+          .getHighValue() : csData2.getDecimalStats().getHighValue();
+      decimalStats.setHighValue(high);
+      Decimal low = (csData1.getDecimalStats().getLowValue()
+          .compareTo(csData2.getDecimalStats().getLowValue()) < 0) ? csData1.getDecimalStats()
+          .getLowValue() : csData2.getDecimalStats().getLowValue();
+      decimalStats.setLowValue(low);
+      decimalStats.setNumDVs(Math.max(csData1.getDecimalStats().getNumDVs(), csData2
+          .getDecimalStats().getNumDVs()));
+      csd.setDecimalStats(decimalStats);
+    }
+    return cso;
+  }
+
+  @Override
+  public NotificationEventResponse getNextNotification(
+      NotificationEventRequest rqst) {
+    return rawStore.getNextNotification(rqst);
+  }
+
+  @Override
+  public void addNotificationEvent(NotificationEvent event) {
+    rawStore.addNotificationEvent(event);
+  }
+
+  @Override
+  public void cleanNotificationEvents(int olderThan) {
+    rawStore.cleanNotificationEvents(olderThan);
+  }
+
+  @Override
+  public CurrentNotificationEventId getCurrentNotificationEventId() {
+    return rawStore.getCurrentNotificationEventId();
+  }
+
+  @Override
+  public void flushCache() {
+    rawStore.flushCache();
+  }
+
+  @Override
+  public ByteBuffer[] getFileMetadata(List<Long> fileIds) throws MetaException {
+    return rawStore.getFileMetadata(fileIds);
+  }
+
+  @Override
+  public void putFileMetadata(List<Long> fileIds, List<ByteBuffer> metadata,
+      FileMetadataExprType type) throws MetaException {
+    rawStore.putFileMetadata(fileIds, metadata, type);
+  }
+
+  @Override
+  public boolean isFileMetadataSupported() {
+    return rawStore.isFileMetadataSupported();
+  }
+
+  @Override
+  public void getFileMetadataByExpr(List<Long> fileIds,
+      FileMetadataExprType type, byte[] expr, ByteBuffer[] metadatas,
+      ByteBuffer[] exprResults, boolean[] eliminated) throws MetaException {
+    rawStore.getFileMetadataByExpr(fileIds, type, expr, metadatas, exprResults, eliminated);
+  }
+
+  @Override
+  public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) {
+    return rawStore.getFileMetadataHandler(type);
+  }
+
+  @Override
+  public int getTableCount() throws MetaException {
+    return SharedCache.getCachedTableCount();
+  }
+
+  @Override
+  public int getPartitionCount() throws MetaException {
+    return SharedCache.getCachedPartitionCount();
+  }
+
+  @Override
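+  // Like the table and partition counts above, this is answered from the cache
+  // rather than the wrapped store.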
public int getDatabaseCount() throws MetaException { + return SharedCache.getCachedDatabaseCount(); + } + + @Override + public List getPrimaryKeys(String db_name, String tbl_name) + throws MetaException { + // TODO constraintCache + return rawStore.getPrimaryKeys(db_name, tbl_name); + } + + @Override + public List getForeignKeys(String parent_db_name, + String parent_tbl_name, String foreign_db_name, String foreign_tbl_name) + throws MetaException { + // TODO constraintCache + return rawStore.getForeignKeys(parent_db_name, parent_tbl_name, foreign_db_name, foreign_tbl_name); + } + + @Override + public void createTableWithConstraints(Table tbl, + List primaryKeys, List foreignKeys) + throws InvalidObjectException, MetaException { + // TODO constraintCache + rawStore.createTableWithConstraints(tbl, primaryKeys, foreignKeys); + SharedCache.addTableToCache(tbl); + } + + @Override + public void dropConstraint(String dbName, String tableName, + String constraintName) throws NoSuchObjectException { + // TODO constraintCache + rawStore.dropConstraint(dbName, tableName, constraintName); + } + + @Override + public void addPrimaryKeys(List pks) + throws InvalidObjectException, MetaException { + // TODO constraintCache + rawStore.addPrimaryKeys(pks); + } + + @Override + public void addForeignKeys(List fks) + throws InvalidObjectException, MetaException { + // TODO constraintCache + rawStore.addForeignKeys(fks); + } + + @Override + public Map getAggrColStatsForTablePartitions( + String dbName, String tableName) + throws MetaException, NoSuchObjectException { + return rawStore.getAggrColStatsForTablePartitions(dbName, tableName); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java new file mode 100644 index 0000000..194f1f9 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
new file mode 100644
index 0000000..194f1f9
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.cache;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper;
+import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper;
+import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
+import org.apache.hadoop.hive.metastore.hbase.HBaseUtils;
+
+public class SharedCache {
+  private static Map<String, Database> databaseCache = new HashMap<String, Database>();
+  private static Map<String, TableWrapper> tableCache = new HashMap<String, TableWrapper>();
+  private static Map<String, PartitionWrapper> partitionCache =
+      new HashMap<String, PartitionWrapper>();
+  private static Map<String, ColumnStatisticsObj> partitionColStatsCache =
+      new HashMap<String, ColumnStatisticsObj>();
+  private static Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache =
+      new HashMap<ByteArrayWrapper, StorageDescriptorWrapper>();
+  private static MessageDigest md;
+
+  static {
+    try {
+      md = MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException("should not happen", e);
+    }
+  }
+
+  public static synchronized Database getDatabaseFromCache(String name) {
+    Database db = databaseCache.get(name);
+    return db != null ? db.deepCopy() : null;
+  }
+
+  public static synchronized void addDatabaseToCache(Database db) {
+    databaseCache.put(db.getName(), db);
+  }
+
+  public static synchronized void removeDatabaseFromCache(String dbName) {
+    databaseCache.remove(dbName);
+  }
+
+  public static synchronized List<String> listCachedDatabases() {
+    return new ArrayList<String>(databaseCache.keySet());
+  }
+
+  public static synchronized int getCachedDatabaseCount() {
+    return databaseCache.size();
+  }
+
+  public static synchronized Table getTableFromCache(String dbName, String tableName) {
+    TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName));
+    if (tblWrapper == null) {
+      return null;
+    }
+    return CacheUtils.assemble(tblWrapper);
+  }
+
+  public static synchronized void addTableToCache(Table tbl) {
+    byte[] sdHash = HBaseUtils.hashStorageDescriptor(tbl.getSd(), md);
+    StorageDescriptor sd = tbl.getSd();
+    increSd(sd, sdHash);
+    Table tblCopy = tbl.deepCopy();
+    tblCopy.setSd(null);
+    TableWrapper wrapper = new TableWrapper(tblCopy, sdHash, sd.getLocation(), sd.getParameters());
+    tableCache.put(CacheUtils.buildKey(tbl.getDbName(), tbl.getTableName()), wrapper);
+  }
+
+  public static synchronized void removeTableFromCache(String dbName, String tblName) {
+    TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName));
+    byte[] sdHash = tblWrapper.getSdHash();
+    decrSd(sdHash);
+  }
+
+  public static synchronized void alterTableInCache(String dbName, String tblName, Table newTable) {
+    removeTableFromCache(dbName, tblName);
+    addTableToCache(newTable);
+  }
+
+  public static synchronized List<Table> listCachedTables(String dbName) {
+    List<Table> tables = new ArrayList<Table>();
+    for (TableWrapper wrapper : tableCache.values()) {
+      if (wrapper.getTable().getDbName().equals(dbName)) {
+        tables.add(CacheUtils.assemble(wrapper));
+      }
+    }
+    return tables;
+  }
+
+  public static synchronized int getCachedTableCount() {
+    return tableCache.size();
+  }
+
+  public static synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames,
+      List<String> tableTypes) {
+    List<TableMeta> tableMetas = new ArrayList<TableMeta>();
+    for (String dbName : listCachedDatabases()) {
+      if (CacheUtils.matches(dbName, dbNames)) {
+        for (Table table : listCachedTables(dbName)) {
+          if (CacheUtils.matches(table.getTableName(), tableNames)) {
+            if (tableTypes == null || tableTypes.contains(table.getTableType())) {
+              TableMeta metaData =
+                  new TableMeta(dbName, table.getTableName(), table.getTableType());
+              metaData.setComments(table.getParameters().get("comment"));
+              tableMetas.add(metaData);
+            }
+          }
+        }
+      }
+    }
+    return tableMetas;
+  }
+
+  public static synchronized void addPartitionToCache(Partition part) {
+    byte[] sdHash = HBaseUtils.hashStorageDescriptor(part.getSd(), md);
+    StorageDescriptor sd = part.getSd();
+    increSd(sd, sdHash);
+    Partition partCopy = part.deepCopy();
+    partCopy.setSd(null);
+    PartitionWrapper wrapper =
+        new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters());
+    partitionCache.put(
+        CacheUtils.buildKey(part.getDbName(), part.getTableName(), part.getValues()), wrapper);
+  }
+
+  public static synchronized Partition getPartitionFromCache(String key) {
+    PartitionWrapper wrapper = partitionCache.get(key);
+    if (wrapper == null) {
+      return null;
+    }
+    return CacheUtils.assemble(wrapper);
+  }
+
+  public static synchronized Partition getPartitionFromCache(String dbName, String tblName,
+      List<String> part_vals) {
+    return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, part_vals));
+  }
+
+  public static synchronized boolean existPartitionFromCache(String dbName, String tblName,
+      List<String> part_vals) {
+    return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals));
+  }
+
+  public static synchronized Partition removePartitionFromCache(String dbName, String tblName,
+      List<String> part_vals) {
+    PartitionWrapper wrapper =
+        partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
+    decrSd(wrapper.getSdHash());
+    return wrapper.getPartition();
+  }
+
+  public static synchronized List<Partition> listCachedPartitions(String dbName, String tblName,
+      int max) {
+    List<Partition> partitions = new ArrayList<Partition>();
+    int count = 0;
+    for (PartitionWrapper wrapper : partitionCache.values()) {
+      if (wrapper.getPartition().getDbName().equals(dbName)
+          && wrapper.getPartition().getTableName().equals(tblName)
+          && (max == -1 || count < max)) {
+        partitions.add(CacheUtils.assemble(wrapper));
+        count++;
+      }
+    }
+    return partitions;
+  }
+
+  public static synchronized void alterPartitionInCache(String dbName, String tblName,
+      List<String> partVals, Partition newPart) {
+    removePartitionFromCache(dbName, tblName, partVals);
+    addPartitionToCache(newPart);
+  }
+
+  public static synchronized int getCachedPartitionCount() {
+    return partitionCache.size();
+  }
+
+  public static synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) {
+    return partitionColStatsCache.get(key);
+  }
+
+  public static void increSd(StorageDescriptor sd, byte[] sdHash) {
+    ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
+    if (sdCache.containsKey(byteArray)) {
+      sdCache.get(byteArray).refCount++;
+    } else {
+      StorageDescriptor sdToCache = sd.deepCopy();
+      sdToCache.setLocation(null);
+      sdToCache.setParameters(null);
+      sdCache.put(byteArray, new StorageDescriptorWrapper(sdToCache, 1));
+    }
+  }
+
+  public static void decrSd(byte[] sdHash) {
+    ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
+    StorageDescriptorWrapper sdWrapper = sdCache.get(byteArray);
+    sdWrapper.refCount--;
+    if (sdWrapper.getRefCount() == 0) {
+      sdCache.remove(byteArray);
+    }
+  }
+
+  public static StorageDescriptor getSdFromCache(byte[] sdHash) {
+    StorageDescriptorWrapper sdWrapper = sdCache.get(new ByteArrayWrapper(sdHash));
+    return sdWrapper.getSd();
+  }
+}
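SharedCache stores each StorageDescriptor only once: a table or partition wrapper keeps the descriptor's MD5 hash plus its own location and parameters, while the scrubbed descriptor lives in sdCache under that hash with a reference count, maintained by increSd/decrSd (which are safe without their own synchronized keyword only because they are called from the synchronized methods above). The same interning pattern reduced to its core, independent of the Hive types; all names in this sketch are illustrative:

import java.util.HashMap;
import java.util.Map;

// Illustrative refcounted interner: many owners share one cached value keyed by its hash.
public class RefCountedCache<K, V> {
  private static class Entry<V> {
    V value;
    int refCount;
    Entry(V v) { value = v; refCount = 1; }
  }

  private final Map<K, Entry<V>> cache = new HashMap<K, Entry<V>>();

  // Called when an owner starts referencing the value for this key.
  public synchronized void retain(K key, V valueIfAbsent) {
    Entry<V> e = cache.get(key);
    if (e != null) {
      e.refCount++;
    } else {
      cache.put(key, new Entry<V>(valueIfAbsent));
    }
  }

  // Called when an owner goes away; the value is evicted once nothing references it.
  public synchronized void release(K key) {
    Entry<V> e = cache.get(key);
    if (e != null && --e.refCount == 0) {
      cache.remove(key);
    }
  }

  public synchronized V get(K key) {
    Entry<V> e = cache.get(key);
    return e == null ? null : e.value;
  }
}
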
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index f9619e5..f6420f5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -2852,4 +2852,11 @@ public void addForeignKeys(List<SQLForeignKey> fks) throws InvalidObjectExceptio
       commitOrRoleBack(commit);
     }
   }
+
+  @Override
+  public Map<String, List<ColumnStatisticsObj>> getAggrColStatsForTablePartitions(String dbName,
+      String tableName) throws MetaException, NoSuchObjectException {
+    // TODO: see if it makes sense to implement this here
+    return null;
+  }
 }
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
index 94087b1..3172f92 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
@@ -619,7 +619,7 @@ private static ResourceType convertResourceTypes(
    * @param md message descriptor to use to generate the hash
    * @return the hash as a byte array
    */
-  static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md) {
+  public static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md) {
     // Note all maps and lists have to be absolutely sorted.  Otherwise we'll produce different
     // results for hashes based on the OS or JVM being used.
     md.reset();
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index f64b08d..3e3fd20 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -869,4 +870,11 @@ public void addForeignKeys(List<SQLForeignKey> fks)
     throws InvalidObjectException, MetaException {
   // TODO Auto-generated method stub
 }
+
+  @Override
+  public Map<String, List<ColumnStatisticsObj>> getAggrColStatsForTablePartitions(String dbName,
+      String tableName) throws MetaException, NoSuchObjectException {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 2682886..91d8c2a 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -885,6 +886,13 @@ public void addForeignKeys(List<SQLForeignKey> fks)
     throws InvalidObjectException, MetaException {
   // TODO Auto-generated method stub
 }
+
+  @Override
+  public Map<String, List<ColumnStatisticsObj>> getAggrColStatsForTablePartitions(String dbName,
+      String tableName) throws MetaException, NoSuchObjectException {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }
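Every RawStore implementation now has to provide getAggrColStatsForTablePartitions: the test dummies and HBaseStore return null stubs, while ObjectStore delegates to the direct-SQL aggregation that CachedStore's prewarm consumes. A hedged sketch of that consumption, assuming the returned map is keyed by partition name as the direct-SQL query's PARTITION_NAME column suggests; the composite key format and the cache put are illustrative, not the patch's actual CacheUtils.buildKey:

import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;

public class PrewarmStatsSketch {
  // Illustrative: fetch aggregated per-partition stats once and index them by a
  // composite key, the way a prewarm step could populate an in-memory stats cache.
  public static void prewarmStats(RawStore store, String dbName, String tblName)
      throws MetaException, NoSuchObjectException {
    Map<String, List<ColumnStatisticsObj>> byPartition =
        store.getAggrColStatsForTablePartitions(dbName, tblName);
    for (Map.Entry<String, List<ColumnStatisticsObj>> e : byPartition.entrySet()) {
      for (ColumnStatisticsObj cso : e.getValue()) {
        String key = dbName + "#" + tblName + "#" + e.getKey() + "#" + cso.getColName();
        // a cache put would go here, e.g. partitionColStatsCache.put(key, cso),
        // so getCachedPartitionColStats(key) can later answer without hitting the DB
      }
    }
  }
}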