diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 255c65aa73..ac276d83a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2807,6 +2807,7 @@ private void releaseDriverContext() {
     lDrvState.stateLock.lock();
     try {
       if (driverCxt != null) {
+        driverCxt.destroyReplTxnManager();
         driverCxt.shutdown();
         driverCxt = null;
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index d5392ab804..184d5bb9c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.NodeUtils;
 import org.apache.hadoop.hive.ql.exec.NodeUtils.Function;
@@ -36,6 +37,9 @@
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -65,6 +69,8 @@
   final Map<String, StatsTask> statsTasks = new HashMap<>(1);
 
+  private HiveTxnManager replTxnManager = null;
+
   public DriverContext() {
   }
 
   public DriverContext(Context ctx) {
@@ -234,4 +240,24 @@ public void apply(FileSinkOperator fsOp) {
       }
     }
   }
+
+  // Create a transaction manager for replication flow. In case user has provided a different config for REPL load,
+  // this transaction manager will be used to do the transaction operation. This is specifically useful when REPL load
+  // is called with different hive metastore uri expecting the transaction information to be stored in the specified
+  // metastore.
+  public synchronized HiveTxnManager getReplTxnManager(HiveConf hiveConf) throws LockException {
+    if (replTxnManager == null) {
+      replTxnManager = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf);
+      LOG.debug("Created a new replTxnManager : " + replTxnManager);
+    }
+    return replTxnManager;
+  }
+
+  public synchronized void destroyReplTxnManager() {
+    if (replTxnManager != null) {
+      replTxnManager.closeTxnManager();
+      LOG.debug("Destroyed current replTxnManager : " + replTxnManager);
+      replTxnManager = null;
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
index c91b78e753..3fef9f5915 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
@@ -85,7 +85,7 @@ public int execute(DriverContext driverContext) {
     }
 
     try {
-      HiveTxnManager txnManager = driverContext.getCtx().getHiveTxnManager();
+      HiveTxnManager txnManager = driverContext.getReplTxnManager(conf);
       String user = UserGroupInformation.getCurrentUser().getUserName();
       switch(work.getOperationType()) {
       case REPL_OPEN_TXN:
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index fad96f4d6b..f454420327 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -120,6 +120,7 @@
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils.TableSnapshot;
 import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
@@ -3007,13 +3008,18 @@ public Partition createPartition(Table tbl, Map<String, String> partSpec) throws
     List<Partition> in = new ArrayList<Partition>(size);
     long writeId;
+    Long replWriteId = ReplUtils.getMigrationCurrentTblWriteId(conf);
     String validWriteIdList;
 
     // In case of replication, get the writeId from the source and use valid write Id list
     // for replication.
     if (addPartitionDesc.getReplicationSpec().isInReplicationScope() &&
-        addPartitionDesc.getPartition(0).getWriteId() > 0) {
+        (addPartitionDesc.getPartition(0).getWriteId() > 0 || (replWriteId != null && replWriteId > 0))) {
       writeId = addPartitionDesc.getPartition(0).getWriteId();
+      if (writeId <= 0) {
+        writeId = replWriteId;
+      }
+
       // We need a valid writeId list for a transactional change. During replication we do not
       // have a valid writeId list which was used for this on the source. But we know for sure
       // that the writeId associated with it was valid then (otherwise the change would have