diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 255c65aa73..ac276d83a8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -2807,6 +2807,7 @@ private void releaseDriverContext() { lDrvState.stateLock.lock(); try { if (driverCxt != null) { + driverCxt.destroyReplTxnManager(); driverCxt.shutdown(); driverCxt = null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java index d5392ab804..184d5bb9c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.NodeUtils; import org.apache.hadoop.hive.ql.exec.NodeUtils.Function; @@ -36,6 +37,9 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.ReduceWork; @@ -65,6 +69,8 @@ final Map statsTasks = new HashMap<>(1); + private HiveTxnManager replTxnManager = null; + public DriverContext() { } @@ -234,4 +240,24 @@ public void apply(FileSinkOperator fsOp) { } } } + + // Create a transaction manager for replication flow. In case user has provided a different config for REPL load, + // this transaction manager will be used to do the transaction operation. This is specifically useful when REPL load + // is called with different hive metastore uri expecting the transaction information to be stored in the specified + // metastore. + public synchronized HiveTxnManager getReplTxnManager(HiveConf hiveConf) throws LockException { + if (replTxnManager == null) { + replTxnManager = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf); + LOG.debug("Created a new replTxnManager : " + replTxnManager); + } + return replTxnManager; + } + + public synchronized void destroyReplTxnManager() { + if (replTxnManager != null) { + replTxnManager.closeTxnManager(); + LOG.debug("Destroyed current replTxnManager : " + replTxnManager); + replTxnManager = null; + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java index c91b78e753..3fef9f5915 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java @@ -85,7 +85,7 @@ public int execute(DriverContext driverContext) { } try { - HiveTxnManager txnManager = driverContext.getCtx().getHiveTxnManager(); + HiveTxnManager txnManager = driverContext.getReplTxnManager(conf); String user = UserGroupInformation.getCurrentUser().getUserName(); switch(work.getOperationType()) { case REPL_OPEN_TXN: