diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 2804514..b4ae1d1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -54,11 +54,11 @@ private long MAX_SLEEP; //longer term we should always have a txn id and then we won't need to track locks here at all private Set locks; - private IMetaStoreClient client; + private DbTxnManager.SynchronizedMetaStoreClient client; private long nextSleep = 50; private final HiveConf conf; - DbLockManager(IMetaStoreClient client, HiveConf conf) { + DbLockManager(DbTxnManager.SynchronizedMetaStoreClient client, HiveConf conf) { locks = new HashSet<>(); this.client = client; this.conf = conf; diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 4539e71..ad1a46e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -62,7 +62,7 @@ static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); private DbLockManager lockMgr = null; - private IMetaStoreClient client = null; + private SynchronizedMetaStoreClient client = null; /** * The Metastore NEXT_TXN_ID.NTXN_NEXT is initialized to 1; it contains the next available * transaction id. Thus is 1 is first transaction id. @@ -520,7 +520,7 @@ private void init() throws LockException { } try { Hive db = Hive.get(conf); - client = db.getMSC(); + client = new SynchronizedMetaStoreClient(db.getMSC()); initHeartbeatExecutorService(); } catch (MetaException e) { throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e); @@ -615,4 +615,54 @@ public void run() { } } } + + /** + * Synchronized MetaStoreClient wrapper + */ + final class SynchronizedMetaStoreClient { + private IMetaStoreClient client = null; + SynchronizedMetaStoreClient(IMetaStoreClient client) { + this.client = client; + } + + IMetaStoreClient getClient() { + return client; + } + + synchronized long openTxn(String user) throws TException { + return client.openTxn(user); + } + + synchronized void commitTxn(long txnid) throws TException { + client.commitTxn(txnid); + } + + synchronized void rollbackTxn(long txnid) throws TException { + client.rollbackTxn(txnid); + } + + synchronized void heartbeat(long txnid, long lockid) throws TException { + client.heartbeat(txnid, lockid); + } + + synchronized ValidTxnList getValidTxns(long currentTxn) throws TException { + return client.getValidTxns(currentTxn); + } + + synchronized LockResponse lock(LockRequest request) throws TException { + return client.lock(request); + } + + synchronized LockResponse checkLock(long lockid) throws TException { + return client.checkLock(lockid); + } + + synchronized void unlock(long lockid) throws TException { + client.unlock(lockid); + } + + synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException { + return client.showLocks(showLocksRequest); + } + } }