diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index fb0b2fe6fb9fd4b4c92a6a39f06f39a4641aaabd..c9832773306c3f37f17422663d90c4d842df187d 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -216,7 +216,9 @@ public String toString() {
ConfVars.AGGREGATE_STATS_CACHE_MAX_FULL,
ConfVars.AGGREGATE_STATS_CACHE_CLEAN_UNTIL,
ConfVars.DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES,
- ConfVars.FILE_METADATA_THREADS
+ ConfVars.FILE_METADATA_THREADS,
+ ConfVars.METASTORE_CLIENT_FILTER_RESULT,
+ ConfVars.METASTORE_SERVER_FILTER_RESULT
};
/**
@@ -657,6 +659,10 @@ public static ConfVars getMetaConf(String name) {
"metadata being exported to the current user's home directory on HDFS."),
METASTORE_MAX_EVENT_RESPONSE("metastore.max.event.response", "hive.metastore.max.event.response", 1000000,
"The parameter will decide the maximum number of events that HMS will respond."),
+ METASTORE_CLIENT_FILTER_RESULT("metastore.client.filter.result", "metastore.client.filter.result", true,
+ "Filtering the metadata read results at HMS client"),
+ METASTORE_SERVER_FILTER_RESULT("metastore.server.filter.result", "metastore.server.filter.result", false,
+ "Filtering the metadata read results at HMS server"),
MOVE_EXPORTED_METADATA_TO_TRASH("metastore.metadata.move.exported.metadata.to.trash",
"hive.metadata.move.exported.metadata.to.trash", true,
"When used in conjunction with the org.apache.hadoop.hive.ql.parse.MetaDataExportListener pre event listener, \n" +
diff --git a/standalone-metastore/metastore-server/pom.xml b/standalone-metastore/metastore-server/pom.xml
index 895abfc423f00b121ee63e40904f5b3e57aea8ed..f67ec487112299d95ee185232bf3293fb1f0b8de 100644
--- a/standalone-metastore/metastore-server/pom.xml
+++ b/standalone-metastore/metastore-server/pom.xml
@@ -239,6 +239,11 @@
curator-test
test
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-common</artifactId>
+      <version>4.0.0-SNAPSHOT</version>
+    </dependency>
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 0a1b96dcf62d3536cab2ce074d27a6225b2d3443..1287e560987ad5668c6206dc39d86616e4680eec 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -30,6 +30,8 @@
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.prependNotNullCatToDbName;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
@@ -73,11 +75,13 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
import org.apache.hadoop.hive.common.ZKDeRegisterWatcher;
import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent;
import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -208,6 +212,8 @@
// embedded metastore or a remote one
private static boolean isMetaStoreRemote = false;
+ private static boolean isServerFilterEnabled = false;
+
// Used for testing to simulate method timeout.
@VisibleForTesting
static boolean TEST_TIMEOUT_ENABLED = false;
@@ -505,6 +511,7 @@ public Configuration getHiveConf() {
private List transactionalListeners;
private List endFunctionListeners;
private List initListeners;
+ private MetaStoreFilterHook filterHook;
private Pattern partitionValidationPattern;
private final boolean isInTest;
@@ -615,6 +622,26 @@ public void init() throws MetaException {
}
expressionProxy = PartFilterExprUtil.createExpressionProxy(conf);
fileMetadataManager = new FileMetadataManager(this.getMS(), conf);
+
+ filterHook = loadFilterHooks();
+ isServerFilterEnabled = MetastoreConf.getBoolVar(conf, ConfVars.METASTORE_SERVER_FILTER_RESULT);
+ }
+
+ private MetaStoreFilterHook loadFilterHooks() throws IllegalStateException {
+ String filterHookClassName = MetastoreConf.getVar(conf, ConfVars.FILTER_HOOK);
+ Class<? extends MetaStoreFilterHook> authProviderClass = conf.
+ getClass(filterHookClassName,
+ DefaultMetaStoreFilterHookImpl.class,
+ MetaStoreFilterHook.class);
+ String errorMsg = "Unable to create instance of " + authProviderClass.getName() + ": ";
+ try {
+ Constructor<? extends MetaStoreFilterHook> constructor =
+ authProviderClass.getConstructor(Configuration.class);
+ return constructor.newInstance(conf);
+ } catch (NoSuchMethodException | IllegalStateException | InstantiationException
+ | InvocationTargetException | IllegalAccessException e) {
+ throw new IllegalStateException(errorMsg + e.getMessage(), e);
+ }
}
private static String addPrefix(String s) {
@@ -1376,6 +1403,10 @@ public Database get_database(final String name) throws NoSuchObjectException, Me
try {
String[] parsedDbName = parseDbName(name, conf);
db = get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]);
+ if (isServerFilterEnabled) {
+ db = filterHook.filterDatabase(db);
+ }
+
firePreEvent(new PreReadDatabaseEvent(db, this));
} catch (MetaException|NoSuchObjectException e) {
ex = e;
@@ -1676,6 +1707,9 @@ public void drop_database(final String dbName, final boolean deleteData, final b
try {
if (parsedDbNamed[DB_NAME] == null) {
ret = getMS().getAllDatabases(parsedDbNamed[CAT_NAME]);
+ if (isServerFilterEnabled) {
+ ret = filterHook.filterDatabases(ret);
+ }
} else {
ret = getMS().getDatabases(parsedDbNamed[CAT_NAME], parsedDbNamed[DB_NAME]);
}
@@ -1693,6 +1727,7 @@ public void drop_database(final String dbName, final boolean deleteData, final b
@Override
public List get_all_databases() throws MetaException {
+ // get_databases filters results already. No need to filter here
return get_databases(MetaStoreUtils.prependCatalogToDbName(null, null, conf));
}
@@ -2899,8 +2934,27 @@ private boolean isExternalTablePurge(Table table) {
public Table get_table(final String dbname, final String name) throws MetaException,
NoSuchObjectException {
String[] parsedDbName = parseDbName(dbname, conf);
- return getTableInternal(
- parsedDbName[CAT_NAME], parsedDbName[DB_NAME], name, null, null);
+
+ Table t = null;
+ startTableFunction("get_table", parsedDbName[CAT_NAME], dbname, name);
+ Exception ex = null;
+ try {
+ t = getTableInternal(
+ parsedDbName[CAT_NAME], parsedDbName[DB_NAME], name, null, null);
+ if (isServerFilterEnabled) {
+ t = filterHook.filterTable(t);
+ }
+ firePreEvent(new PreReadTableEvent(t, this));
+ } catch (MetaException e) {
+ ex = e;
+ throw e;
+ } catch (NoSuchObjectException e) {
+ ex = e;
+ throw e;
+ } finally {
+ endFunction("get_table", t != null, ex, name);
+ }
+ return t;
}
@Override
@@ -3007,14 +3061,25 @@ public Table get_table_core(
public List get_table_objects_by_name(final String dbName, final List tableNames)
throws MetaException, InvalidOperationException, UnknownDBException {
String[] parsedDbName = parseDbName(dbName, conf);
- return getTableObjectsInternal(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableNames, null);
+
+ List ret =
+ getTableObjectsInternal(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableNames, null);
+ if (isServerFilterEnabled) {
+ filterHook.filterTables(ret);
+ }
+
+ return ret;
}
@Override
public GetTablesResult get_table_objects_by_name_req(GetTablesRequest req) throws TException {
String catName = req.isSetCatName() ? req.getCatName() : getDefaultCatalog(conf);
- return new GetTablesResult(getTableObjectsInternal(catName,
- req.getDbName(), req.getTblNames(), req.getCapabilities()));
+ List ret = getTableObjectsInternal(catName, req.getDbName(),
+ req.getTblNames(), req.getCapabilities());
+ if (isServerFilterEnabled) {
+ filterHook.filterTables(ret);
+ }
+ return new GetTablesResult(ret);
}
private List getTableObjectsInternal(String catName, String dbName,
@@ -3125,6 +3190,9 @@ private boolean doesClientHaveCapability(ClientCapabilities client, ClientCapabi
throw new InvalidOperationException(filter + " cannot apply null filter");
}
tables = getMS().listTableNamesByFilter(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], filter, maxTables);
+ if (isServerFilterEnabled) {
+ tables = filterHook.filterTableNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tables);
+ }
} catch (MetaException | InvalidOperationException | UnknownDBException e) {
ex = e;
throw e;
@@ -4525,8 +4593,14 @@ public Partition get_partition(final String db_name, final String tbl_name,
Partition ret = null;
Exception ex = null;
try {
+ // For improved performance, we'll check if the said db and table are to be filtered out.
+ // If so, then we won't proceed with querying the partitions.
+ checkDbAndTableFilters(db_name, tbl_name);
fireReadTablePreEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name);
ret = getMS().getPartition(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, part_vals);
+ if (isServerFilterEnabled) {
+ ret = filterHook.filterPartition(ret);
+ }
} catch (Exception e) {
ex = e;
throwMetaException(e);
@@ -4537,6 +4611,32 @@ public Partition get_partition(final String db_name, final String tbl_name,
}
/**
+ * This is a helper method to filter out a given database or a table name. This will improve
+ * performance when filtering partitions. If the db or table is filtered out, we don't need
+ * to even fetch the partitions. We can throw a NoSuchObjectException.
+ * @param dbName the database name
+ * @param tblName the table name contained in the database
+ * @throws NoSuchObjectException if the database or table is filtered out
+ */
+ private void checkDbAndTableFilters(final String dbName, final String tblName)
+ throws NoSuchObjectException, MetaException {
+ String[] parsedDbName = parseDbName(dbName, conf);
+ List filteredDb = isServerFilterEnabled ?
+ filterHook.filterDatabases(Collections.singletonList(dbName)):
+ Collections.singletonList(dbName);
+ if (filteredDb.isEmpty()) {
+ throw new NoSuchObjectException("Database " + dbName + " does not exist");
+ }
+
+ List filteredTable = isServerFilterEnabled ?
+ filterHook.filterTableNames(parsedDbName[CAT_NAME],
+ dbName, Collections.singletonList(tblName)) : Collections.singletonList(tblName);
+ if (filteredTable.isEmpty()) {
+ throw new NoSuchObjectException("Table " + tblName + " does not exist");
+ }
+ }
+
+ /**
* Fire a pre-event for read table operation, if there are any
* pre-event listeners registered
*/
@@ -4566,8 +4666,14 @@ public Partition get_partition_with_auth(final String db_name,
Partition ret = null;
Exception ex = null;
try {
+ // For improved performance, we'll check if the said db and table are to be filtered out.
+ // If so, then we won't proceed with querying the partitions.
+ checkDbAndTableFilters(db_name, tbl_name);
ret = getMS().getPartitionWithAuth(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
tbl_name, part_vals, user_name, group_names);
+ if (isServerFilterEnabled) {
+ ret = filterHook.filterPartition(ret);
+ }
} catch (InvalidObjectException e) {
ex = e;
throw new NoSuchObjectException(e.getMessage());
@@ -4589,10 +4695,16 @@ public Partition get_partition_with_auth(final String db_name,
List ret = null;
Exception ex = null;
try {
+ // For improved performance, we'll check if the said db and table are to be filtered out.
+ // If so, then we won't proceed with querying the partitions.
+ checkDbAndTableFilters(db_name, tbl_name);
checkLimitNumberOfPartitionsByFilter(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
tbl_name, NO_FILTER_STRING, max_parts);
ret = getMS().getPartitions(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name,
max_parts);
+ if (isServerFilterEnabled) {
+ ret = filterHook.filterPartitions(ret);
+ }
} catch (Exception e) {
ex = e;
throwMetaException(e);
@@ -4613,10 +4725,16 @@ public Partition get_partition_with_auth(final String db_name,
List ret = null;
Exception ex = null;
try {
+ // For improved performance, we'll check if the said db and table are to be filtered out.
+ // If so, then we won't proceed with querying the partitions.
+ checkDbAndTableFilters(dbName, tblName);
checkLimitNumberOfPartitionsByFilter(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
tblName, NO_FILTER_STRING, maxParts);
ret = getMS().getPartitionsWithAuth(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName,
maxParts, userName, groupNames);
+ if (isServerFilterEnabled) {
+ ret = filterHook.filterPartitions(ret);
+ }
} catch (InvalidObjectException e) {
ex = e;
throw new NoSuchObjectException(e.getMessage());
@@ -4752,8 +4870,15 @@ private static boolean is_partition_spec_grouping_enabled(Table table) {
List ret = null;
Exception ex = null;
try {
+ // For improved performance, we'll check if the said db and table are to be filtered out.
+ // If so, then we won't proceed with querying the partitions.
+ checkDbAndTableFilters(db_name, tbl_name);
ret = getMS().listPartitionNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name,
max_parts);
+ if (isServerFilterEnabled) {
+ ret = filterHook
+ .filterPartitionNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, ret);
+ }
} catch (MetaException e) {
ex = e;
throw e;
@@ -5100,6 +5225,9 @@ private void alter_table_core(String catName, String dbname, String name, Table
String[] parsedDbName = parseDbName(dbname, conf);
try {
ret = getMS().getTables(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], pattern);
+ if (isServerFilterEnabled) {
+ ret = filterHook.filterTableNames(parsedDbName[CAT_NAME], dbname, ret);
+ }
} catch (MetaException e) {
ex = e;
throw e;
@@ -5165,6 +5293,9 @@ private void alter_table_core(String catName, String dbname, String name, Table
String[] parsedDbName = parseDbName(dbname, conf);
try {
ret = getMS().getAllTables(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]);
+ if (isServerFilterEnabled) {
+ ret = filterHook.filterTableNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], ret);
+ }
} catch (MetaException e) {
ex = e;
throw e;
@@ -5540,9 +5671,15 @@ public boolean drop_partition_by_name_with_environment_context(final String db_n
List ret = null;
Exception ex = null;
try {
+ // For improved performance, we'll check if the said db and table are to be filtered out.
+ // If so, then we won't proceed with querying the partitions.
+ checkDbAndTableFilters(db_name, tbl_name);
// Don't send the parsedDbName, as this method will parse itself.
ret = get_partitions_ps_with_auth(db_name, tbl_name, part_vals,
max_parts, null, null);
+ if (isServerFilterEnabled) {
+ ret = filterHook.filterPartitions(ret);
+ }
} catch (Exception e) {
ex = e;
rethrowException(e);
@@ -5565,8 +5702,14 @@ public boolean drop_partition_by_name_with_environment_context(final String db_n
List ret = null;
Exception ex = null;
try {
+ // For improved performance, we'll check if the said db and table are to be filtered out.
+ // If so, then we won't proceed with querying the partitions.
+ checkDbAndTableFilters(db_name, tbl_name);
ret = getMS().listPartitionsPsWithAuth(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
tbl_name, part_vals, max_parts, userName, groupNames);
+ if (isServerFilterEnabled) {
+ ret = filterHook.filterPartitions(ret);
+ }
} catch (InvalidObjectException e) {
ex = e;
throw new MetaException(e.getMessage());
@@ -5590,8 +5733,15 @@ public boolean drop_partition_by_name_with_environment_context(final String db_n
List ret = null;
Exception ex = null;
try {
+ // For improved performance, we'll check if the said db and table are to be filtered out.
+ // If so, then we won't proceed with querying the partitions.
+ checkDbAndTableFilters(db_name, tbl_name);
ret = getMS().listPartitionNamesPs(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name,
part_vals, max_parts);
+ if (isServerFilterEnabled) {
+ ret = filterHook
+ .filterPartitionNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, ret);
+ }
} catch (Exception e) {
ex = e;
rethrowException(e);
@@ -6032,10 +6182,16 @@ public boolean delete_table_column_statistics(String dbName, String tableName, S
List ret = null;
Exception ex = null;
try {
+ // For improved performance, we'll check if the said db and table are to be filtered out.
+ // If so, then we won't proceed with querying the partitions.
+ checkDbAndTableFilters(dbName, tblName);
checkLimitNumberOfPartitionsByFilter(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
tblName, filter, maxParts);
ret = getMS().getPartitionsByFilter(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName,
filter, maxParts);
+ if (isServerFilterEnabled) {
+ ret = filterHook.filterPartitions(ret);
+ }
} catch (Exception e) {
ex = e;
rethrowException(e);
@@ -6169,8 +6325,14 @@ private int get_num_partitions_by_expr(final String catName, final String dbName
List ret = null;
Exception ex = null;
try {
+ // For improved performance, we'll check if the said db and table are to be filtered out.
+ // If so, then we won't proceed with querying the partitions.
+ checkDbAndTableFilters(dbName, tblName);
ret = getMS().getPartitionsByNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName,
partNames);
+ if (isServerFilterEnabled) {
+ ret = filterHook.filterPartitions(ret);
+ }
} catch (Exception e) {
ex = e;
rethrowException(e);