Details
- Bug
- Status: Closed
- Blocker
- Resolution: Fixed
Description
A problem arises when Oozie tries to acquire a lock while another thread is releasing the same lock.
acquireLock waits up to 5 seconds to acquire the lock. By then the caller has already passed through the synchronized block and fetched lockEntry from the hash map.
While it waits, the other thread releases the lock and may run the release code, which removes lockEntry from the map.
If another command from the same thread then tries to acquire the lock, it creates a new InterProcessReadWriteLock object and uses that to acquire the lock.
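The interleaving above can be replayed step by step in a single thread. The sketch below is only an illustration under stated assumptions: LockMapRaceSketch, Entry, entryFor, remove and replay are hypothetical names, and Entry is a stand-in that tracks holders per instance in the way the Curator snippet further down does with threadData. It does not talk to ZooKeeper. It shows one plausible consequence: the hold is recorded on the orphaned object, so a fresh object created for the same resource does not see it.

```java
import java.util.HashMap;
import java.util.Map;

final class LockMapRaceSketch {
    /** Hypothetical stand-in for a lock object that tracks its holders per instance. */
    static final class Entry {
        private final Map<Thread, Integer> threadData = new HashMap<>();

        void acquire() {
            // Record one more hold for the calling thread on this instance only.
            threadData.merge(Thread.currentThread(), 1, Integer::sum);
        }

        boolean heldByCurrentThread() {
            return threadData.containsKey(Thread.currentThread());
        }
    }

    private final Map<String, Entry> locks = new HashMap<>();

    /** Mirrors the get-or-create step inside the synchronized block. */
    synchronized Entry entryFor(String resource) {
        return locks.computeIfAbsent(resource, r -> new Entry());
    }

    /** Mirrors the removal performed by the release path. */
    synchronized void remove(String resource) {
        locks.remove(resource);
    }

    /** Replays the interleaving; returns true if the second lookup misses the existing hold. */
    static boolean replay() {
        LockMapRaceSketch map = new LockMapRaceSketch();
        Entry first = map.entryFor("job-1");   // the waiter fetches the entry, then starts waiting
        map.remove("job-1");                   // meanwhile the releasing side drops the entry
        first.acquire();                       // the wait ends; the hold lands on the orphaned object
        Entry second = map.entryFor("job-1");  // a later command on this thread gets a fresh object
        return first != second
                && first.heldByCurrentThread()
                && !second.heldByCurrentThread();
    }

    public static void main(String[] args) {
        System.out.println("hold invisible to new entry: " + replay());
    }
}
```

The replay is sequential on purpose, so the result does not depend on thread scheduling.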
Logic for acquiring the lock:
    public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
        InterProcessReadWriteLock lockEntry;
        synchronized (zkLocks) {
            if (zkLocks.containsKey(resource)) {
                lockEntry = zkLocks.get(resource);
            }
            else {
                lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
                zkLocks.put(resource, lockEntry);
            }
        }
        InterProcessMutex writeLock = lockEntry.writeLock();
        return acquireLock(wait, writeLock, resource);
    }
Logic for releasing the lock:
    public void release() {
        try {
            lock.release();
            if (zkLocks.get(resource) == null) {
                return;
            }
            if (!isLockHeld()) {
                synchronized (zkLocks) {
                    if (zkLocks.get(resource) != null) {
                        if (!isLockHeld()) {
                            zkLocks.remove(resource);
                        }
                    }
                }
            }
        }
        catch (Exception ex) {
            LOG.warn("Could not release lock: " + ex.getMessage(), ex);
        }
    }
Curator code for acquiring the lock:
    private boolean internalLock(long time, TimeUnit unit) throws Exception {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null ) {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null ) {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }
        return false;
    }
The approach we followed is to use a map with weak values. Once a lock object becomes unreachable, the GC removes it from the map, so we don't have to remove it explicitly.
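A minimal sketch of the weak-value idea, using only the JDK, might look like the following. This is not the code from the patch: WeakValueLockMap, getOrCreate and purgeCleared are hypothetical names, and the actual change may rely on a library map with weak values instead. The point it illustrates is that while any caller still holds a strong reference to the lock object, every lookup returns that same instance, and nothing removes it explicitly.

```java
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical weak-valued map: values stay only while someone else references them. */
final class WeakValueLockMap<V> {
    private final Map<String, WeakReference<V>> entries = new HashMap<>();

    /** Returns the live value for the resource, creating one if absent or already collected. */
    synchronized V getOrCreate(String resource, Function<String, V> factory) {
        WeakReference<V> ref = entries.get(resource);
        V value = (ref == null) ? null : ref.get();
        if (value == null) {
            value = factory.apply(resource);
            entries.put(resource, new WeakReference<>(value));
        }
        return value;
    }

    /** Drops keys whose values were collected; returns the number of remaining keys. */
    synchronized int purgeCleared() {
        entries.values().removeIf(ref -> ref.get() == null);
        return entries.size();
    }

    public static void main(String[] args) {
        WeakValueLockMap<Object> map = new WeakValueLockMap<>();
        Object held = map.getOrCreate("job-1", r -> new Object());
        Object again = map.getOrCreate("job-1", r -> new Object());
        // 'held' is still strongly reachable here, so the same instance comes back.
        System.out.println("same instance: " + (held == again));
    }
}
```

With this shape the release path no longer needs to touch the map, which is what removes the window described above. The sketch leaves cleared keys in place until purgeCleared is called; a library implementation would normally handle that cleanup itself.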
Attachments
Issue Links
- is broken by
- OOZIE-2394 Oozie can execute command without holding lock (Closed)
- OOZIE-1922 MemoryLocksService fails if lock is acquired multiple times in same thread and released (Closed)
- links to