diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index f24bcbd8a2..487208054a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -111,14 +111,15 @@ public String getName() {
   @Override
   protected int execute(DriverContext driverContext) {
     try {
+      Hive hiveDb = getHive();
       Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), getNextDumpDir());
       DumpMetaData dmd = new DumpMetaData(dumpRoot, conf);
       Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
       Long lastReplId;
       if (work.isBootStrapDump()) {
-        lastReplId = bootStrapDump(dumpRoot, dmd, cmRoot);
+        lastReplId = bootStrapDump(dumpRoot, dmd, cmRoot, hiveDb);
       } else {
-        lastReplId = incrementalDump(dumpRoot, dmd, cmRoot);
+        lastReplId = incrementalDump(dumpRoot, dmd, cmRoot, hiveDb);
       }
       prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
     } catch (RuntimeException e) {
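The hunk above is the heart of the patch: the metastore handle is resolved once at the top of execute() and passed down explicitly, instead of every helper calling getHive() again. Hive.get() hands out per-thread instances, so repeated lookups from helper code risk materializing extra HMS connections that nothing ever closes. A minimal sketch of the two styles, using hypothetical MetastoreClient/ReplSketch names rather than Hive's real classes:

    // Sketch only: contrasts thread-local lookup with explicit handle passing.
    // MetastoreClient and ReplSketch are hypothetical stand-ins, not Hive classes.
    class MetastoreClient implements AutoCloseable {
      @Override
      public void close() { /* tear down the underlying Thrift connection */ }
    }

    public class ReplSketch {
      private static final ThreadLocal<MetastoreClient> CURRENT =
          ThreadLocal.withInitial(MetastoreClient::new); // every calling thread opens its own client

      // Leak-prone style: each helper re-resolves the thread-local, so a helper that
      // runs on another thread silently creates a client that is never closed.
      void dumpLeaky() {
        MetastoreClient db = CURRENT.get();
        // ... helpers internally call CURRENT.get() again ...
      }

      // Patched style: resolve once at the entry point and thread the handle through.
      void dump(MetastoreClient db) {
        dumpDbMetadata(db);
        dumpTables(db);
      }

      private void dumpDbMetadata(MetastoreClient db) { /* reuses the caller's connection */ }
      private void dumpTables(MetastoreClient db) { /* reuses the caller's connection */ }
    }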
@@ -141,7 +142,7 @@ private void prepareReturnValues(List<String> values, String schema) throws Sema
     Utils.writeOutput(values, new Path(work.resultTempPath), conf);
   }
 
-  private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
+  private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception {
     Long lastReplId;// get list of events matching dbPattern & tblPattern
     // go through each event, and dump out each event to a event-level dump dir inside dumproot
@@ -151,7 +152,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw
     // same factory, restricting by message format is effectively a guard against
     // older leftover data that would cause us problems.
-    work.overrideEventTo(getHive());
+    work.overrideEventTo(hiveDb);
 
     IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
         new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern),
@@ -159,7 +160,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw
         new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat()));
 
     EventUtils.MSClientNotificationFetcher evFetcher
-        = new EventUtils.MSClientNotificationFetcher(getHive());
+        = new EventUtils.MSClientNotificationFetcher(hiveDb);
 
     EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
         evFetcher, work.eventFrom, work.maxEventLimit(), evFilter);
@@ -175,7 +176,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw
       NotificationEvent ev = evIter.next();
       lastReplId = ev.getEventId();
       Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId));
-      dumpEvent(ev, evRoot, cmRoot);
+      dumpEvent(ev, evRoot, cmRoot, hiveDb);
     }
 
     replLogger.endLog(lastReplId.toString());
@@ -193,11 +194,11 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw
     return lastReplId;
   }
 
-  private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception {
+  private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db) throws Exception {
     EventHandler.Context context = new EventHandler.Context(
         evRoot,
         cmRoot,
-        getHive(),
+        db,
         conf,
         getNewEventOnlyReplicationSpec(ev.getEventId()),
         work.dbNameOrPattern,
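The incremental path builds its event predicate by AND-ing small filters: database/table matching, an event boundary, and a message-format guard against older leftover data. IMetaStoreClient.NotificationFilter is a single-method interface over NotificationEvent, so a conjunction combinator takes only a few lines; the sketch below is written in that spirit and is not a copy of Hive's AndFilter:

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.NotificationEvent;

    // Sketch: an AND-combinator over notification filters, in the spirit of the
    // filter classes used above; written from scratch, not quoted from Hive.
    public class ConjunctionFilter implements IMetaStoreClient.NotificationFilter {
      private final IMetaStoreClient.NotificationFilter[] filters;

      public ConjunctionFilter(IMetaStoreClient.NotificationFilter... filters) {
        this.filters = filters;
      }

      @Override
      public boolean accept(NotificationEvent event) {
        for (IMetaStoreClient.NotificationFilter filter : filters) {
          if (filter == null || !filter.accept(event)) {
            return false; // one rejection vetoes the event
          }
        }
        return true;
      }
    }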
@@ -216,12 +217,11 @@ private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) {
     return rspec;
   }
 
-  Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
+  Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception {
     // bootstrap case
     // Last repl id would've been captured during compile phase in queryState configs before opening txn.
     // This is needed as we dump data on ACID/MM tables based on read snapshot or else we may lose data from
     // concurrent txns when bootstrap dump in progress. If it is not available, then get it from metastore.
-    Hive hiveDb = getHive();
     Long bootDumpBeginReplId = queryState.getConf().getLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, -1L);
     assert (bootDumpBeginReplId >= 0L);
@@ -229,11 +229,11 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exceptio
     for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
       LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
       replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(),
-          Utils.getAllTables(getHive(), dbName).size(),
-          getHive().getAllFunctions().size());
+          Utils.getAllTables(hiveDb, dbName).size(),
+          hiveDb.getAllFunctions().size());
       replLogger.startLog();
-      Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId);
-      dumpFunctionMetadata(dbName, dumpRoot);
+      Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId, hiveDb);
+      dumpFunctionMetadata(dbName, dumpRoot, hiveDb);
 
       String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
       Exception caught = null;
@@ -241,8 +241,8 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exceptio
         for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
           LOG.debug(
               "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
-          dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId);
-          dumpConstraintMetadata(dbName, tblName, dbRoot);
+          dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb);
+          dumpConstraintMetadata(dbName, tblName, dbRoot, hiveDb);
         }
       } catch (Exception e) {
         caught = e;
@@ -279,20 +279,19 @@ long currentNotificationId(Hive hiveDb) throws TException {
     return hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
   }
 
-  Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) throws Exception {
+  Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) throws Exception {
     Path dbRoot = new Path(dumpRoot, dbName);
     // TODO : instantiating FS objects are generally costly. Refactor
     FileSystem fs = dbRoot.getFileSystem(conf);
     Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
-    HiveWrapper.Tuple<Database> database = new HiveWrapper(getHive(), dbName, lastReplId).database();
+    HiveWrapper.Tuple<Database> database = new HiveWrapper(hiveDb, dbName, lastReplId).database();
     EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);
     return dbRoot;
   }
 
-  void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId) throws Exception {
+  void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId, Hive hiveDb) throws Exception {
     try {
-      Hive db = getHive();
-      HiveWrapper.Tuple<Table> tuple = new HiveWrapper(db, dbName).table(tblName);
+      HiveWrapper.Tuple<Table> tuple = new HiveWrapper(hiveDb, dbName).table(tblName);
       TableSpec tableSpec = new TableSpec(tuple.object);
       TableExport.Paths exportPaths =
           new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true);
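The uniqueKey/caught bookkeeping above is a guard idiom: mark the database as "bootstrap dump in progress", remember the first failure from the dump body, then always reset the marker without masking that failure. Rendered generically below; the functional interface is a hypothetical helper, ReplDumpTask spells the same logic out inline:

    // Sketch of the set-state / do-work / reset-state idiom used by bootStrapDump.
    public final class StateGuard {
      @FunctionalInterface
      public interface Step { void run() throws Exception; }

      public static void runGuarded(Step work, Step reset) throws Exception {
        Exception caught = null;
        try {
          work.run();
        } catch (Exception e) {
          caught = e;               // remember the first failure
        } finally {
          try {
            reset.run();            // always undo the marker state
          } catch (Exception e) {
            if (caught == null) {
              caught = e;           // a reset failure surfaces only if work succeeded
            } else {
              caught.addSuppressed(e);
            }
          }
        }
        if (caught != null) {
          throw caught;
        }
      }
    }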
@@ -308,7 +307,7 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,
       }
       MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
       new TableExport(
-          exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf, mmCtx).write();
+          exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write();
 
       replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
@@ -406,11 +405,11 @@ private String getNextDumpDir() {
     }
   }
 
-  void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception {
+  void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) throws Exception {
     Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
-    List<String> functionNames = getHive().getFunctions(dbName, "*");
+    List<String> functionNames = hiveDb.getFunctions(dbName, "*");
     for (String functionName : functionNames) {
-      HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName);
+      HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName, hiveDb);
       if (tuple == null) {
         continue;
       }
@@ -425,16 +424,15 @@ void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception {
     }
   }
 
-  void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) throws Exception {
+  void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiveDb) throws Exception {
     try {
       Path constraintsRoot = new Path(dbRoot, CONSTRAINTS_ROOT_DIR_NAME);
       Path commonConstraintsFile = new Path(constraintsRoot, ConstraintFileType.COMMON.getPrefix() + tblName);
       Path fkConstraintsFile = new Path(constraintsRoot, ConstraintFileType.FOREIGNKEY.getPrefix() + tblName);
-      Hive db = getHive();
-      List<SQLPrimaryKey> pks = db.getPrimaryKeyList(dbName, tblName);
-      List<SQLForeignKey> fks = db.getForeignKeyList(dbName, tblName);
-      List<SQLUniqueConstraint> uks = db.getUniqueConstraintList(dbName, tblName);
-      List<SQLNotNullConstraint> nns = db.getNotNullConstraintList(dbName, tblName);
+      List<SQLPrimaryKey> pks = hiveDb.getPrimaryKeyList(dbName, tblName);
+      List<SQLForeignKey> fks = hiveDb.getForeignKeyList(dbName, tblName);
+      List<SQLUniqueConstraint> uks = hiveDb.getUniqueConstraintList(dbName, tblName);
+      List<SQLNotNullConstraint> nns = hiveDb.getNotNullConstraintList(dbName, tblName);
       if ((pks != null && !pks.isEmpty()) || (uks != null && !uks.isEmpty())
           || (nns != null && !nns.isEmpty())) {
         try (JsonWriter jsonWriter =
@@ -457,9 +455,9 @@ void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) throws E
     }
   }
 
-  private HiveWrapper.Tuple<Function> functionTuple(String functionName, String dbName) {
+  private HiveWrapper.Tuple<Function> functionTuple(String functionName, String dbName, Hive hiveDb) {
     try {
-      HiveWrapper.Tuple<Function> tuple = new HiveWrapper(getHive(), dbName).function(functionName);
+      HiveWrapper.Tuple<Function> tuple = new HiveWrapper(hiveDb, dbName).function(functionName);
       if (tuple.object.getResourceUris().isEmpty()) {
        LOG.warn("Not replicating function: " + functionName + " as it seems to have been created "
            + "without USING clause");
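A side benefit of taking Hive as a parameter is that these methods become directly unit-testable: a test can hand in a plain mock instead of fighting the thread-local getHive(). The TestReplDumpTask changes below rely on exactly that. A minimal standalone illustration of the pattern, assuming Mockito is on the classpath:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;
    import static org.mockito.Mockito.when;

    import java.util.Collections;
    import org.apache.hadoop.hive.ql.metadata.Hive;

    // Sketch: with Hive injected as an argument, the collaborator can be mocked.
    public class InjectionSketch {
      static String describeDb(Hive hiveDb, String dbName) throws Exception {
        return dbName + " has " + hiveDb.getAllFunctions().size() + " functions";
      }

      public static void main(String[] args) throws Exception {
        Hive hiveDb = mock(Hive.class);
        when(hiveDb.getAllFunctions()).thenReturn(Collections.emptyList());
        System.out.println(describeDb(hiveDb, "default")); // no metastore needed
        verify(hiveDb).getAllFunctions();
      }
    }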
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
index 696227be48..7d11cac382 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
@@ -76,6 +76,7 @@
 import org.joda.time.Interval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.common.collect.ImmutableList;
@@ -142,7 +143,9 @@ public void init(Hive db) {
       LOG.info("Using dummy materialized views registry");
     } else {
       // We initialize the cache
-      ExecutorService pool = Executors.newCachedThreadPool();
+      ExecutorService pool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true)
+          .setNameFormat("HiveMaterializedViewsRegistry-%d")
+          .build());
       pool.submit(new Loader(db));
       pool.shutdown();
     }
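Two things change in the pool above: the loader thread becomes a daemon, so a Loader stuck on a dead metastore can no longer keep the JVM from exiting, and the thread gets a recognizable name for stack dumps. A self-contained sketch of the same construction, using Guava's ThreadFactoryBuilder as the patch does:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    // Sketch of the pool construction above: a one-shot background loader on a
    // named daemon thread, with the pool retired immediately after submission.
    public class DaemonPoolSketch {
      public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
            .setDaemon(true)                                   // never blocks JVM shutdown
            .setNameFormat("HiveMaterializedViewsRegistry-%d") // visible in thread dumps
            .build());
        pool.submit(() -> System.out.println("loading registry cache..."));
        pool.shutdown(); // accept no new tasks; the submitted one still runs to completion
      }
    }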
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index d5956ddb18..d838956a07 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -79,16 +79,16 @@ String getValidTxnListForReplDump(Hive hiveDb) {
     }
 
     @Override
-    void dumpFunctionMetadata(String dbName, Path dumpRoot) {
+    void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) {
     }
 
     @Override
-    Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) {
+    Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) {
       return Mockito.mock(Path.class);
     }
 
     @Override
-    void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) {
+    void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiveDb) {
     }
   }
@@ -116,7 +116,7 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw
       private int tableDumpCount = 0;
 
       @Override
-      void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId)
+      void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId, Hive hiveDb)
           throws Exception {
         tableDumpCount++;
         if (tableDumpCount > 1) {
@@ -133,7 +133,7 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,
     );
 
     try {
-      task.bootStrapDump(mock(Path.class), null, mock(Path.class));
+      task.bootStrapDump(mock(Path.class), null, mock(Path.class), same(hive));
     } finally {
       verifyStatic();
       Utils.resetDbBootstrapDumpState(same(hive), eq("default"), eq(dbRandomKey));
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 36df57e40c..19286a607c 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -320,6 +320,9 @@ public Object run() throws HiveSQLException {
           setOperationException(e);
           LOG.error("Error running hive query: ", e);
         } finally {
+          // Call Hive.closeCurrent() to close the HMS connection; otherwise the
+          // connection leaks when the operation finishes.
+          Hive.closeCurrent();
           LogUtils.unregisterLoggingContext();
         }
         return null;
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index df6d56b679..d226db50a5 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -67,6 +67,7 @@
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.ObjectPair;
 import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.hive.metastore.utils.LogUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -527,6 +528,10 @@ private void open() throws MetaException {
             transport = SecurityUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout,
                 trustStorePath, trustStorePassword );
             LOG.info("Opened an SSL connection to metastore, current connections: " + connCount.incrementAndGet());
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("", new LogUtils.StackTraceLogger("METASTORE SSL CONNECTION TRACE - open - " +
+                  System.identityHashCode(this)));
+            }
           } catch(IOException e) {
             throw new IllegalArgumentException(e);
           } catch(TTransportException e) {
@@ -587,6 +592,10 @@ private void open() throws MetaException {
           if (!transport.isOpen()) {
             transport.open();
             LOG.info("Opened a connection to metastore, current connections: " + connCount.incrementAndGet());
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("", new LogUtils.StackTraceLogger("METASTORE CONNECTION TRACE - open - " +
+                  System.identityHashCode(this)));
+            }
           }
           isConnected = true;
         } catch (TTransportException e) {
@@ -670,6 +679,10 @@ public void close() {
     if ((transport != null) && transport.isOpen()) {
       transport.close();
       LOG.info("Closed a connection to metastore, current connections: " + connCount.decrementAndGet());
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("", new LogUtils.StackTraceLogger("METASTORE CONNECTION TRACE - close - " +
+            System.identityHashCode(this)));
+      }
     }
   }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/LogUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/LogUtils.java
similarity index 96%
rename from standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/LogUtils.java
rename to standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/LogUtils.java
index 26daeaed56..c93da380e3 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/LogUtils.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/LogUtils.java
@@ -46,6 +46,15 @@
     }
   }
 
+  /**
+   * An exception that can be passed to a logger just for printing the stack trace.
+   */
+  public static class StackTraceLogger extends Exception {
+    public StackTraceLogger(final String msg) {
+      super(msg);
+    }
+  }
+
   /**
    * Initialize log4j.
    *
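StackTraceLogger is never thrown: constructing any Exception captures the current stack via fillInStackTrace(), and passing it as the throwable argument makes the logger print that stack. Combined with System.identityHashCode(this) at the open/close sites above, an unmatched open can be traced back to the caller that leaked it. A small usage sketch; HypotheticalClient is illustrative, while LogUtils.StackTraceLogger is the class added by this patch:

    import org.apache.hadoop.hive.metastore.utils.LogUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Sketch: log "who called me?" at trace level without throwing anything.
    public class HypotheticalClient {
      private static final Logger LOG = LoggerFactory.getLogger(HypotheticalClient.class);

      void open() {
        if (LOG.isTraceEnabled()) {
          // The exception is only a stack-trace carrier; grep for the identity hash
          // in the log to pair this open with its (possibly missing) close.
          LOG.trace("", new LogUtils.StackTraceLogger(
              "CONNECTION TRACE - open - " + System.identityHashCode(this)));
        }
      }
    }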