Thanks for patch v2. This turns out to be a bit more tricky.
1. First of all, instead of using "leaderIsrReadLock synchronized", we should do "leaderIsrReadLock.lock()".
2. Second, we should use a fair readWriteLock. Otherwise, some threads may be indefinitely postponed.
3. Third, from java doc, ReentrantReadWriteLock doesn't support upgrading from read lock to write loc.
" Lock downgrading
Reentrancy also allows downgrading from the write lock to a read lock, by acquiring the write lock, then the read lock and then releasing the write lock. However, upgrading from a read lock to the write lock is not possible. "
This means that if we need to call updateIsr(), we have to first release the read lock and require the read lock again when done. See the following example. However, this means that we are still vulnerable to the issue in maybeIncrementLeaderHW() (kafka-862). We probably can change the logic in maybeIncrementLeaderHW() so that it can handle empty set. We will need to think a bit more how to write the logic in a clean way.
Another possibility is to just take v1 patch. All producers to the same log will sync on the leaderIsrUpdateLock. In log.append(), the only code outside the log lock are analyzeAndValidateMessageSet() and maybeFlush(). The former is cheap since it does shallow iteration. The latter re-requires the log lock if flush if needed.