diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index fb0b2fe..c983277 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -216,7 +216,9 @@ public String toString() { ConfVars.AGGREGATE_STATS_CACHE_MAX_FULL, ConfVars.AGGREGATE_STATS_CACHE_CLEAN_UNTIL, ConfVars.DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES, - ConfVars.FILE_METADATA_THREADS + ConfVars.FILE_METADATA_THREADS, + ConfVars.METASTORE_CLIENT_FILTER_RESULT, + ConfVars.METASTORE_SERVER_FILTER_RESULT }; /** @@ -657,6 +659,10 @@ public static ConfVars getMetaConf(String name) { "metadata being exported to the current user's home directory on HDFS."), METASTORE_MAX_EVENT_RESPONSE("metastore.max.event.response", "hive.metastore.max.event.response", 1000000, "The parameter will decide the maximum number of events that HMS will respond."), + METASTORE_CLIENT_FILTER_RESULT("metastore.client.filter.result", "metastore.client.filter.result", true, + "Filtering the metadata read results at HMS client"), + METASTORE_SERVER_FILTER_RESULT("metastore.server.filter.result", "metastore.server.filter.result", false, + "Filtering the metadata read results at HMS server"), MOVE_EXPORTED_METADATA_TO_TRASH("metastore.metadata.move.exported.metadata.to.trash", "hive.metadata.move.exported.metadata.to.trash", true, "When used in conjunction with the org.apache.hadoop.hive.ql.parse.MetaDataExportListener pre event listener, \n" + diff --git a/standalone-metastore/metastore-server/pom.xml b/standalone-metastore/metastore-server/pom.xml index 895abfc..f67ec48 100644 --- a/standalone-metastore/metastore-server/pom.xml +++ b/standalone-metastore/metastore-server/pom.xml @@ 
-239,6 +239,11 @@ curator-test test + + org.apache.hive + hive-common + 4.0.0-SNAPSHOT + diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 0a1b96d..1287e56 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -30,6 +30,8 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.prependNotNullCatToDbName; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; @@ -73,11 +75,13 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.common.ZooKeeperHiveHelper; import org.apache.hadoop.hive.common.ZKDeRegisterWatcher; import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent; import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; @@ -208,6 +212,8 @@ // embedded metastore or a remote one private static boolean isMetaStoreRemote = false; + private static boolean isServerFilterEnabled = false; + // Used for testing to simulate method timeout. 
@VisibleForTesting static boolean TEST_TIMEOUT_ENABLED = false; @@ -505,6 +511,7 @@ public Configuration getHiveConf() { private List transactionalListeners; private List endFunctionListeners; private List initListeners; + private MetaStoreFilterHook filterHook; private Pattern partitionValidationPattern; private final boolean isInTest; @@ -615,6 +622,26 @@ public void init() throws MetaException { } expressionProxy = PartFilterExprUtil.createExpressionProxy(conf); fileMetadataManager = new FileMetadataManager(this.getMS(), conf); + + filterHook = loadFilterHooks(); + isServerFilterEnabled = MetastoreConf.getBoolVar(conf, ConfVars.METASTORE_SERVER_FILTER_RESULT); + } + + private MetaStoreFilterHook loadFilterHooks() throws IllegalStateException { + String filterHookClassName = MetastoreConf.getVar(conf, ConfVars.FILTER_HOOK); + Class authProviderClass = conf. + getClass(filterHookClassName, + DefaultMetaStoreFilterHookImpl.class, + MetaStoreFilterHook.class); + String errorMsg = "Unable to create instance of " + authProviderClass.getName() + ": "; + try { + Constructor constructor = + authProviderClass.getConstructor(Configuration.class); + return constructor.newInstance(conf); + } catch (NoSuchMethodException | IllegalStateException | InstantiationException + | InvocationTargetException | IllegalAccessException e) { + throw new IllegalStateException(errorMsg + e.getMessage(), e); + } } private static String addPrefix(String s) { @@ -1376,6 +1403,10 @@ public Database get_database(final String name) throws NoSuchObjectException, Me try { String[] parsedDbName = parseDbName(name, conf); db = get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]); + if (isServerFilterEnabled) { + db = filterHook.filterDatabase(db); + } + firePreEvent(new PreReadDatabaseEvent(db, this)); } catch (MetaException|NoSuchObjectException e) { ex = e; @@ -1676,6 +1707,9 @@ public void drop_database(final String dbName, final boolean deleteData, final b try { if 
(parsedDbNamed[DB_NAME] == null) { ret = getMS().getAllDatabases(parsedDbNamed[CAT_NAME]); + if (isServerFilterEnabled) { + ret = filterHook.filterDatabases(ret); + } } else { ret = getMS().getDatabases(parsedDbNamed[CAT_NAME], parsedDbNamed[DB_NAME]); } @@ -1693,6 +1727,7 @@ public void drop_database(final String dbName, final boolean deleteData, final b @Override public List<String> get_all_databases() throws MetaException { + // get_databases filters results already. No need to filter here return get_databases(MetaStoreUtils.prependCatalogToDbName(null, null, conf)); } @@ -2899,8 +2934,27 @@ private boolean isExternalTablePurge(Table table) { public Table get_table(final String dbname, final String name) throws MetaException, NoSuchObjectException { String[] parsedDbName = parseDbName(dbname, conf); - return getTableInternal( - parsedDbName[CAT_NAME], parsedDbName[DB_NAME], name, null, null); + + Table t = null; + startTableFunction("get_table", parsedDbName[CAT_NAME], dbname, name); + Exception ex = null; + try { + t = getTableInternal( + parsedDbName[CAT_NAME], parsedDbName[DB_NAME], name, null, null); + if (isServerFilterEnabled) { + t = filterHook.filterTable(t); + } + firePreEvent(new PreReadTableEvent(t, this)); + } catch (MetaException e) { + ex = e; + throw e; + } catch (NoSuchObjectException e) { + ex = e; + throw e; + } finally { + endFunction("get_table", t != null, ex, name); + } + return t; } @Override @@ -3007,14 +3061,25 @@ public Table get_table_core( public List<Table> get_table_objects_by_name(final String dbName, final List<String> tableNames) throws MetaException, InvalidOperationException, UnknownDBException { String[] parsedDbName = parseDbName(dbName, conf); - return getTableObjectsInternal(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableNames, null); + + List<Table>
ret = + getTableObjectsInternal(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableNames, null); + if (isServerFilterEnabled) { + filterHook.filterTables(ret); + } + + return ret; } @Override public GetTablesResult get_table_objects_by_name_req(GetTablesRequest req) throws TException { String catName = req.isSetCatName() ? req.getCatName() : getDefaultCatalog(conf); - return new GetTablesResult(getTableObjectsInternal(catName, - req.getDbName(), req.getTblNames(), req.getCapabilities())); + List
ret = getTableObjectsInternal(catName, req.getDbName(), + req.getTblNames(), req.getCapabilities()); + if (isServerFilterEnabled) { + filterHook.filterTables(ret); + } + return new GetTablesResult(ret); } private List
getTableObjectsInternal(String catName, String dbName, @@ -3125,6 +3190,9 @@ private boolean doesClientHaveCapability(ClientCapabilities client, ClientCapabi throw new InvalidOperationException(filter + " cannot apply null filter"); } tables = getMS().listTableNamesByFilter(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], filter, maxTables); + if (isServerFilterEnabled) { + tables = filterHook.filterTableNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tables); + } } catch (MetaException | InvalidOperationException | UnknownDBException e) { ex = e; throw e; @@ -4525,8 +4593,14 @@ public Partition get_partition(final String db_name, final String tbl_name, Partition ret = null; Exception ex = null; try { + // For improved performance, we'll check if the said db and table are to be filtered out. + // If so, then we won't proceed with querying the partitions. + checkDbAndTableFilters(db_name, tbl_name); fireReadTablePreEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name); ret = getMS().getPartition(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, part_vals); + if (isServerFilterEnabled) { + ret = filterHook.filterPartition(ret); + } } catch (Exception e) { ex = e; throwMetaException(e); @@ -4537,6 +4611,32 @@ public Partition get_partition(final String db_name, final String tbl_name, } /** + * This is a helper method to filter out a given database or a table name. This will improve + * performance when filtering partitions. If the db or table is filtered out, we don't need + * to even fetch the partitions. We can throw a NoSuchObjectException. + * @param dbName the database name + * @param tblName the table name contained in the database + * @throws NoSuchObjectException if the database or table is filtered out + */ + private void checkDbAndTableFilters(final String dbName, final String tblName) + throws NoSuchObjectException, MetaException { + String[] parsedDbName = parseDbName(dbName, conf); + List filteredDb = isServerFilterEnabled ? 
+ filterHook.filterDatabases(Collections.singletonList(dbName)): + Collections.singletonList(dbName); + if (filteredDb.isEmpty()) { + throw new NoSuchObjectException("Database " + dbName + " does not exist"); + } + + List filteredTable = isServerFilterEnabled ? + filterHook.filterTableNames(parsedDbName[CAT_NAME], + dbName, Collections.singletonList(tblName)) : Collections.singletonList(tblName); + if (filteredTable.isEmpty()) { + throw new NoSuchObjectException("Table " + tblName + " does not exist"); + } + } + + /** * Fire a pre-event for read table operation, if there are any * pre-event listeners registered */ @@ -4566,8 +4666,14 @@ public Partition get_partition_with_auth(final String db_name, Partition ret = null; Exception ex = null; try { + // For improved performance, we'll check if the said db and table are to be filtered out. + // If so, then we won't proceed with querying the partitions. + checkDbAndTableFilters(db_name, tbl_name); ret = getMS().getPartitionWithAuth(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, part_vals, user_name, group_names); + if (isServerFilterEnabled) { + ret = filterHook.filterPartition(ret); + } } catch (InvalidObjectException e) { ex = e; throw new NoSuchObjectException(e.getMessage()); @@ -4589,10 +4695,16 @@ public Partition get_partition_with_auth(final String db_name, List ret = null; Exception ex = null; try { + // For improved performance, we'll check if the said db and table are to be filtered out. + // If so, then we won't proceed with querying the partitions. 
+ checkDbAndTableFilters(db_name, tbl_name); checkLimitNumberOfPartitionsByFilter(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, NO_FILTER_STRING, max_parts); ret = getMS().getPartitions(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, max_parts); + if (isServerFilterEnabled) { + ret = filterHook.filterPartitions(ret); + } } catch (Exception e) { ex = e; throwMetaException(e); @@ -4613,10 +4725,16 @@ public Partition get_partition_with_auth(final String db_name, List ret = null; Exception ex = null; try { + // For improved performance, we'll check if the said db and table are to be filtered out. + // If so, then we won't proceed with querying the partitions. + checkDbAndTableFilters(dbName, tblName); checkLimitNumberOfPartitionsByFilter(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName, NO_FILTER_STRING, maxParts); ret = getMS().getPartitionsWithAuth(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName, maxParts, userName, groupNames); + if (isServerFilterEnabled) { + ret = filterHook.filterPartitions(ret); + } } catch (InvalidObjectException e) { ex = e; throw new NoSuchObjectException(e.getMessage()); @@ -4752,8 +4870,15 @@ private static boolean is_partition_spec_grouping_enabled(Table table) { List ret = null; Exception ex = null; try { + // For improved performance, we'll check if the said db and table are to be filtered out. + // If so, then we won't proceed with querying the partitions. 
+ checkDbAndTableFilters(db_name, tbl_name); ret = getMS().listPartitionNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, max_parts); + if (isServerFilterEnabled) { + ret = filterHook + .filterPartitionNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, ret); + } } catch (MetaException e) { ex = e; throw e; @@ -5100,6 +5225,9 @@ private void alter_table_core(String catName, String dbname, String name, Table String[] parsedDbName = parseDbName(dbname, conf); try { ret = getMS().getTables(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], pattern); + if (isServerFilterEnabled) { + ret = filterHook.filterTableNames(parsedDbName[CAT_NAME], dbname, ret); + } } catch (MetaException e) { ex = e; throw e; @@ -5165,6 +5293,9 @@ private void alter_table_core(String catName, String dbname, String name, Table String[] parsedDbName = parseDbName(dbname, conf); try { ret = getMS().getAllTables(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]); + if (isServerFilterEnabled) { + ret = filterHook.filterTableNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], ret); + } } catch (MetaException e) { ex = e; throw e; @@ -5540,9 +5671,15 @@ public boolean drop_partition_by_name_with_environment_context(final String db_n List ret = null; Exception ex = null; try { + // For improved performance, we'll check if the said db and table are to be filtered out. + // If so, then we won't proceed with querying the partitions. + checkDbAndTableFilters(db_name, tbl_name); // Don't send the parsedDbName, as this method will parse itself. 
ret = get_partitions_ps_with_auth(db_name, tbl_name, part_vals, max_parts, null, null); + if (isServerFilterEnabled) { + ret = filterHook.filterPartitions(ret); + } } catch (Exception e) { ex = e; rethrowException(e); @@ -5565,8 +5702,14 @@ public boolean drop_partition_by_name_with_environment_context(final String db_n List ret = null; Exception ex = null; try { + // For improved performance, we'll check if the said db and table are to be filtered out. + // If so, then we won't proceed with querying the partitions. + checkDbAndTableFilters(db_name, tbl_name); ret = getMS().listPartitionsPsWithAuth(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, part_vals, max_parts, userName, groupNames); + if (isServerFilterEnabled) { + ret = filterHook.filterPartitions(ret); + } } catch (InvalidObjectException e) { ex = e; throw new MetaException(e.getMessage()); @@ -5590,8 +5733,15 @@ public boolean drop_partition_by_name_with_environment_context(final String db_n List ret = null; Exception ex = null; try { + // For improved performance, we'll check if the said db and table are to be filtered out. + // If so, then we won't proceed with querying the partitions. + checkDbAndTableFilters(db_name, tbl_name); ret = getMS().listPartitionNamesPs(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, part_vals, max_parts); + if (isServerFilterEnabled) { + ret = filterHook + .filterPartitionNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, ret); + } } catch (Exception e) { ex = e; rethrowException(e); @@ -6032,10 +6182,16 @@ public boolean delete_table_column_statistics(String dbName, String tableName, S List ret = null; Exception ex = null; try { + // For improved performance, we'll check if the said db and table are to be filtered out. + // If so, then we won't proceed with querying the partitions. 
+ checkDbAndTableFilters(dbName, tblName); checkLimitNumberOfPartitionsByFilter(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName, filter, maxParts); ret = getMS().getPartitionsByFilter(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName, filter, maxParts); + if (isServerFilterEnabled) { + ret = filterHook.filterPartitions(ret); + } } catch (Exception e) { ex = e; rethrowException(e); @@ -6169,8 +6325,14 @@ private int get_num_partitions_by_expr(final String catName, final String dbName List ret = null; Exception ex = null; try { + // For improved performance, we'll check if the said db and table are to be filtered out. + // If so, then we won't proceed with querying the partitions. + checkDbAndTableFilters(dbName, tblName); ret = getMS().getPartitionsByNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName, partNames); + if (isServerFilterEnabled) { + ret = filterHook.filterPartitions(ret); + } } catch (Exception e) { ex = e; rethrowException(e);