diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7f4afd9..aab5427 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -54,7 +54,6 @@
  * Hive Configuration.
  */
 public class HiveConf extends Configuration {
-  protected String hiveJar;
   protected Properties origProp;
   protected String auxJars;
 
@@ -1478,6 +1477,9 @@
         "Maximum number of worker threads when in HTTP mode."),
     HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME("hive.server2.thrift.http.max.idle.time", 1800000,
         "Maximum idle time in milliseconds for a connection on the server when in HTTP mode."),
+    HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME("hive.server2.thrift.http.worker.keepalive.time", 60,
+        "Keepalive time (in seconds) for an idle http worker thread. When number of workers > min workers, " +
+        "excess threads are killed after this time interval."),
 
     // binary transport settings
     HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000,
@@ -1500,7 +1502,9 @@
         "Minimum number of Thrift worker threads"),
     HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS("hive.server2.thrift.max.worker.threads", 500,
         "Maximum number of Thrift worker threads"),
-
+    HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME("hive.server2.thrift.worker.keepalive.time", 60,
+        "Keepalive time (in seconds) for an idle worker thread. When number of workers > min workers, " +
+        "excess threads are killed after this time interval."),
     // Configuration for async thread pool in SessionManager
     HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 100,
         "Number of threads in the async thread pool for HiveServer2"),
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 06d7595..b7d7409 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -241,22 +241,31 @@ public TTransport getTransport(TTransport trans) {
     }
 
   public static class HMSHandler extends FacebookBase implements
-    IHMSHandler {
+      IHMSHandler {
     public static final Log LOG = HiveMetaStore.LOG;
     private String rawStoreClassName;
     private final HiveConf hiveConf; // stores datastore (jpox) properties,
-    // right now they come from jpox.properties
+                                     // right now they come from jpox.properties
 
     private static String currentUrl;
 
     private Warehouse wh; // hdfs warehouse
-    private final ThreadLocal<RawStore> threadLocalMS =
+
+    private static final ThreadLocal<RawStore> threadLocalMS =
         new ThreadLocal<RawStore>() {
-      @Override
-      protected synchronized RawStore initialValue() {
-        return null;
-      }
-    };
+          @Override
+          protected synchronized RawStore initialValue() {
+            return null;
+          }
+        };
+
+    public static RawStore getRawStore() {
+      return threadLocalMS.get();
+    }
+
+    public static void removeRawStore() {
+      threadLocalMS.remove();
+    }
 
     private final ThreadLocal<TxnHandler> threadLocalTxn = new ThreadLocal<TxnHandler>() {
       @Override
@@ -269,11 +278,11 @@ protected synchronized TxnHandler initialValue() {
     // to the conf using the connection hook
     private final ThreadLocal<Configuration> threadLocalConf =
        new ThreadLocal<Configuration>() {
-      @Override
-      protected synchronized Configuration initialValue() {
-        return null;
-      }
-    };
+          @Override
+          protected synchronized Configuration initialValue() {
+            return null;
+          }
+        };
 
     public static final String AUDIT_FORMAT =
         "ugi=%s\t" + // ugi
@@ -283,11 +292,11 @@ protected synchronized Configuration
initialValue() { HiveMetaStore.class.getName() + ".audit"); private static final ThreadLocal auditFormatter = new ThreadLocal() { - @Override - protected Formatter initialValue() { - return new Formatter(new StringBuilder(AUDIT_FORMAT.length() * 4)); - } - }; + @Override + protected Formatter initialValue() { + return new Formatter(new StringBuilder(AUDIT_FORMAT.length() * 4)); + } + }; private final void logAuditEvent(String cmd) { if (cmd == null) { @@ -390,8 +399,8 @@ public void init() throws MetaException { MetaStoreInitListener.class, hiveConf, hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS)); for (MetaStoreInitListener singleInitListener: initListeners) { - MetaStoreInitContext context = new MetaStoreInitContext(); - singleInitListener.onInit(context); + MetaStoreInitContext context = new MetaStoreInitContext(); + singleInitListener.onInit(context); } String alterHandlerName = hiveConf.get("hive.metastore.alter.impl", @@ -400,6 +409,9 @@ public void init() throws MetaException { alterHandlerName), hiveConf); wh = new Warehouse(hiveConf); + // Always create a new RawStore object for this thread, if it doesn't exist yet + createMS(); + synchronized (HMSHandler.class) { if (currentUrl == null || !currentUrl.equals(MetaStoreInit.getConnectionURL(hiveConf))) { createDefaultDB(); @@ -519,6 +531,16 @@ public RawStore getMS() throws MetaException { return ms; } + /** + * Create a cached RawStore. + * + * @return the cached RawStore + * @throws MetaException + */ + public RawStore createMS() throws MetaException { + return getMS(); + } + private TxnHandler getTxnHandler() { TxnHandler txn = threadLocalTxn.get(); if (txn == null) { @@ -541,7 +563,7 @@ private void createDefaultDB_core(RawStore ms) throws MetaException, InvalidObje ms.getDatabase(DEFAULT_DATABASE_NAME); } catch (NoSuchObjectException e) { Database db = new Database(DEFAULT_DATABASE_NAME, DEFAULT_DATABASE_COMMENT, - wh.getDefaultDatabasePath(DEFAULT_DATABASE_NAME).toString(), null); + wh.getDefaultDatabasePath(DEFAULT_DATABASE_NAME).toString(), null); db.setOwnerName(PUBLIC); db.setOwnerType(PrincipalType.ROLE); ms.createDatabase(db); @@ -588,8 +610,8 @@ private void createDefaultRoles() throws MetaException { // now grant all privs to admin PrivilegeBag privs = new PrivilegeBag(); privs.addToPrivileges(new HiveObjectPrivilege( new HiveObjectRef(HiveObjectType.GLOBAL, null, - null, null, null), ADMIN, PrincipalType.ROLE, new PrivilegeGrantInfo("All", 0, ADMIN, - PrincipalType.ROLE, true))); + null, null, null), ADMIN, PrincipalType.ROLE, new PrivilegeGrantInfo("All", 0, ADMIN, + PrincipalType.ROLE, true))); try { ms.grantPrivileges(privs); } catch (InvalidObjectException e) { @@ -612,10 +634,10 @@ private void addAdminUsers() throws MetaException { // Since user names need to be valid unix user names, per IEEE Std 1003.1-2001 they cannot // contain comma, so we can safely split above string on comma. - Iterator users = Splitter.on(",").trimResults().omitEmptyStrings().split(userStr).iterator(); + Iterator users = Splitter.on(",").trimResults().omitEmptyStrings().split(userStr).iterator(); if (!users.hasNext()) { LOG.info("No user is added in admin role, since config value "+ userStr + - " is in incorrect format. We accept comma seprated list of users."); + " is in incorrect format. 
We accept comma seprated list of users."); return; } Role adminRole; @@ -685,7 +707,7 @@ private void endFunction(String function, boolean successful, Exception e) { endFunction(function, successful, e, null); } private void endFunction(String function, boolean successful, Exception e, - String inputTableName) { + String inputTableName) { endFunction(function, new MetaStoreEndFunctionContext(successful, e, inputTableName)); } @@ -811,7 +833,7 @@ public void create_database(final Database db) @Override public Database get_database(final String name) throws NoSuchObjectException, - MetaException { + MetaException { startFunction("get_database", ": " + name); Database db = null; Exception ex = null; @@ -852,8 +874,8 @@ public void alter_database(final String dbName, final Database db) private void drop_database_core(RawStore ms, final String name, final boolean deleteData, final boolean cascade) - throws NoSuchObjectException, InvalidOperationException, MetaException, - IOException, InvalidObjectException, InvalidInputException { + throws NoSuchObjectException, InvalidOperationException, MetaException, + IOException, InvalidObjectException, InvalidInputException { boolean success = false; Database db = null; List tablePaths = new ArrayList(); @@ -1083,7 +1105,7 @@ private void create_type_core(final RawStore ms, final Type type) @Override public boolean create_type(final Type type) throws AlreadyExistsException, - MetaException, InvalidObjectException { + MetaException, InvalidObjectException { startFunction("create_type", ": " + type.toString()); boolean success = false; Exception ex = null; @@ -1193,8 +1215,8 @@ public boolean drop_type(final String name) throws MetaException, NoSuchObjectEx private void create_table_core(final RawStore ms, final Table tbl, final EnvironmentContext envContext) - throws AlreadyExistsException, MetaException, - InvalidObjectException, NoSuchObjectException { + throws AlreadyExistsException, MetaException, + InvalidObjectException, NoSuchObjectException { if (!MetaStoreUtils.validateName(tbl.getTableName())) { throw new InvalidObjectException(tbl.getTableName() @@ -1302,14 +1324,14 @@ private void create_table_core(final RawStore ms, final Table tbl, @Override public void create_table(final Table tbl) throws AlreadyExistsException, - MetaException, InvalidObjectException { + MetaException, InvalidObjectException { create_table_with_environment_context(tbl, null); } @Override public void create_table_with_environment_context(final Table tbl, final EnvironmentContext envContext) - throws AlreadyExistsException, MetaException, InvalidObjectException { + throws AlreadyExistsException, MetaException, InvalidObjectException { startFunction("create_table", ": " + tbl.toString()); boolean success = false; Exception ex = null; @@ -1342,8 +1364,8 @@ private boolean is_table_exists(RawStore ms, String dbname, String name) private void drop_table_core(final RawStore ms, final String dbname, final String name, final boolean deleteData, final EnvironmentContext envContext) - throws NoSuchObjectException, MetaException, IOException, - InvalidObjectException, InvalidInputException { + throws NoSuchObjectException, MetaException, IOException, + InvalidObjectException, InvalidInputException { boolean success = false; boolean isExternal = false; Path tblPath = null; @@ -1477,9 +1499,9 @@ private void deletePartitionData(List partPaths) { * @throws NoSuchObjectException */ private List dropPartitionsAndGetLocations(RawStore ms, String dbName, - String tableName, Path tablePath, 
List partitionKeys, boolean checkLocation) - throws MetaException, IOException, NoSuchObjectException, InvalidObjectException, - InvalidInputException { + String tableName, Path tablePath, List partitionKeys, boolean checkLocation) + throws MetaException, IOException, NoSuchObjectException, InvalidObjectException, + InvalidInputException { int partitionBatchSize = HiveConf.getIntVar(hiveConf, ConfVars.METASTORE_BATCH_RETRIEVE_MAX); Path tableDnsPath = null; @@ -1530,7 +1552,7 @@ public void drop_table(final String dbname, final String name, final boolean del @Override public void drop_table_with_environment_context(final String dbname, final String name, final boolean deleteData, final EnvironmentContext envContext) - throws NoSuchObjectException, MetaException { + throws NoSuchObjectException, MetaException { startTableFunction("drop_table", dbname, name); boolean success = false; @@ -1573,7 +1595,7 @@ private boolean isIndexTable(Table table) { @Override public Table get_table(final String dbname, final String name) throws MetaException, - NoSuchObjectException { + NoSuchObjectException { Table t = null; startTableFunction("get_table", dbname, name); Exception ex = null; @@ -1650,7 +1672,7 @@ public Table get_table(final String dbname, final String name) throws MetaExcept @Override public List get_table_names_by_filter( final String dbName, final String filter, final short maxTables) - throws MetaException, InvalidOperationException, UnknownDBException { + throws MetaException, InvalidOperationException, UnknownDBException { List tables = null; startFunction("get_table_names_by_filter", ": db = " + dbName + ", filter = " + filter); Exception ex = null; @@ -1786,7 +1808,7 @@ public Partition append_partition(final String dbName, final String tableName, @Override public Partition append_partition_with_environment_context(final String dbName, final String tableName, final List part_vals, final EnvironmentContext envContext) - throws InvalidObjectException, AlreadyExistsException, MetaException { + throws InvalidObjectException, AlreadyExistsException, MetaException { startPartitionFunction("append_partition", dbName, tableName, part_vals); if (LOG.isDebugEnabled()) { for (String part : part_vals) { @@ -1938,7 +1960,7 @@ public AddPartitionsResult add_partitions_req(AddPartitionsRequest request) @Override public int add_partitions(final List parts) throws MetaException, - InvalidObjectException, AlreadyExistsException { + InvalidObjectException, AlreadyExistsException { startFunction("add_partition"); if (parts.size() == 0) { return 0; @@ -2062,7 +2084,7 @@ private void initializeAddedPartition( private Partition add_partition_core(final RawStore ms, final Partition part, final EnvironmentContext envContext) - throws InvalidObjectException, AlreadyExistsException, MetaException, TException { + throws InvalidObjectException, AlreadyExistsException, MetaException, TException { boolean success = false; Table tbl = null; try { @@ -2100,7 +2122,7 @@ private Partition add_partition_core(final RawStore ms, private void fireMetaStoreAddPartitionEvent(final Table tbl, final List parts, final EnvironmentContext envContext, boolean success) - throws MetaException { + throws MetaException { if (tbl != null && parts != null && !parts.isEmpty()) { AddPartitionEvent addPartitionEvent = new AddPartitionEvent(tbl, parts, success, this); @@ -2121,8 +2143,8 @@ public Partition add_partition(final Partition part) @Override public Partition add_partition_with_environment_context( final Partition part, 
EnvironmentContext envContext) - throws InvalidObjectException, AlreadyExistsException, - MetaException { + throws InvalidObjectException, AlreadyExistsException, + MetaException { startTableFunction("add_partition", part.getDbName(), part.getTableName()); Partition ret = null; @@ -2194,7 +2216,7 @@ public Partition exchange_partition(Map partitionSpecs, destPartition.getSd().setLocation(destPartitionPath.toString()); ms.addPartition(destPartition); ms.dropPartition(partition.getDbName(), sourceTable.getTableName(), - partition.getValues()); + partition.getValues()); } /** * TODO: Use the hard link feature of hdfs @@ -2214,9 +2236,9 @@ public Partition exchange_partition(Map partitionSpecs, } private boolean drop_partition_common(RawStore ms, String db_name, String tbl_name, - List part_vals, final boolean deleteData, final EnvironmentContext envContext) - throws MetaException, NoSuchObjectException, IOException, InvalidObjectException, - InvalidInputException { + List part_vals, final boolean deleteData, final EnvironmentContext envContext) + throws MetaException, NoSuchObjectException, IOException, InvalidObjectException, + InvalidInputException { boolean success = false; Path partPath = null; Table tbl = null; @@ -2268,7 +2290,7 @@ private boolean drop_partition_common(RawStore ms, String db_name, String tbl_na } for (MetaStoreEventListener listener : listeners) { DropPartitionEvent dropPartitionEvent = - new DropPartitionEvent(tbl, part, success, deleteData, this); + new DropPartitionEvent(tbl, part, success, deleteData, this); dropPartitionEvent.setEnvironmentContext(envContext); listener.onDropPartition(dropPartitionEvent); } @@ -2286,7 +2308,7 @@ private void deleteParentRecursive(Path parent, int depth) throws IOException, M @Override public boolean drop_partition(final String db_name, final String tbl_name, final List part_vals, final boolean deleteData) - throws NoSuchObjectException, MetaException, TException { + throws NoSuchObjectException, MetaException, TException { return drop_partition_with_environment_context(db_name, tbl_name, part_vals, deleteData, null); } @@ -2314,124 +2336,124 @@ public DropPartitionsResult drop_partitions_req( EnvironmentContext envContext = request.isSetEnvironmentContext() ? request.getEnvironmentContext() : null; - boolean success = false; - ms.openTransaction(); - Table tbl = null; - List parts = null; - try { - // We need Partition-s for firing events and for result; DN needs MPartition-s to drop. - // Great... Maybe we could bypass fetching MPartitions by issuing direct SQL deletes. - tbl = get_table(dbName, tblName); - int minCount = 0; - RequestPartsSpec spec = request.getParts(); - List partNames = null; - if (spec.isSetExprs()) { - // Dropping by expressions. - parts = new ArrayList(spec.getExprs().size()); - for (DropPartitionsExpr expr : spec.getExprs()) { - ++minCount; // At least one partition per expression, if not ifExists - List result = new ArrayList(); - boolean hasUnknown = ms.getPartitionsByExpr( - dbName, tblName, expr.getExpr(), null, (short)-1, result); - if (hasUnknown) { - // Expr is built by DDLSA, it should only contain part cols and simple ops - throw new MetaException("Unexpected unknown partitions to drop"); - } - // this is to prevent dropping archived partition which is archived in a - // different level the drop command specified. 
- if (!ignoreProtection && expr.isSetPartArchiveLevel()) { - for (Partition part : parts) { - if (MetaStoreUtils.isArchived(part) - && MetaStoreUtils.getArchivingLevel(part) < expr.getPartArchiveLevel()) { - throw new MetaException("Cannot drop a subset of partitions " - + " in an archive, partition " + part); + boolean success = false; + ms.openTransaction(); + Table tbl = null; + List parts = null; + try { + // We need Partition-s for firing events and for result; DN needs MPartition-s to drop. + // Great... Maybe we could bypass fetching MPartitions by issuing direct SQL deletes. + tbl = get_table(dbName, tblName); + int minCount = 0; + RequestPartsSpec spec = request.getParts(); + List partNames = null; + if (spec.isSetExprs()) { + // Dropping by expressions. + parts = new ArrayList(spec.getExprs().size()); + for (DropPartitionsExpr expr : spec.getExprs()) { + ++minCount; // At least one partition per expression, if not ifExists + List result = new ArrayList(); + boolean hasUnknown = ms.getPartitionsByExpr( + dbName, tblName, expr.getExpr(), null, (short)-1, result); + if (hasUnknown) { + // Expr is built by DDLSA, it should only contain part cols and simple ops + throw new MetaException("Unexpected unknown partitions to drop"); + } + // this is to prevent dropping archived partition which is archived in a + // different level the drop command specified. + if (!ignoreProtection && expr.isSetPartArchiveLevel()) { + for (Partition part : parts) { + if (MetaStoreUtils.isArchived(part) + && MetaStoreUtils.getArchivingLevel(part) < expr.getPartArchiveLevel()) { + throw new MetaException("Cannot drop a subset of partitions " + + " in an archive, partition " + part); + } + } } + parts.addAll(result); } + } else if (spec.isSetNames()) { + partNames = spec.getNames(); + minCount = partNames.size(); + parts = ms.getPartitionsByNames(dbName, tblName, partNames); + } else { + throw new MetaException("Partition spec is not set"); } - parts.addAll(result); - } - } else if (spec.isSetNames()) { - partNames = spec.getNames(); - minCount = partNames.size(); - parts = ms.getPartitionsByNames(dbName, tblName, partNames); - } else { - throw new MetaException("Partition spec is not set"); - } - if ((parts.size() < minCount) && !ifExists) { - throw new NoSuchObjectException("Some partitions to drop are missing"); - } + if ((parts.size() < minCount) && !ifExists) { + throw new NoSuchObjectException("Some partitions to drop are missing"); + } - List colNames = null; - if (partNames == null) { - partNames = new ArrayList(parts.size()); - colNames = new ArrayList(tbl.getPartitionKeys().size()); - for (FieldSchema col : tbl.getPartitionKeys()) { - colNames.add(col.getName()); - } - } + List colNames = null; + if (partNames == null) { + partNames = new ArrayList(parts.size()); + colNames = new ArrayList(tbl.getPartitionKeys().size()); + for (FieldSchema col : tbl.getPartitionKeys()) { + colNames.add(col.getName()); + } + } - for (Partition part : parts) { - if (!ignoreProtection && !MetaStoreUtils.canDropPartition(tbl, part)) { - throw new MetaException("Table " + tbl.getTableName() - + " Partition " + part + " is protected from being dropped"); - } + for (Partition part : parts) { + if (!ignoreProtection && !MetaStoreUtils.canDropPartition(tbl, part)) { + throw new MetaException("Table " + tbl.getTableName() + + " Partition " + part + " is protected from being dropped"); + } - firePreEvent(new PreDropPartitionEvent(tbl, part, deleteData, this)); - if (colNames != null) { - 
partNames.add(FileUtils.makePartName(colNames, part.getValues())); - } - // Preserve the old behavior of failing when we cannot write, even w/o deleteData, - // and even if the table is external. That might not make any sense. - if (MetaStoreUtils.isArchived(part)) { - Path archiveParentDir = MetaStoreUtils.getOriginalLocation(part); - verifyIsWritablePath(archiveParentDir); - archToDelete.add(archiveParentDir); - } - if ((part.getSd() != null) && (part.getSd().getLocation() != null)) { - Path partPath = new Path(part.getSd().getLocation()); - verifyIsWritablePath(partPath); - dirsToDelete.add(new PathAndPartValSize(partPath, part.getValues().size())); - } - } + firePreEvent(new PreDropPartitionEvent(tbl, part, deleteData, this)); + if (colNames != null) { + partNames.add(FileUtils.makePartName(colNames, part.getValues())); + } + // Preserve the old behavior of failing when we cannot write, even w/o deleteData, + // and even if the table is external. That might not make any sense. + if (MetaStoreUtils.isArchived(part)) { + Path archiveParentDir = MetaStoreUtils.getOriginalLocation(part); + verifyIsWritablePath(archiveParentDir); + archToDelete.add(archiveParentDir); + } + if ((part.getSd() != null) && (part.getSd().getLocation() != null)) { + Path partPath = new Path(part.getSd().getLocation()); + verifyIsWritablePath(partPath); + dirsToDelete.add(new PathAndPartValSize(partPath, part.getValues().size())); + } + } - ms.dropPartitions(dbName, tblName, partNames); - success = ms.commitTransaction(); - DropPartitionsResult result = new DropPartitionsResult(); - if (needResult) { - result.setPartitions(parts); - } - return result; - } finally { - if (!success) { - ms.rollbackTransaction(); - } else if (deleteData && !isExternal(tbl)) { - // Archived partitions have har:/to_har_file as their location. - // The original directory was saved in params - for (Path path : archToDelete) { - wh.deleteDir(path, true); - } - for (PathAndPartValSize p : dirsToDelete) { - wh.deleteDir(p.path, true); - try { - deleteParentRecursive(p.path.getParent(), p.partValSize - 1); - } catch (IOException ex) { - LOG.warn("Error from deleteParentRecursive", ex); - throw new MetaException("Failed to delete parent: " + ex.getMessage()); + ms.dropPartitions(dbName, tblName, partNames); + success = ms.commitTransaction(); + DropPartitionsResult result = new DropPartitionsResult(); + if (needResult) { + result.setPartitions(parts); } - } - } - if (parts != null) { - for (Partition part : parts) { - for (MetaStoreEventListener listener : listeners) { - DropPartitionEvent dropPartitionEvent = - new DropPartitionEvent(tbl, part, success, deleteData, this); - dropPartitionEvent.setEnvironmentContext(envContext); - listener.onDropPartition(dropPartitionEvent); + return result; + } finally { + if (!success) { + ms.rollbackTransaction(); + } else if (deleteData && !isExternal(tbl)) { + // Archived partitions have har:/to_har_file as their location. 
+ // The original directory was saved in params + for (Path path : archToDelete) { + wh.deleteDir(path, true); + } + for (PathAndPartValSize p : dirsToDelete) { + wh.deleteDir(p.path, true); + try { + deleteParentRecursive(p.path.getParent(), p.partValSize - 1); + } catch (IOException ex) { + LOG.warn("Error from deleteParentRecursive", ex); + throw new MetaException("Failed to delete parent: " + ex.getMessage()); + } + } + } + if (parts != null) { + for (Partition part : parts) { + for (MetaStoreEventListener listener : listeners) { + DropPartitionEvent dropPartitionEvent = + new DropPartitionEvent(tbl, part, success, deleteData, this); + dropPartitionEvent.setEnvironmentContext(envContext); + listener.onDropPartition(dropPartitionEvent); + } + } } } - } - } } private void verifyIsWritablePath(Path dir) throws MetaException { @@ -2451,7 +2473,7 @@ private void verifyIsWritablePath(Path dir) throws MetaException { public boolean drop_partition_with_environment_context(final String db_name, final String tbl_name, final List part_vals, final boolean deleteData, final EnvironmentContext envContext) - throws NoSuchObjectException, MetaException, TException { + throws NoSuchObjectException, MetaException, TException { startPartitionFunction("drop_partition", db_name, tbl_name, part_vals); LOG.info("Partition values:" + part_vals); @@ -2500,7 +2522,7 @@ public Partition get_partition(final String db_name, final String tbl_name, public Partition get_partition_with_auth(final String db_name, final String tbl_name, final List part_vals, final String user_name, final List group_names) - throws MetaException, NoSuchObjectException, TException { + throws MetaException, NoSuchObjectException, TException { startPartitionFunction("get_partition_with_auth", db_name, tbl_name, part_vals); @@ -2596,8 +2618,8 @@ public Partition get_partition_with_auth(final String db_name, @Override public void alter_partition(final String db_name, final String tbl_name, final Partition new_part) - throws InvalidOperationException, MetaException, - TException { + throws InvalidOperationException, MetaException, + TException { rename_partition(db_name, tbl_name, null, new_part); } @@ -2605,7 +2627,7 @@ public void alter_partition(final String db_name, final String tbl_name, public void alter_partition_with_environment_context(final String dbName, final String tableName, final Partition newPartition, final EnvironmentContext envContext) - throws InvalidOperationException, MetaException, TException { + throws InvalidOperationException, MetaException, TException { rename_partition(dbName, tableName, null, newPartition, envContext); } @@ -2613,7 +2635,7 @@ public void alter_partition_with_environment_context(final String dbName, @Override public void rename_partition(final String db_name, final String tbl_name, final List part_vals, final Partition new_part) - throws InvalidOperationException, MetaException, TException { + throws InvalidOperationException, MetaException, TException { // Call rename_partition without an environment context. 
rename_partition(db_name, tbl_name, part_vals, new_part, null); } @@ -2621,8 +2643,8 @@ public void rename_partition(final String db_name, final String tbl_name, private void rename_partition(final String db_name, final String tbl_name, final List part_vals, final Partition new_part, final EnvironmentContext envContext) - throws InvalidOperationException, MetaException, - TException { + throws InvalidOperationException, MetaException, + TException { startTableFunction("alter_partition", db_name, tbl_name); if (LOG.isInfoEnabled()) { @@ -2676,8 +2698,8 @@ private void rename_partition(final String db_name, final String tbl_name, @Override public void alter_partitions(final String db_name, final String tbl_name, final List new_parts) - throws InvalidOperationException, MetaException, - TException { + throws InvalidOperationException, MetaException, + TException { startTableFunction("alter_partitions", db_name, tbl_name); @@ -2744,7 +2766,7 @@ public void alter_partitions(final String db_name, final String tbl_name, @Override public void alter_index(final String dbname, final String base_table_name, final String index_name, final Index newIndex) - throws InvalidOperationException, MetaException { + throws InvalidOperationException, MetaException { startFunction("alter_index", ": db=" + dbname + " base_tbl=" + base_table_name + " idx=" + index_name + " newidx=" + newIndex.getIndexName()); newIndex.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System @@ -2782,7 +2804,7 @@ public String getVersion() throws TException { @Override public void alter_table(final String dbname, final String name, final Table newTable) - throws InvalidOperationException, MetaException { + throws InvalidOperationException, MetaException { // Do not set an environment context. 
alter_table_with_environment_context(dbname, name, newTable, null); } @@ -2791,7 +2813,7 @@ public void alter_table(final String dbname, final String name, public void alter_table_with_environment_context(final String dbname, final String name, final Table newTable, final EnvironmentContext envContext) - throws InvalidOperationException, MetaException { + throws InvalidOperationException, MetaException { startFunction("alter_table", ": db=" + dbname + " tbl=" + name + " newtbl=" + newTable.getTableName()); @@ -2893,8 +2915,8 @@ public void alter_table_with_environment_context(final String dbname, throw new UnknownTableException(e.getMessage()); } if (null == tbl.getSd().getSerdeInfo().getSerializationLib() || - hiveConf.getStringCollection(ConfVars.SERDESUSINGMETASTOREFORSCHEMA.varname).contains - (tbl.getSd().getSerdeInfo().getSerializationLib())) { + hiveConf.getStringCollection(ConfVars.SERDESUSINGMETASTOREFORSCHEMA.varname).contains + (tbl.getSd().getSerdeInfo().getSerializationLib())) { ret = tbl.getSd().getCols(); } else { try { @@ -3066,7 +3088,7 @@ public String get_config_value(String name, String defaultValue) private Partition get_partition_by_name_core(final RawStore ms, final String db_name, final String tbl_name, final String part_name) - throws MetaException, NoSuchObjectException, TException { + throws MetaException, NoSuchObjectException, TException { List partVals = null; try { partVals = getPartValsFromName(ms, db_name, tbl_name, part_name); @@ -3112,7 +3134,7 @@ public Partition append_partition_by_name(final String db_name, final String tbl @Override public Partition append_partition_by_name_with_environment_context(final String db_name, final String tbl_name, final String part_name, final EnvironmentContext env_context) - throws InvalidObjectException, AlreadyExistsException, MetaException, TException { + throws InvalidObjectException, AlreadyExistsException, MetaException, TException { startFunction("append_partition_by_name", ": db=" + db_name + " tbl=" + tbl_name + " part=" + part_name); @@ -3238,7 +3260,7 @@ public boolean drop_partition_by_name_with_environment_context(final String db_n @Override public List get_partition_names_ps(final String db_name, final String tbl_name, final List part_vals, final short max_parts) - throws MetaException, TException, NoSuchObjectException { + throws MetaException, TException, NoSuchObjectException { startPartitionFunction("get_partitions_names_ps", db_name, tbl_name, part_vals); List ret = null; Exception ex = null; @@ -3267,7 +3289,7 @@ public boolean drop_partition_by_name_with_environment_context(final String db_n @Override public Map partition_name_to_spec(String part_name) throws MetaException, - TException { + TException { if (part_name.length() == 0) { return new HashMap(); } @@ -3468,7 +3490,7 @@ public Index get_index_by_name(final String dbName, final String tblName, private Index get_index_by_name_core(final RawStore ms, final String db_name, final String tbl_name, final String index_name) - throws MetaException, NoSuchObjectException, TException { + throws MetaException, NoSuchObjectException, TException { Index index = ms.getIndex(db_name, tbl_name, index_name); if (index == null) { @@ -3542,14 +3564,14 @@ private String lowerCaseConvertPartName(String partName) throws MetaException { @Override public ColumnStatistics get_table_column_statistics(String dbName, String tableName, - String colName) throws NoSuchObjectException, MetaException, TException, - InvalidInputException, InvalidObjectException - { + String 
colName) throws NoSuchObjectException, MetaException, TException, + InvalidInputException, InvalidObjectException + { dbName = dbName.toLowerCase(); tableName = tableName.toLowerCase(); colName = colName.toLowerCase(); startFunction("get_column_statistics_by_table: db=" + dbName + " table=" + tableName + - " column=" + colName); + " column=" + colName); ColumnStatistics statsObj = null; try { statsObj = getMS().getTableColumnStatistics( @@ -3559,7 +3581,7 @@ public ColumnStatistics get_table_column_statistics(String dbName, String tableN } finally { endFunction("get_column_statistics_by_table: ", statsObj != null, null, tableName); } - } + } @Override public TableStatsResult get_table_statistics_req(TableStatsRequest request) @@ -3580,8 +3602,8 @@ public TableStatsResult get_table_statistics_req(TableStatsRequest request) @Override public ColumnStatistics get_partition_column_statistics(String dbName, String tableName, - String partName, String colName) throws NoSuchObjectException, MetaException, - InvalidInputException, TException, InvalidObjectException { + String partName, String colName) throws NoSuchObjectException, MetaException, + InvalidInputException, TException, InvalidObjectException { dbName = dbName.toLowerCase(); tableName = tableName.toLowerCase(); colName = colName.toLowerCase(); @@ -3628,9 +3650,9 @@ public PartitionsStatsResult get_partitions_statistics_req(PartitionsStatsReques @Override public boolean update_table_column_statistics(ColumnStatistics colStats) - throws NoSuchObjectException,InvalidObjectException,MetaException,TException, - InvalidInputException - { + throws NoSuchObjectException,InvalidObjectException,MetaException,TException, + InvalidInputException + { String dbName = null; String tableName = null; String colName = null; @@ -3649,13 +3671,13 @@ public boolean update_table_column_statistics(ColumnStatistics colStats) colName = statsObj.getColName().toLowerCase(); statsObj.setColName(colName); startFunction("write_column_statistics: db=" + dbName + " table=" + tableName + - " column=" + colName); + " column=" + colName); } - colStats.setStatsDesc(statsDesc); - colStats.setStatsObj(statsObjs); + colStats.setStatsDesc(statsDesc); + colStats.setStatsObj(statsObjs); - boolean ret = false; + boolean ret = false; try { ret = getMS().updateTableColumnStatistics(colStats); @@ -3663,13 +3685,13 @@ public boolean update_table_column_statistics(ColumnStatistics colStats) } finally { endFunction("write_column_statistics: ", ret != false, null, tableName); } - } + } @Override public boolean update_partition_column_statistics(ColumnStatistics colStats) - throws NoSuchObjectException,InvalidObjectException,MetaException,TException, - InvalidInputException - { + throws NoSuchObjectException,InvalidObjectException,MetaException,TException, + InvalidInputException + { String dbName = null; String tableName = null; @@ -3694,7 +3716,7 @@ public boolean update_partition_column_statistics(ColumnStatistics colStats) colName = statsObj.getColName().toLowerCase(); statsObj.setColName(colName); startFunction("write_partition_column_statistics: db=" + dbName + " table=" + tableName + - " part=" + partName + "column=" + colName); + " part=" + partName + "column=" + colName); } colStats.setStatsDesc(statsDesc); @@ -3710,13 +3732,13 @@ public boolean update_partition_column_statistics(ColumnStatistics colStats) } finally { endFunction("write_partition_column_statistics: ", ret != false, null, tableName); } - } + } @Override public boolean delete_partition_column_statistics(String 
dbName, String tableName, - String partName, String colName) throws NoSuchObjectException, MetaException, - InvalidObjectException, TException, InvalidInputException - { + String partName, String colName) throws NoSuchObjectException, MetaException, + InvalidObjectException, TException, InvalidInputException + { dbName = dbName.toLowerCase(); tableName = tableName.toLowerCase(); if (colName != null) { @@ -3724,24 +3746,24 @@ public boolean delete_partition_column_statistics(String dbName, String tableNam } String convertedPartName = lowerCaseConvertPartName(partName); startFunction("delete_column_statistics_by_partition: db=" + dbName + " table=" + tableName + - " partition=" + convertedPartName + " column=" + colName); + " partition=" + convertedPartName + " column=" + colName); boolean ret = false; try { List partVals = getPartValsFromName(getMS(), dbName, tableName, convertedPartName); ret = getMS().deletePartitionColumnStatistics(dbName, tableName, - convertedPartName, partVals, colName); + convertedPartName, partVals, colName); } finally { endFunction("delete_column_statistics_by_partition: ", ret != false, null, tableName); } return ret; - } + } @Override public boolean delete_table_column_statistics(String dbName, String tableName, String colName) - throws NoSuchObjectException, MetaException, InvalidObjectException, TException, - InvalidInputException - { + throws NoSuchObjectException, MetaException, InvalidObjectException, TException, + InvalidInputException + { dbName = dbName.toLowerCase(); tableName = tableName.toLowerCase(); @@ -3749,7 +3771,7 @@ public boolean delete_table_column_statistics(String dbName, String tableName, S colName = colName.toLowerCase(); } startFunction("delete_column_statistics_by_table: db=" + dbName + " table=" + tableName + - " column=" + colName); + " column=" + colName); boolean ret = false; try { @@ -3758,12 +3780,12 @@ public boolean delete_table_column_statistics(String dbName, String tableName, S endFunction("delete_column_statistics_by_table: ", ret != false, null, tableName); } return ret; - } + } @Override public List get_partitions_by_filter(final String dbName, final String tblName, final String filter, final short maxParts) - throws MetaException, NoSuchObjectException, TException { + throws MetaException, NoSuchObjectException, TException { startTableFunction("get_partitions_by_filter", dbName, tblName); List ret = null; @@ -3817,7 +3839,7 @@ private void rethrowException(Exception e) @Override public List get_partitions_by_names(final String dbName, final String tblName, final List partNames) - throws MetaException, NoSuchObjectException, TException { + throws MetaException, NoSuchObjectException, TException { startTableFunction("get_partitions_by_names", dbName, tblName); @@ -3913,7 +3935,7 @@ private PrincipalPrivilegeSet get_db_privilege_set(final String dbName, private PrincipalPrivilegeSet get_partition_privilege_set( final String dbName, final String tableName, final String partName, final String userName, final List groupNames) - throws MetaException, TException { + throws MetaException, TException { incrementCounter("get_partition_privilege_set"); PrincipalPrivilegeSet ret = null; @@ -3949,12 +3971,12 @@ private PrincipalPrivilegeSet get_table_privilege_set(final String dbName, public boolean grant_role(final String roleName, final String principalName, final PrincipalType principalType, final String grantor, final PrincipalType grantorType, final boolean grantOption) - throws MetaException, TException { + throws MetaException, 
TException { incrementCounter("add_role_member"); firePreEvent(new PreAuthorizationCallEvent(this)); if (PUBLIC.equals(roleName)) { throw new MetaException("No user can be added to " + PUBLIC +". Since all users implictly" - + " belong to " + PUBLIC + " role."); + + " belong to " + PUBLIC + " role."); } Boolean ret = null; try { @@ -4031,7 +4053,7 @@ public boolean create_role(final Role role) incrementCounter("create_role"); firePreEvent(new PreAuthorizationCallEvent(this)); if (PUBLIC.equals(role.getRoleName())) { - throw new MetaException(PUBLIC + " role implictly exists. It can't be created."); + throw new MetaException(PUBLIC + " role implictly exists. It can't be created."); } Boolean ret = null; try { @@ -4080,7 +4102,7 @@ public boolean drop_role(final String roleName) @Override public boolean grant_privileges(final PrivilegeBag privileges) throws MetaException, - TException { + TException { incrementCounter("grant_privileges"); firePreEvent(new PreAuthorizationCallEvent(this)); Boolean ret = null; @@ -4129,21 +4151,21 @@ public GrantRevokeRoleResponse grant_revoke_role(GrantRevokeRoleRequest request) grantOption = request.isGrantOption(); } switch (request.getRequestType()) { - case GRANT: { - boolean result = grant_role(request.getRoleName(), - request.getPrincipalName(), request.getPrincipalType(), - request.getGrantor(), request.getGrantorType(), grantOption); - response.setSuccess(result); - break; - } - case REVOKE: { - boolean result = revoke_role(request.getRoleName(), request.getPrincipalName(), - request.getPrincipalType(), grantOption); - response.setSuccess(result); - break; - } - default: - throw new MetaException("Unknown request type " + request.getRequestType()); + case GRANT: { + boolean result = grant_role(request.getRoleName(), + request.getPrincipalName(), request.getPrincipalType(), + request.getGrantor(), request.getGrantorType(), grantOption); + response.setSuccess(result); + break; + } + case REVOKE: { + boolean result = revoke_role(request.getRoleName(), request.getPrincipalName(), + request.getPrincipalType(), grantOption); + response.setSuccess(result); + break; + } + default: + throw new MetaException("Unknown request type " + request.getRequestType()); } return response; @@ -4154,22 +4176,22 @@ public GrantRevokePrivilegeResponse grant_revoke_privileges(GrantRevokePrivilege throws MetaException, org.apache.thrift.TException { GrantRevokePrivilegeResponse response = new GrantRevokePrivilegeResponse(); switch (request.getRequestType()) { - case GRANT: { - boolean result = grant_privileges(request.getPrivileges()); - response.setSuccess(result); - break; - } - case REVOKE: { - boolean revokeGrantOption = false; - if (request.isSetRevokeGrantOption()) { - revokeGrantOption = request.isRevokeGrantOption(); - } - boolean result = revoke_privileges(request.getPrivileges(), revokeGrantOption); - response.setSuccess(result); - break; + case GRANT: { + boolean result = grant_privileges(request.getPrivileges()); + response.setSuccess(result); + break; + } + case REVOKE: { + boolean revokeGrantOption = false; + if (request.isSetRevokeGrantOption()) { + revokeGrantOption = request.isRevokeGrantOption(); } - default: - throw new MetaException("Unknown request type " + request.getRequestType()); + boolean result = revoke_privileges(request.getPrivileges(), revokeGrantOption); + response.setSuccess(result); + break; + } + default: + throw new MetaException("Unknown request type " + request.getRequestType()); } return response; @@ -4213,7 +4235,7 @@ private 
PrincipalPrivilegeSet get_user_privilege_set(final String userName, @Override public List list_privileges(String principalName, PrincipalType principalType, HiveObjectRef hiveObject) - throws MetaException, TException { + throws MetaException, TException { firePreEvent(new PreAuthorizationCallEvent(this)); if (hiveObject.getObjectType() == null) { return getAllPrivileges(principalName, principalType); @@ -4262,7 +4284,7 @@ private PrincipalPrivilegeSet get_user_privilege_set(final String userName, private List list_table_column_privileges( final String principalName, final PrincipalType principalType, final String dbName, final String tableName, final String columnName) - throws MetaException, TException { + throws MetaException, TException { incrementCounter("list_table_column_privileges"); try { @@ -4342,7 +4364,7 @@ private PrincipalPrivilegeSet get_user_privilege_set(final String userName, private List list_db_privileges(final String principalName, final PrincipalType principalType, final String dbName) - throws MetaException, TException { + throws MetaException, TException { incrementCounter("list_security_db_grant"); try { @@ -4380,7 +4402,7 @@ private PrincipalPrivilegeSet get_user_privilege_set(final String userName, private List list_partition_privileges( final String principalName, final PrincipalType principalType, final String dbName, final String tableName, final List partValues) - throws MetaException, TException { + throws MetaException, TException { incrementCounter("list_security_partition_grant"); try { @@ -4446,7 +4468,7 @@ private PrincipalPrivilegeSet get_user_privilege_set(final String userName, sTbl.getPrincipalName(), principalType, new PrivilegeGrantInfo(sTbl.getPrivilege(), sTbl.getCreateTime(), sTbl .getGrantor(), PrincipalType.valueOf(sTbl - .getGrantorType()), sTbl.getGrantOption())); + .getGrantorType()), sTbl.getGrantOption())); result.add(secObj); } return result; @@ -4459,7 +4481,7 @@ private PrincipalPrivilegeSet get_user_privilege_set(final String userName, private List list_global_privileges( final String principalName, final PrincipalType principalType) - throws MetaException, TException { + throws MetaException, TException { incrementCounter("list_security_user_grant"); try { @@ -4546,7 +4568,7 @@ public long renew_delegation_token(String token_str_form) @Override public String get_delegation_token(String token_owner, String renewer_kerberos_principal_name) - throws MetaException, TException { + throws MetaException, TException { startFunction("get_delegation_token"); String ret = null; Exception ex = null; @@ -4614,7 +4636,7 @@ public void markPartitionForEvent(final String db_name, final String tbl_name, throw newMetaException(original); } } finally { - endFunction("markPartitionForEvent", tbl != null, ex, tbl_name); + endFunction("markPartitionForEvent", tbl != null, ex, tbl_name); } } @@ -4648,7 +4670,7 @@ public boolean isPartitionMarkedForEvent(final String db_name, final String tbl_ throw newMetaException(original); } } finally { - endFunction("isPartitionMarkedForEvent", ret != null, ex, tbl_name); + endFunction("isPartitionMarkedForEvent", ret != null, ex, tbl_name); } return ret; @@ -4656,7 +4678,7 @@ public boolean isPartitionMarkedForEvent(final String db_name, final String tbl_ @Override public List set_ugi(String username, List groupNames) throws MetaException, - TException { + TException { Collections.addAll(groupNames, username); return groupNames; } @@ -4705,8 +4727,8 @@ private void validateFunctionInfo(Function func) throws 
InvalidObjectException, @Override public void create_function(Function func) throws AlreadyExistsException, - InvalidObjectException, MetaException, NoSuchObjectException, - TException { + InvalidObjectException, MetaException, NoSuchObjectException, + TException { validateFunctionInfo(func); boolean success = false; @@ -4922,7 +4944,7 @@ public void heartbeat(HeartbeatRequest ids) @Override public HeartbeatTxnRangeResponse heartbeat_txn_range(HeartbeatTxnRangeRequest rqst) - throws TException { + throws TException { try { return getTxnHandler().heartbeatTxnRange(rqst); } catch (MetaException e) { @@ -5028,7 +5050,7 @@ public AggrStats get_aggr_stats_for(PartitionsStatsRequest request) request.getTblName(), request.getPartNames(), request.getColNames())); return aggrStats; } finally { - endFunction("get_partitions_statistics_req: ", aggrStats == null, null, request.getTblName()); + endFunction("get_partitions_statistics_req: ", aggrStats == null, null, request.getTblName()); } } @@ -5112,7 +5134,7 @@ public HiveMetastoreCli() { .withArgName("port") .withDescription("Hive Metastore port number, default:" + DEFAULT_HIVE_METASTORE_PORT) - .create('p')); + .create('p')); } @@ -5204,7 +5226,7 @@ public void run() { } catch (Throwable t) { // Catch the exception, log it and rethrow it. HMSHandler.LOG - .error("Metastore Thrift Server threw an exception...", t); + .error("Metastore Thrift Server threw an exception...", t); throw t; } } @@ -5229,7 +5251,7 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge) * @throws Throwable */ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, - HiveConf conf) throws Throwable { + HiveConf conf) throws Throwable { startMetaStore(port, bridge, conf, null, null, null); } @@ -5259,64 +5281,64 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, TServerTransport serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(port) : new TServerSocket(port); - TProcessor processor; - TTransportFactory transFactory; - if (useSasl) { - // we are in secure mode. - if (useFramedTransport) { - throw new HiveMetaException("Framed transport is not supported with SASL enabled."); - } - saslServer = bridge.createServer( - conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_KEYTAB_FILE), - conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL)); - // start delegation token manager - HMSHandler hmsHandler = new HMSHandler("new db based metaserver", conf); - saslServer.startDelegationTokenSecretManager(conf, hmsHandler); - transFactory = saslServer.createTransportFactory( + TProcessor processor; + TTransportFactory transFactory; + if (useSasl) { + // we are in secure mode. + if (useFramedTransport) { + throw new HiveMetaException("Framed transport is not supported with SASL enabled."); + } + saslServer = bridge.createServer( + conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_KEYTAB_FILE), + conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL)); + // start delegation token manager + HMSHandler hmsHandler = new HMSHandler("new db based metaserver", conf); + saslServer.startDelegationTokenSecretManager(conf, hmsHandler); + transFactory = saslServer.createTransportFactory( MetaStoreUtils.getMetaStoreSaslProperties(conf)); - processor = saslServer.wrapProcessor( - new ThriftHiveMetastore.Processor(hmsHandler)); - LOG.info("Starting DB backed MetaStore Server in Secure Mode"); - } else { - // we are in unsecure mode. 
- IHMSHandler handler = newHMSHandler("new db based metaserver", conf); + processor = saslServer.wrapProcessor( + new ThriftHiveMetastore.Processor(hmsHandler)); + LOG.info("Starting DB backed MetaStore Server in Secure Mode"); + } else { + // we are in unsecure mode. + IHMSHandler handler = newHMSHandler("new db based metaserver", conf); - if (conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)) { - transFactory = useFramedTransport ? - new ChainedTTransportFactory(new TFramedTransport.Factory(), - new TUGIContainingTransport.Factory()) + if (conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)) { + transFactory = useFramedTransport ? + new ChainedTTransportFactory(new TFramedTransport.Factory(), + new TUGIContainingTransport.Factory()) : new TUGIContainingTransport.Factory(); - processor = new TUGIBasedProcessor(handler); - LOG.info("Starting DB backed MetaStore Server with SetUGI enabled"); - } else { - transFactory = useFramedTransport ? - new TFramedTransport.Factory() : new TTransportFactory(); - processor = new TSetIpAddressProcessor(handler); - LOG.info("Starting DB backed MetaStore Server"); - } - } + processor = new TUGIBasedProcessor(handler); + LOG.info("Starting DB backed MetaStore Server with SetUGI enabled"); + } else { + transFactory = useFramedTransport ? + new TFramedTransport.Factory() : new TTransportFactory(); + processor = new TSetIpAddressProcessor(handler); + LOG.info("Starting DB backed MetaStore Server"); + } + } - TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport) + TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport) .processor(processor) .transportFactory(transFactory) .protocolFactory(new TBinaryProtocol.Factory()) .minWorkerThreads(minWorkerThreads) .maxWorkerThreads(maxWorkerThreads); - TServer tServer = new TThreadPoolServer(args); - HMSHandler.LOG.info("Started the new metaserver on port [" + port - + "]..."); - HMSHandler.LOG.info("Options.minWorkerThreads = " - + minWorkerThreads); - HMSHandler.LOG.info("Options.maxWorkerThreads = " - + maxWorkerThreads); - HMSHandler.LOG.info("TCP keepalive = " + tcpKeepAlive); - - if (startLock != null) { - signalOtherThreadsToStart(tServer, startLock, startCondition, startedServing); - } - tServer.serve(); + TServer tServer = new TThreadPoolServer(args); + HMSHandler.LOG.info("Started the new metaserver on port [" + port + + "]..."); + HMSHandler.LOG.info("Options.minWorkerThreads = " + + minWorkerThreads); + HMSHandler.LOG.info("Options.maxWorkerThreads = " + + maxWorkerThreads); + HMSHandler.LOG.info("TCP keepalive = " + tcpKeepAlive); + + if (startLock != null) { + signalOtherThreadsToStart(tServer, startLock, startCondition, startedServing); + } + tServer.serve(); } catch (Throwable x) { x.printStackTrace(); HMSHandler.LOG.error(StringUtils.stringifyException(x)); @@ -5325,8 +5347,8 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, } private static void signalOtherThreadsToStart(final TServer server, final Lock startLock, - final Condition startCondition, - final MetaStoreThread.BooleanPointer startedServing) { + final Condition startCondition, + final MetaStoreThread.BooleanPointer startedServing) { // A simple thread to wait until the server has started and then signal the other threads to // begin Thread t = new Thread() { @@ -5356,8 +5378,8 @@ public void run() { * @param conf Hive configuration object */ private static void startMetaStoreThreads(final HiveConf conf, final Lock startLock, - final Condition startCondition, final - 
MetaStoreThread.BooleanPointer startedServing) { + final Condition startCondition, final + MetaStoreThread.BooleanPointer startedServing) { // A thread is spun up to start these other threads. That's because we can't start them // until after the TServer has started, but once TServer.serve is called we aren't given back // control. @@ -5431,7 +5453,7 @@ private static MetaStoreThread instantiateThread(String classname) throws Except private static int nextThreadId = 1000000; private static void initializeAndStartThread(MetaStoreThread thread, HiveConf conf) throws - MetaException { + MetaException { LOG.info("Starting metastore thread of type " + thread.getClass().getName()); thread.setHiveConf(conf); thread.setThreadId(nextThreadId++); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 0693039..e8f16a5 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -252,6 +252,8 @@ private void initialize(Properties dsProps) { expressionProxy = createExpressionProxy(hiveConf); directSql = new MetaStoreDirectSql(pm); } + LOG.debug("RawStore: " + this + ", with PersistenceManager: " + pm + + " created in the thread with id: " + Thread.currentThread().getId()); } /** @@ -343,6 +345,8 @@ public PersistenceManager getPersistenceManager() { @Override public void shutdown() { if (pm != null) { + LOG.debug("RawStore: " + this + ", with PersistenceManager: " + pm + + " will be shutdown"); pm.close(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index e387b8f..93c73ae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -354,7 +354,7 @@ public void createTable(String tableName, List columns, public void createTable(String tableName, List columns, List partCols, Class fileInputFormat, Class fileOutputFormat, int bucketCount, List bucketCols) - throws HiveException { + throws HiveException { if (columns == null) { throw new HiveException("columns not specified for table " + tableName); } @@ -500,7 +500,7 @@ public void alterPartitions(String tblName, List newParts) throws InvalidOperationException, HiveException { String[] names = Utilities.getDbTableName(tblName); List newTParts = - new ArrayList(); + new ArrayList(); try { // Remove the DDL time so that it gets refreshed for (Partition tmpPart: newParts) { @@ -671,7 +671,7 @@ public void createIndex(String tableName, String indexName, String indexHandlerC Map idxProps, Map tblProps, Map serdeProps, String collItemDelim, String fieldDelim, String fieldEscape, String lineDelim, String mapKeyDelim, String indexComment) - throws HiveException { + throws HiveException { try { Index old_index = null; @@ -734,7 +734,7 @@ public void createIndex(String tableName, String indexName, String indexHandlerC if (serdeProps != null) { Iterator> iter = serdeProps.entrySet() - .iterator(); + .iterator(); while (iter.hasNext()) { Entry m = iter.next(); serdeInfo.getParameters().put(m.getKey(), m.getValue()); @@ -790,12 +790,12 @@ public void createIndex(String tableName, String indexName, String indexHandlerC SessionState ss = SessionState.get(); CreateTableAutomaticGrant grants; if (ss != null && ((grants = ss.getCreateTableGrants()) != null)) { - PrincipalPrivilegeSet principalPrivs = new 
PrincipalPrivilegeSet(); - principalPrivs.setUserPrivileges(grants.getUserGrants()); - principalPrivs.setGroupPrivileges(grants.getGroupGrants()); - principalPrivs.setRolePrivileges(grants.getRoleGrants()); - tt.setPrivileges(principalPrivs); - } + PrincipalPrivilegeSet principalPrivs = new PrincipalPrivilegeSet(); + principalPrivs.setUserPrivileges(grants.getUserGrants()); + principalPrivs.setGroupPrivileges(grants.getGroupGrants()); + principalPrivs.setRolePrivileges(grants.getRoleGrants()); + tt.setPrivileges(principalPrivs); + } } if(!deferredRebuild) { @@ -1008,9 +1008,9 @@ public Table getTable(final String dbName, final String tableName, // earlier version of Hive. if (org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe.class .getName().equals( - tTable.getSd().getSerdeInfo().getSerializationLib()) - && tTable.getSd().getColsSize() > 0 - && tTable.getSd().getCols().get(0).getType().indexOf('<') == -1) { + tTable.getSd().getSerdeInfo().getSerializationLib()) + && tTable.getSd().getColsSize() > 0 + && tTable.getSd().getCols().get(0).getType().indexOf('<') == -1) { tTable.getSd().getSerdeInfo().setSerializationLib( org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName()); } @@ -1288,68 +1288,68 @@ public void loadPartition(Path loadPath, String tableName, } /** - * Walk through sub-directory tree to construct list bucketing location map. - * - * @param fSta - * @param fSys - * @param skewedColValueLocationMaps - * @param newPartPath - * @param skewedInfo - * @throws IOException - */ -private void walkDirTree(FileStatus fSta, FileSystem fSys, - Map, String> skewedColValueLocationMaps, Path newPartPath, SkewedInfo skewedInfo) - throws IOException { - /* Base Case. It's leaf. */ - if (!fSta.isDir()) { - /* construct one location map if not exists. */ - constructOneLBLocationMap(fSta, skewedColValueLocationMaps, newPartPath, skewedInfo); - return; - } + * Walk through sub-directory tree to construct list bucketing location map. + * + * @param fSta + * @param fSys + * @param skewedColValueLocationMaps + * @param newPartPath + * @param skewedInfo + * @throws IOException + */ + private void walkDirTree(FileStatus fSta, FileSystem fSys, + Map, String> skewedColValueLocationMaps, Path newPartPath, SkewedInfo skewedInfo) + throws IOException { + /* Base Case. It's leaf. */ + if (!fSta.isDir()) { + /* construct one location map if not exists. */ + constructOneLBLocationMap(fSta, skewedColValueLocationMaps, newPartPath, skewedInfo); + return; + } - /* dfs. */ - FileStatus[] children = fSys.listStatus(fSta.getPath()); - if (children != null) { - for (FileStatus child : children) { - walkDirTree(child, fSys, skewedColValueLocationMaps, newPartPath, skewedInfo); + /* dfs. 
*/ + FileStatus[] children = fSys.listStatus(fSta.getPath()); + if (children != null) { + for (FileStatus child : children) { + walkDirTree(child, fSys, skewedColValueLocationMaps, newPartPath, skewedInfo); + } } } -} -/** - * Construct a list bucketing location map - * @param fSta - * @param skewedColValueLocationMaps - * @param newPartPath - * @param skewedInfo - */ -private void constructOneLBLocationMap(FileStatus fSta, - Map, String> skewedColValueLocationMaps, - Path newPartPath, SkewedInfo skewedInfo) { - Path lbdPath = fSta.getPath().getParent(); - List skewedValue = new ArrayList(); - String lbDirName = FileUtils.unescapePathName(lbdPath.toString()); - String partDirName = FileUtils.unescapePathName(newPartPath.toString()); - String lbDirSuffix = lbDirName.replace(partDirName, ""); - String[] dirNames = lbDirSuffix.split(Path.SEPARATOR); - for (String dirName : dirNames) { - if ((dirName != null) && (dirName.length() > 0)) { - // Construct skewed-value to location map except default directory. - // why? query logic knows default-dir structure and don't need to get from map + /** + * Construct a list bucketing location map + * @param fSta + * @param skewedColValueLocationMaps + * @param newPartPath + * @param skewedInfo + */ + private void constructOneLBLocationMap(FileStatus fSta, + Map, String> skewedColValueLocationMaps, + Path newPartPath, SkewedInfo skewedInfo) { + Path lbdPath = fSta.getPath().getParent(); + List skewedValue = new ArrayList(); + String lbDirName = FileUtils.unescapePathName(lbdPath.toString()); + String partDirName = FileUtils.unescapePathName(newPartPath.toString()); + String lbDirSuffix = lbDirName.replace(partDirName, ""); + String[] dirNames = lbDirSuffix.split(Path.SEPARATOR); + for (String dirName : dirNames) { + if ((dirName != null) && (dirName.length() > 0)) { + // Construct skewed-value to location map except default directory. + // why? 
query logic knows default-dir structure and don't need to get from map if (!dirName .equalsIgnoreCase(ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME)) { - String[] kv = dirName.split("="); - if (kv.length == 2) { - skewedValue.add(kv[1]); + String[] kv = dirName.split("="); + if (kv.length == 2) { + skewedValue.add(kv[1]); + } } } } + if ((skewedValue.size() > 0) && (skewedValue.size() == skewedInfo.getSkewedColNames().size()) + && !skewedColValueLocationMaps.containsKey(skewedValue)) { + skewedColValueLocationMaps.put(skewedValue, lbdPath.toString()); + } } - if ((skewedValue.size() > 0) && (skewedValue.size() == skewedInfo.getSkewedColNames().size()) - && !skewedColValueLocationMaps.containsKey(skewedValue)) { - skewedColValueLocationMaps.put(skewedValue, lbdPath.toString()); - } -} /** * Construct location map from path @@ -1386,12 +1386,12 @@ private void constructOneLBLocationMap(FileStatus fSta, public ArrayList> loadDynamicPartitions(Path loadPath, String tableName, Map partSpec, boolean replace, int numDP, boolean holdDDLTime, boolean listBucketingEnabled) - throws HiveException { + throws HiveException { Set validPartitions = new HashSet(); try { ArrayList> fullPartSpecs = - new ArrayList>(); + new ArrayList>(); FileSystem fs = loadPath.getFileSystem(conf); FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP+1, fs); @@ -1399,7 +1399,7 @@ private void constructOneLBLocationMap(FileStatus fSta, for (FileStatus s : leafStatus) { // Check if the hadoop version supports sub-directories for tables/partitions if (s.isDir() && - !conf.getBoolVar(HiveConf.ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES)) { + !conf.getBoolVar(HiveConf.ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES)) { // No leaves in this directory LOG.info("NOT moving empty directory: " + s.getPath()); } else { @@ -1547,40 +1547,40 @@ public Partition createPartition(Table tbl, Map partSpec) throws Table tbl, AddPartitionDesc.OnePartitionDesc addSpec) throws HiveException { Path location = addSpec.getLocation() != null ? 
new Path(tbl.getPath(), addSpec.getLocation()) : null; - if (location !=null && !Utilities.isDefaultNameNode(conf)) { - // Ensure that it is a full qualified path (in most cases it will be since tbl.getPath() is full qualified) - location = new Path(Utilities.getQualifiedPath(conf, location)); - } - org.apache.hadoop.hive.metastore.api.Partition part = - Partition.createMetaPartitionObject(tbl, addSpec.getPartSpec(), location); - if (addSpec.getPartParams() != null) { - part.setParameters(addSpec.getPartParams()); - } - if (addSpec.getInputFormat() != null) { - part.getSd().setInputFormat(addSpec.getInputFormat()); - } - if (addSpec.getOutputFormat() != null) { - part.getSd().setOutputFormat(addSpec.getOutputFormat()); - } - if (addSpec.getNumBuckets() != -1) { - part.getSd().setNumBuckets(addSpec.getNumBuckets()); - } - if (addSpec.getCols() != null) { - part.getSd().setCols(addSpec.getCols()); - } - if (addSpec.getSerializationLib() != null) { - part.getSd().getSerdeInfo().setSerializationLib(addSpec.getSerializationLib()); - } - if (addSpec.getSerdeParams() != null) { - part.getSd().getSerdeInfo().setParameters(addSpec.getSerdeParams()); - } - if (addSpec.getBucketCols() != null) { - part.getSd().setBucketCols(addSpec.getBucketCols()); - } - if (addSpec.getSortCols() != null) { - part.getSd().setSortCols(addSpec.getSortCols()); - } - return part; + if (location !=null && !Utilities.isDefaultNameNode(conf)) { + // Ensure that it is a full qualified path (in most cases it will be since tbl.getPath() is full qualified) + location = new Path(Utilities.getQualifiedPath(conf, location)); + } + org.apache.hadoop.hive.metastore.api.Partition part = + Partition.createMetaPartitionObject(tbl, addSpec.getPartSpec(), location); + if (addSpec.getPartParams() != null) { + part.setParameters(addSpec.getPartParams()); + } + if (addSpec.getInputFormat() != null) { + part.getSd().setInputFormat(addSpec.getInputFormat()); + } + if (addSpec.getOutputFormat() != null) { + part.getSd().setOutputFormat(addSpec.getOutputFormat()); + } + if (addSpec.getNumBuckets() != -1) { + part.getSd().setNumBuckets(addSpec.getNumBuckets()); + } + if (addSpec.getCols() != null) { + part.getSd().setCols(addSpec.getCols()); + } + if (addSpec.getSerializationLib() != null) { + part.getSd().getSerdeInfo().setSerializationLib(addSpec.getSerializationLib()); + } + if (addSpec.getSerdeParams() != null) { + part.getSd().getSerdeInfo().setParameters(addSpec.getSerdeParams()); + } + if (addSpec.getBucketCols() != null) { + part.getSd().setBucketCols(addSpec.getBucketCols()); + } + if (addSpec.getSortCols() != null) { + part.getSd().setSortCols(addSpec.getSortCols()); + } + return part; } public Partition getPartition(Table tbl, Map partSpec, @@ -1646,12 +1646,12 @@ public Partition getPartition(Table tbl, Map partSpec, if (forceCreate) { if (tpart == null) { LOG.debug("creating partition for table " + tbl.getTableName() - + " with partition spec : " + partSpec); + + " with partition spec : " + partSpec); tpart = getMSC().appendPartition(tbl.getDbName(), tbl.getTableName(), pvals); } else { LOG.debug("altering partition for table " + tbl.getTableName() - + " with partition spec : " + partSpec); + + " with partition spec : " + partSpec); if (inheritTableSpecs) { tpart.getSd().setOutputFormat(tbl.getTTable().getSd().getOutputFormat()); tpart.getSd().setInputFormat(tbl.getTTable().getSd().getInputFormat()); @@ -1830,7 +1830,7 @@ public boolean dropPartition(String db_name, String tbl_name, */ public List getPartitions(Table tbl, Map 
partialPartSpec, short limit) - throws HiveException { + throws HiveException { if (!tbl.isPartitioned()) { throw new HiveException(ErrorMsg.TABLE_NOT_PARTITIONED, tbl.getTableName()); } @@ -1864,7 +1864,7 @@ public boolean dropPartition(String db_name, String tbl_name, * @throws HiveException */ public List getPartitions(Table tbl, Map partialPartSpec) - throws HiveException { + throws HiveException { return getPartitions(tbl, partialPartSpec, (short)-1); } @@ -1882,7 +1882,7 @@ public boolean dropPartition(String db_name, String tbl_name, */ public List getPartitionsByNames(Table tbl, Map partialPartSpec) - throws HiveException { + throws HiveException { if (!tbl.isPartitioned()) { throw new HiveException(ErrorMsg.TABLE_NOT_PARTITIONED, tbl.getTableName()); @@ -1921,8 +1921,8 @@ public boolean dropPartition(String db_name, String tbl_name, try { for (int i = 0; i < nBatches; ++i) { List tParts = - getMSC().getPartitionsByNames(tbl.getDbName(), tbl.getTableName(), - partNames.subList(i*batchSize, (i+1)*batchSize)); + getMSC().getPartitionsByNames(tbl.getDbName(), tbl.getTableName(), + partNames.subList(i*batchSize, (i+1)*batchSize)); if (tParts != null) { for (org.apache.hadoop.hive.metastore.api.Partition tpart: tParts) { partitions.add(new Partition(tbl, tpart)); @@ -1932,8 +1932,8 @@ public boolean dropPartition(String db_name, String tbl_name, if (nParts > nBatches * batchSize) { List tParts = - getMSC().getPartitionsByNames(tbl.getDbName(), tbl.getTableName(), - partNames.subList(nBatches*batchSize, nParts)); + getMSC().getPartitionsByNames(tbl.getDbName(), tbl.getTableName(), + partNames.subList(nBatches*batchSize, nParts)); if (tParts != null) { for (org.apache.hadoop.hive.metastore.api.Partition tpart: tParts) { partitions.add(new Partition(tbl, tpart)); @@ -2105,7 +2105,7 @@ public boolean revokeRole(String roleName, String userName, public PrincipalPrivilegeSet get_privilege_set(HiveObjectType objectType, String db_name, String table_name, List part_values, String column_name, String user_name, List group_names) - throws HiveException { + throws HiveException { try { HiveObjectRef hiveObj = new HiveObjectRef(objectType, db_name, table_name, part_values, column_name); @@ -2143,7 +2143,7 @@ public PrincipalPrivilegeSet get_privilege_set(HiveObjectType objectType, // for each file or directory in 'srcs', make mapping for every file in src to safe name in dest private static List> checkPaths(HiveConf conf, FileSystem fs, FileStatus[] srcs, FileSystem srcFs, Path destf, boolean replace) - throws HiveException { + throws HiveException { List> result = new ArrayList>(); try { @@ -2181,7 +2181,7 @@ public boolean accept(Path p) { } if (!conf.getBoolVar(HiveConf.ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES) && - item.isDir()) { + item.isDir()) { throw new HiveException("checkPaths: " + src.getPath() + " has nested directory" + itemSource); } @@ -2443,7 +2443,7 @@ public void exchangeTablePartitions(Map partitionSpecs, String destinationTableName) throws HiveException { try { getMSC().exchange_partition(partitionSpecs, sourceDb, sourceTable, destDb, - destinationTableName); + destinationTableName); } catch (Exception ex) { LOG.error(StringUtils.stringifyException(ex)); throw new HiveException(ex); @@ -2461,29 +2461,29 @@ public void exchangeTablePartitions(Map partitionSpecs, private IMetaStoreClient createMetaStoreClient() throws MetaException { HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() { - @Override - public HiveMetaHook getHook( + @Override + public HiveMetaHook getHook( 
org.apache.hadoop.hive.metastore.api.Table tbl) - throws MetaException { + throws MetaException { - try { - if (tbl == null) { - return null; - } - HiveStorageHandler storageHandler = + try { + if (tbl == null) { + return null; + } + HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(conf, - tbl.getParameters().get(META_TABLE_STORAGE)); - if (storageHandler == null) { - return null; - } - return storageHandler.getMetaHook(); - } catch (HiveException ex) { - LOG.error(StringUtils.stringifyException(ex)); - throw new MetaException( - "Failed to load storage handler: " + ex.getMessage()); + tbl.getParameters().get(META_TABLE_STORAGE)); + if (storageHandler == null) { + return null; } + return storageHandler.getMetaHook(); + } catch (HiveException ex) { + LOG.error(StringUtils.stringifyException(ex)); + throw new MetaException( + "Failed to load storage handler: " + ex.getMessage()); } - }; + } + }; return RetryingMetaStoreClient.getProxy(conf, hookLoader, SessionHiveMetaStoreClient.class.getName()); } @@ -2576,7 +2576,7 @@ public boolean setPartitionColumnStatistics(SetPartitionsStatsRequest request) t public Map> getPartitionColumnStatistics(String dbName, String tableName, List partNames, List colNames) throws HiveException { - try { + try { return getMSC().getPartitionColumnStatistics(dbName, tableName, partNames, colNames); } catch (Exception e) { LOG.debug(StringUtils.stringifyException(e)); @@ -2595,7 +2595,7 @@ public AggrStats getAggrColStatsFor(String dbName, String tblName, } public boolean deleteTableColumnStatistics(String dbName, String tableName, String colName) - throws HiveException { + throws HiveException { try { return getMSC().deleteTableColumnStatistics(dbName, tableName, colName); } catch(Exception e) { @@ -2605,14 +2605,14 @@ public boolean deleteTableColumnStatistics(String dbName, String tableName, Stri } public boolean deletePartitionColumnStatistics(String dbName, String tableName, String partName, - String colName) throws HiveException { - try { - return getMSC().deletePartitionColumnStatistics(dbName, tableName, partName, colName); - } catch(Exception e) { - LOG.debug(StringUtils.stringifyException(e)); - throw new HiveException(e); - } + String colName) throws HiveException { + try { + return getMSC().deletePartitionColumnStatistics(dbName, tableName, partName, colName); + } catch(Exception e) { + LOG.debug(StringUtils.stringifyException(e)); + throw new HiveException(e); } + } public Table newTable(String tableName) throws HiveException { String[] names = Utilities.getDbTableName(tableName); @@ -2620,7 +2620,7 @@ public Table newTable(String tableName) throws HiveException { } public String getDelegationToken(String owner, String renewer) - throws HiveException{ + throws HiveException{ try { return getMSC().getDelegationToken(owner, renewer); } catch(Exception e) { @@ -2630,7 +2630,7 @@ public String getDelegationToken(String owner, String renewer) } public void cancelDelegationToken(String tokenStrForm) - throws HiveException { + throws HiveException { try { getMSC().cancelDelegationToken(tokenStrForm); } catch(Exception e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java index bc9254c..533ac1d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java @@ -18,17 +18,23 @@ package org.apache.hadoop.hive.ql.processors; +import static 
org.apache.hadoop.hive.conf.SystemVariables.ENV_PREFIX; +import static org.apache.hadoop.hive.conf.SystemVariables.HIVECONF_PREFIX; +import static org.apache.hadoop.hive.conf.SystemVariables.HIVEVAR_PREFIX; +import static org.apache.hadoop.hive.conf.SystemVariables.METACONF_PREFIX; +import static org.apache.hadoop.hive.conf.SystemVariables.SET_COLUMN_NAME; +import static org.apache.hadoop.hive.conf.SystemVariables.SYSTEM_PREFIX; import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT; import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME; import static org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe.defaultNullString; -import static org.apache.hadoop.hive.conf.SystemVariables.*; - import java.util.Map; import java.util.Properties; import java.util.SortedMap; import java.util.TreeMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; @@ -43,6 +49,7 @@ public class SetProcessor implements CommandProcessor { private static final String prefix = "set: "; + private static final Log LOG = LogFactory.getLog(SetProcessor.class); public static boolean getBoolean(String value) { if (value.equals("on") || value.equals("true")) { @@ -140,7 +147,7 @@ public static int setVariable(String varname, String varvalue) throws Exception // returns non-null string for validation fail private static void setConf(String varname, String key, String varvalue, boolean register) - throws IllegalArgumentException { + throws IllegalArgumentException { HiveConf conf = SessionState.get().getConf(); String value = new VariableSubstitution().substitute(conf, varvalue); if (conf.getBoolVar(HiveConf.ConfVars.HIVECONFVALIDATION)) { @@ -287,7 +294,7 @@ public CommandProcessorResponse run(String command) { } } -// create a Schema object containing the give column + // create a Schema object containing the give column private Schema getSchema() { Schema sch = new Schema(); FieldSchema tmpFieldSchema = new FieldSchema(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 9798cf3..dc94e19 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -344,13 +344,13 @@ public static SessionState start(SessionState startSs) { if (startSs.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SESSION_HISTORY_ENABLED)) { startSs.hiveHist = new HiveHistoryImpl(startSs); }else { - //Hive history is disabled, create a no-op proxy + // Hive history is disabled, create a no-op proxy startSs.hiveHist = HiveHistoryProxyHandler.getNoOpHiveHistoryProxy(); } } if (startSs.getTmpOutputFile() == null) { - // set temp file containing results to be sent to HiveClient + // Set temp file containing results to be sent to HiveClient try { startSs.setTmpOutputFile(createTempFile(startSs.getConf())); } catch (IOException e) { @@ -361,7 +361,7 @@ public static SessionState start(SessionState startSs) { // Get the following out of the way when you start the session these take a // while and should be done when we start up. try { - //Hive object instance should be created with a copy of the conf object. If the conf is + // Hive object instance should be created with a copy of the conf object. 
If the conf is // shared with SessionState, other parts of the code might update the config, but // Hive.get(HiveConf) would not recognize the case when it needs refreshing Hive.get(new HiveConf(startSs.conf)).getMSC(); @@ -369,7 +369,7 @@ public static SessionState start(SessionState startSs) { FileSystem.get(startSs.conf); startSs.createSessionPaths(startSs.conf); } catch (Exception e) { - // catch-all due to some exec time dependencies on session state + // Catch-all due to some exec time dependencies on session state // that would cause ClassNoFoundException otherwise throw new RuntimeException(e); } @@ -452,13 +452,13 @@ private void createSessionPaths(Configuration conf) throws IOException { // local & non-local tmp location is configurable. however it is the same across // all external file systems hdfsSessionPath = - new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR), - sessionId); + new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR), + sessionId); createPath(conf, hdfsSessionPath, scratchDirPermission); conf.set(HDFS_SESSION_PATH_KEY, hdfsSessionPath.toUri().toString()); localSessionPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR), - sessionId); + sessionId); createPath(conf, localSessionPath, scratchDirPermission); conf.set(LOCAL_SESSION_PATH_KEY, localSessionPath.toUri().toString()); hdfsTmpTableSpace = new Path(hdfsSessionPath, TMP_PREFIX); @@ -473,7 +473,7 @@ private void createPath(Configuration conf, Path p, String perm) throws IOExcept if (!Utilities.createDirsWithPermission(conf, p, fsPermission)) { throw new IOException("Cannot create directory: " - + p.toString()); + + p.toString()); } // best effort to clean up if we don't shut down properly diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java index d2cdfc1..9cd9c1e 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIService.java +++ b/service/src/java/org/apache/hive/service/cli/CLIService.java @@ -67,7 +67,6 @@ private HiveConf hiveConf; private SessionManager sessionManager; - private IMetaStoreClient metastoreClient; private UserGroupInformation serviceUGI; private UserGroupInformation httpUGI; @@ -132,21 +131,23 @@ public synchronized void start() { } catch (IOException eIO) { throw new ServiceException("Error setting stage directories", eIO); } - + // Initialize and test a connection to the metastore + IMetaStoreClient metastoreClient = null; try { - // Initialize and test a connection to the metastore metastoreClient = new HiveMetaStoreClient(hiveConf); metastoreClient.getDatabases("default"); } catch (Exception e) { throw new ServiceException("Unable to connect to MetaStore!", e); } + finally { + if (metastoreClient != null) { + metastoreClient.close(); + } + } } @Override public synchronized void stop() { - if (metastoreClient != null) { - metastoreClient.close(); - } super.stop(); } diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index de54ca1..bb6de8a 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -60,6 +60,7 @@ import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.server.ThreadWithGarbageCleanup; /** * 
SQLOperation. @@ -171,27 +172,23 @@ public void run() throws HiveSQLException { if (!shouldRunAsync()) { runInternal(opConfig); } else { + // We'll pass ThreadLocals in the background thread from the foreground (handler) thread final SessionState parentSessionState = SessionState.get(); - // current Hive object needs to be set in aysnc thread in case of remote metastore. - // The metastore client in Hive is associated with right user - final Hive sessionHive = getCurrentHive(); - // current UGI will get used by metastore when metsatore is in embedded mode - // so this needs to get passed to the new async thread + // ThreadLocal Hive object needs to be set in background thread. + // The metastore client in Hive is associated with right user. + final Hive parentHive = getSessionHive(); + // Current UGI will get used by metastore when metsatore is in embedded mode + // So this needs to get passed to the new background thread final UserGroupInformation currentUGI = getCurrentUGI(opConfig); - // Runnable impl to call runInternal asynchronously, // from a different thread Runnable backgroundOperation = new Runnable() { - @Override public void run() { PrivilegedExceptionAction doAsAction = new PrivilegedExceptionAction() { @Override public Object run() throws HiveSQLException { - - // Storing the current Hive object necessary when doAs is enabled - // User information is part of the metastore client member in Hive - Hive.set(sessionHive); + Hive.set(parentHive); SessionState.setCurrentSessionState(parentSessionState); try { runInternal(opConfig); @@ -202,12 +199,25 @@ public Object run() throws HiveSQLException { return null; } }; + try { ShimLoader.getHadoopShims().doAs(currentUGI, doAsAction); } catch (Exception e) { setOperationException(new HiveSQLException(e)); LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e); } + finally { + /** + * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup + * when this thread is garbage collected later. 
+ * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize() + */ + if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) { + ThreadWithGarbageCleanup currentThread = + (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread(); + currentThread.cacheThreadLocalRawStore(); + } + } } }; try { @@ -223,6 +233,12 @@ public Object run() throws HiveSQLException { } } + /** + * Returns the current UGI on the stack + * @param opConfig + * @return UserGroupInformation + * @throws HiveSQLException + */ private UserGroupInformation getCurrentUGI(HiveConf opConfig) throws HiveSQLException { try { return ShimLoader.getHadoopShims().getUGIForConf(opConfig); @@ -231,11 +247,16 @@ private UserGroupInformation getCurrentUGI(HiveConf opConfig) throws HiveSQLExce } } - private Hive getCurrentHive() throws HiveSQLException { + /** + * Returns the ThreadLocal Hive for the current thread + * @return Hive + * @throws HiveSQLException + */ + private Hive getSessionHive() throws HiveSQLException { try { return Hive.get(); } catch (HiveException e) { - throw new HiveSQLException("Failed to get current Hive object", e); + throw new HiveSQLException("Failed to get ThreadLocal Hive object", e); } } diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index bc0a02c..a5f5f5e 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -62,6 +62,7 @@ import org.apache.hive.service.cli.operation.MetadataOperation; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; +import org.apache.hive.service.server.ThreadWithGarbageCleanup; /** * HiveSession @@ -95,14 +96,19 @@ public HiveSessionImpl(TProtocolVersion protocol, String username, String passwo this.hiveConf = new HiveConf(serverhiveConf); this.ipAddress = ipAddress; - // set an explicit session name to control the download directory name + // Set an explicit session name to control the download directory name hiveConf.set(ConfVars.HIVESESSIONID.varname, sessionHandle.getHandleIdentifier().toString()); - // use thrift transportable formatter + // Use thrift transportable formatter hiveConf.set(ListSinkOperator.OUTPUT_FORMATTER, FetchFormatter.ThriftFormatter.class.getName()); hiveConf.setInt(ListSinkOperator.OUTPUT_PROTOCOL, protocol.getValue()); + /** + * Create a new SessionState object that will be associated with this HiveServer2 session. + * When the server executes multiple queries in the same session, + * this SessionState object is reused across multiple queries. 
+ */ sessionState = new SessionState(hiveConf, username); sessionState.setUserIpAddress(ipAddress); sessionState.setIsHiveServerQuery(true); @@ -111,11 +117,9 @@ public HiveSessionImpl(TProtocolVersion protocol, String username, String passwo @Override public void initialize(Map sessionConfMap) throws Exception { - //process global init file: .hiverc + // Process global init file: .hiverc processGlobalInitFile(); - SessionState.setCurrentSessionState(sessionState); - - //set conf properties specified by user from client side + // Set conf properties specified by user from client side if (sessionConfMap != null) { configureSession(sessionConfMap); } @@ -169,6 +173,7 @@ private void processGlobalInitFile() { } private void configureSession(Map sessionConfMap) throws Exception { + SessionState.setCurrentSessionState(sessionState); for (Map.Entry entry : sessionConfMap.entrySet()) { String key = entry.getKey(); if (key.startsWith("set:")) { @@ -211,14 +216,26 @@ public void open() { } protected synchronized void acquire() throws HiveSQLException { - // need to make sure that the this connections session state is - // stored in the thread local for sessions. + // Need to make sure that the this HiveServer2's session's session state is + // stored in the thread local for the handler thread. SessionState.setCurrentSessionState(sessionState); } + /** + * 1. We'll remove the ThreadLocal SessionState as this thread might now serve + * other requests. + * 2. We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup + * when this thread is garbage collected later. + * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize() + */ protected synchronized void release() { assert sessionState != null; SessionState.detachSession(); + if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) { + ThreadWithGarbageCleanup currentThread = + (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread(); + currentThread.cacheThreadLocalRawStore(); + } } @Override @@ -468,7 +485,7 @@ public void close() throws HiveSQLException { try { acquire(); /** - * For metadata operations like getTables(), getColumns() etc, + * For metadata operations like getTables(), getColumns() etc, * the session allocates a private metastore handler which should be * closed at the end of the session */ diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java index d573592..954fb19 100644 --- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -38,6 +38,7 @@ import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; +import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; /** * SessionManager. 
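The hunks above and below wire HiveServer2's handler and background pools to a custom ThreadFactory, so that each pool thread can cache its ThreadLocal RawStore and shut it down once the thread itself goes away. The self-contained sketch below condenses that pattern; the names used here (PerThreadResource, CleanupThread, CleanupThreadFactory, Example-Background-Pool) are illustrative stand-ins, not Hive classes — the real implementations are ThreadFactoryWithGarbageCleanup and ThreadWithGarbageCleanup, added later in this patch.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PerThreadCleanupPoolSketch {

  /** Stand-in for the per-thread resource (in Hive: the ThreadLocal RawStore). */
  interface PerThreadResource {
    void shutdown();
  }

  /** Pool thread that shuts down its cached resource when it is finalized. */
  static class CleanupThread extends Thread {
    private static final Map<Long, PerThreadResource> CACHE =
        new ConcurrentHashMap<Long, PerThreadResource>();

    CleanupThread(Runnable runnable, String name) {
      super(runnable, name);
    }

    /** Called from inside the pool thread once its per-thread resource exists. */
    void cacheResource(PerThreadResource resource) {
      CACHE.putIfAbsent(getId(), resource);
    }

    @Override
    protected void finalize() throws Throwable {
      // Runs when the GC finalizes the dead thread object; exact timing is up to the GC.
      PerThreadResource resource = CACHE.remove(getId());
      if (resource != null) {
        resource.shutdown();
      }
      super.finalize();
    }
  }

  /** Factory so every thread the pool creates is a CleanupThread with a recognizable name. */
  static class CleanupThreadFactory implements ThreadFactory {
    private final String namePrefix;

    CleanupThreadFactory(String namePrefix) {
      this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable runnable) {
      return new CleanupThread(runnable, namePrefix + ": Thread-" + System.nanoTime());
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Bounded pool: idle workers die after the keepalive interval, and with
    // allowCoreThreadTimeOut(true) even core threads time out, as the SessionManager
    // change in the next hunk does for the background operation pool.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(100),
        new CleanupThreadFactory("Example-Background-Pool"));
    pool.allowCoreThreadTimeOut(true);

    pool.execute(new Runnable() {
      @Override
      public void run() {
        // The worker registers its per-thread resource so it gets closed later.
        PerThreadResource resource = new PerThreadResource() {
          @Override
          public void shutdown() {
            System.out.println("closed resource of " + Thread.currentThread().getName());
          }
        };
        Thread current = Thread.currentThread();
        if (current instanceof CleanupThread) {
          ((CleanupThread) current).cacheResource(resource);
        }
      }
    });

    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}

The finalizer-based cleanup mirrors the rationale stated in the new ThreadFactoryWithGarbageCleanup javadoc: ExecutorService exposes no hook when a pooled thread is terminated, so the shutdown of the cached per-thread object has to ride on the thread's own lifecycle.
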
@@ -64,22 +65,27 @@ public synchronized void init(HiveConf hiveConf) { } catch (HiveException e) { throw new RuntimeException("Error applying authorization policy on hive configuration", e); } - this.hiveConf = hiveConf; + createBackgroundOperationPool(); + addService(operationManager); + super.init(hiveConf); + } + + private void createBackgroundOperationPool() { int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); - LOG.info("HiveServer2: Async execution thread pool size: " + backgroundPoolSize); + LOG.info("HiveServer2: Background operation thread pool size: " + backgroundPoolSize); int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE); - LOG.info("HiveServer2: Async execution wait queue size: " + backgroundPoolQueueSize); + LOG.info("HiveServer2: Background operation thread wait queue size: " + backgroundPoolQueueSize); int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME); - LOG.info("HiveServer2: Async execution thread keepalive time: " + keepAliveTime); + LOG.info("HiveServer2: Background operation thread keepalive time: " + keepAliveTime); // Create a thread pool with #backgroundPoolSize threads // Threads terminate when they are idle for more than the keepAliveTime - // An bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize + // A bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize + String threadPoolName = "HiveServer2-Background-Pool"; backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize, - keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(backgroundPoolQueueSize)); + keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(backgroundPoolQueueSize), + new ThreadFactoryWithGarbageCleanup(threadPoolName)); backgroundOperationPool.allowCoreThreadTimeOut(true); - addService(operationManager); - super.init(hiveConf); } private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException { diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index 37b05fc..e5ce72f 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -19,12 +19,17 @@ package org.apache.hive.service.cli.thrift; import java.net.InetSocketAddress; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TThreadPoolServer; @@ -65,6 +70,11 @@ public void run() { minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS); maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS); + workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME); + String threadPoolName = 
"HiveServer2-Handler-Pool"; + ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, + workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), + new ThreadFactoryWithGarbageCleanup(threadPoolName)); TServerSocket serverSocket = null; if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) { @@ -84,8 +94,7 @@ public void run() { .processorFactory(processorFactory) .transportFactory(transportFactory) .protocolFactory(new TBinaryProtocol.Factory()) - .minWorkerThreads(minWorkerThreads) - .maxWorkerThreads(maxWorkerThreads); + .executorService(executorService); server = new TThreadPoolServer(sargs); diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index be2eb01..a3a0867 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -71,6 +71,7 @@ protected int minWorkerThreads; protected int maxWorkerThreads; + protected int workerKeepAliveTime; protected static HiveAuthFactory hiveAuthFactory; diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index c380b69..21d1563 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -18,6 +18,11 @@ package org.apache.hive.service.cli.thrift; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.ShimLoader; @@ -26,6 +31,7 @@ import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.HiveAuthFactory.AuthTypes; import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; import org.apache.thrift.TProcessor; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; @@ -36,7 +42,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.ExecutorThreadPool; public class ThriftHttpCLIService extends ThriftCLIService { @@ -63,13 +69,17 @@ public void run() { minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS); maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS); + workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME); String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); httpServer = new org.eclipse.jetty.server.Server(); - QueuedThreadPool threadPool = new QueuedThreadPool(); - threadPool.setMinThreads(minWorkerThreads); - threadPool.setMaxThreads(maxWorkerThreads); + String threadPoolName = "HiveServer2-HttpHandler-Pool"; + ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, + workerKeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(), + new 
ThreadFactoryWithGarbageCleanup(threadPoolName)); + + ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService); httpServer.setThreadPool(threadPool); SelectChannelConnector connector = new SelectChannelConnector();; diff --git a/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java b/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java new file mode 100644 index 0000000..ec19abc --- /dev/null +++ b/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hive.service.server; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadFactory; + +import org.apache.hadoop.hive.metastore.RawStore; + +/** + * A ThreadFactory for constructing new HiveServer2 threads that lets you plug + * in custom cleanup code to be called before this thread is GC-ed. + * Currently cleans up the following: + * 1. ThreadLocal RawStore object: + * In case of an embedded metastore, HiveServer2 threads (foreground & background) + * end up caching a ThreadLocal RawStore object. The ThreadLocal RawStore object has + * an instance of PersistenceManagerFactory & PersistenceManager. + * The PersistenceManagerFactory keeps a cache of PersistenceManager objects, + * which are only removed when PersistenceManager#close method is called. + * HiveServer2 uses ExecutorService for managing thread pools for foreground & background threads. + * ExecutorService unfortunately does not provide any hooks to be called, + * when a thread from the pool is terminated. + * As a solution, we're using this ThreadFactory to keep a cache of RawStore objects per thread. + * And we are doing clean shutdown in the finalizer for each thread. 
+ */ +public class ThreadFactoryWithGarbageCleanup implements ThreadFactory { + + private static Map threadRawStoreMap = new HashMap(); + + private final String namePrefix; + + public ThreadFactoryWithGarbageCleanup(String threadPoolName) { + namePrefix = threadPoolName; + } + + @Override + public Thread newThread(Runnable runnable) { + Thread newThread = new ThreadWithGarbageCleanup(runnable); + newThread.setName(namePrefix + ": Thread-" + newThread.getId()); + return newThread; + } + + public static Map getThreadRawStoreMap() { + return threadRawStoreMap; + } +} diff --git a/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java b/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java new file mode 100644 index 0000000..8ee9810 --- /dev/null +++ b/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hive.service.server; + +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.RawStore; + +/** + * A HiveServer2 thread used to construct new server threads. + * In particular, this thread ensures an orderly cleanup, + * when killed by its corresponding ExecutorService. + */ +public class ThreadWithGarbageCleanup extends Thread { + private static final Log LOG = LogFactory.getLog(ThreadWithGarbageCleanup.class); + + Map threadRawStoreMap = + ThreadFactoryWithGarbageCleanup.getThreadRawStoreMap(); + + public ThreadWithGarbageCleanup(Runnable runnable) { + super(runnable); + } + + /** + * Add any Thread specific garbage cleanup code here. + * Currently, it shuts down the RawStore object for this thread if it is not null. + */ + @Override + public void finalize() throws Throwable { + cleanRawStore(); + super.finalize(); + } + + private void cleanRawStore() { + Long threadId = this.getId(); + RawStore threadLocalRawStore = threadRawStoreMap.get(threadId); + if (threadLocalRawStore != null) { + LOG.debug("RawStore: " + threadLocalRawStore + ", for the thread: " + + this.getName() + " will be closed now."); + threadLocalRawStore.shutdown(); + threadRawStoreMap.remove(threadId); + } + } + + /** + * Cache the ThreadLocal RawStore object. Called from the corresponding thread. 
+ */ + public void cacheThreadLocalRawStore() { + Long threadId = this.getId(); + RawStore threadLocalRawStore = HiveMetaStore.HMSHandler.getRawStore(); + if (threadLocalRawStore != null && !threadRawStoreMap.containsKey(threadId)) { + LOG.debug("Adding RawStore: " + threadLocalRawStore + ", for the thread: " + + this.getName() + " to threadRawStoreMap for future cleanup."); + threadRawStoreMap.put(threadId, threadLocalRawStore); + } + } +} diff --git a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java index 66fc1fc..2ef2d43 100644 --- a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java +++ b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java @@ -27,7 +27,11 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hive.service.cli.*; +import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.cli.ICLIService; +import org.apache.hive.service.cli.OperationHandle; +import org.apache.hive.service.cli.RowSet; +import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService; import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient; import org.junit.After; @@ -58,6 +62,7 @@ public ICLIService getService() { } } + @Override @Before public void setUp() throws Exception { super.setUp(); @@ -66,7 +71,7 @@ public void setUp() throws Exception { initFile = File.createTempFile("test", "hive"); tmpDir = initFile.getParentFile().getAbsoluteFile() + File.separator - + "TestSessionGlobalInitFile"; + + "TestSessionGlobalInitFile"; initFile.delete(); FileUtils.deleteDirectory(new File(tmpDir)); @@ -77,8 +82,8 @@ public void setUp() throws Exception { String[] fileContent = new String[] { "-- global init hive file for test", "set a=1;", - "set hiveconf:b=1;", "set hivevar:c=1;", "set d\\", " =1;", - "add jar " + initFile.getAbsolutePath() }; + "set hiveconf:b=1;", "set hivevar:c=1;", "set d\\", " =1;", + "add jar " + initFile.getAbsolutePath() }; FileUtils.writeLines(initFile, Arrays.asList(fileContent)); // set up service and client @@ -90,6 +95,7 @@ public void setUp() throws Exception { client = new ThriftCLIServiceClient(service); } + @Override @After public void tearDown() throws Exception { // restore