Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1535175) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -872,7 +872,7 @@ public MultiVersionConsistencyControl getMVCC() { return mvcc; } - + /* * Returns readpoint considering given IsolationLevel */ @@ -1614,7 +1614,7 @@ // Record latest flush time this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis(); - + // Update the last flushed sequence id for region if (this.rsServices != null) { completeSequenceId = flushSeqId; @@ -2077,7 +2077,7 @@ lastIndexExclusive++; continue; } - + // If we haven't got any rows in our batch, we should block to // get the next one. boolean shouldBlock = numReadyToWrite == 0; @@ -2158,8 +2158,8 @@ // calling the pre CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { - MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress(batchOp.operations, + MiniBatchOperationInProgress miniBatchOp = + new MiniBatchOperationInProgress(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; } @@ -2231,7 +2231,7 @@ locked = false; } releaseRowLocks(acquiredRowLocks); - + // ------------------------- // STEP 7. Sync wal. // ------------------------- @@ -2241,8 +2241,8 @@ walSyncSuccessful = true; // calling the post CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { - MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress(batchOp.operations, + MiniBatchOperationInProgress miniBatchOp = + new MiniBatchOperationInProgress(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutate(miniBatchOp); } @@ -3172,7 +3172,7 @@ } } } - + // allocate new lock for this thread return rowLockContext.newLock(); } finally { @@ -4641,7 +4641,7 @@ Store store = stores.get(family.getKey()); List kvs = new ArrayList(family.getValue().size()); - + Collections.sort(family.getValue(), store.getComparator()); // Get previous values for all columns in this family Get get = new Get(row); @@ -4650,10 +4650,10 @@ get.addColumn(family.getKey(), kv.getQualifier()); } List results = get(get, false); - + // Iterate the input columns and update existing values if they were // found, otherwise add new column initialized to the append value - + // Avoid as much copying as possible. Every byte is copied at most // once. // Would be nice if KeyValue had scatter/gather logic @@ -4696,10 +4696,10 @@ System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(), newKV.getBuffer(), newKV.getQualifierOffset(), kv.getQualifierLength()); - + newKV.setMvccVersion(w.getWriteNumber()); kvs.add(newKV); - + // Append update to WAL if (writeToWAL) { if (walEdits == null) { @@ -4708,11 +4708,11 @@ walEdits.add(newKV); } } - + //store the kvs to the temporary memstore before writing HLog tempMemstore.put(store, kvs); } - + // Actually write to WAL now if (writeToWAL) { // Using default cluster id, as this can only happen in the orginating @@ -4724,7 +4724,7 @@ } else { recordMutationWithoutWal(append.getFamilyCellMap()); } - + //Actually write to Memstore now for (Map.Entry> entry : tempMemstore.entrySet()) { Store store = entry.getKey(); @@ -4816,7 +4816,7 @@ Store store = stores.get(family.getKey()); List kvs = new ArrayList(family.getValue().size()); - + // Get previous values for all columns in this family Get get = new Get(row); for (Cell cell: family.getValue()) { @@ -4825,7 +4825,7 @@ } get.setTimeRange(tr.getMin(), tr.getMax()); List results = get(get, false); - + // Iterate the input columns and update existing values if they were // found, otherwise add new column initialized to the increment amount int idx = 0; @@ -4842,13 +4842,13 @@ } idx++; } - + // Append new incremented KeyValue to list KeyValue newKV = new KeyValue(row, family.getKey(), CellUtil.cloneQualifier(kv), now, Bytes.toBytes(amount)); newKV.setMvccVersion(w.getWriteNumber()); kvs.add(newKV); - + // Prepare WAL updates if (writeToWAL) { if (walEdits == null) { @@ -4857,11 +4857,11 @@ walEdits.add(newKV); } } - + //store the kvs to the temporary memstore before writing HLog tempMemstore.put(store, kvs); } - + // Actually write to WAL now if (writeToWAL) { // Using default cluster id, as this can only happen in the orginating @@ -5546,7 +5546,7 @@ */ void failedBulkLoad(byte[] family, String srcPath) throws IOException; } - + @VisibleForTesting class RowLockContext { private final HashedBytes row; private final CountDownLatch latch = new CountDownLatch(1); @@ -5557,16 +5557,16 @@ this.row = row; this.thread = Thread.currentThread(); } - + boolean ownedByCurrentThread() { return thread == Thread.currentThread(); } - + RowLock newLock() { lockCount++; return new RowLock(this); } - + void releaseLock() { if (!ownedByCurrentThread()) { throw new IllegalArgumentException("Lock held by thread: " + thread @@ -5584,7 +5584,7 @@ } } } - + /** * Row lock held by a given thread. * One thread may acquire multiple locks on the same row simultaneously. @@ -5593,11 +5593,11 @@ public class RowLock { @VisibleForTesting final RowLockContext context; private boolean released = false; - + @VisibleForTesting RowLock(RowLockContext context) { this.context = context; } - + /** * Release the given lock. If there are no remaining locks held by the current thread * then unlock the row and allow other threads to acquire the lock. @@ -5610,4 +5610,21 @@ } } } + + /** + * Lock the updates' readLock first, so that we could safely append logs in coprocessors. + * @throws RegionTooBusyException + * @throws InterruptedIOException + */ + public void updatesLock() throws RegionTooBusyException, InterruptedIOException { + lock(updatesLock.readLock()); + } + + /** + * Unlock the updates' readLock after appending logs in coprocessors. + * @throws InterruptedIOException + */ + public void updatesUnlock() throws InterruptedIOException { + updatesLock.readLock().unlock(); + } }