diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 05a7b01..6264ea3 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -66,7 +66,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.DroppedSnapshotException; @@ -114,12 +113,9 @@ import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; -import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -1607,7 +1603,7 @@ public class HRegion implements HeapSize { // , Writable{ // Record latest flush time this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis(); - + // Update the last flushed sequence id for region if (this.rsServices != null) { completeSequenceId = flushSeqId; @@ -1927,6 +1923,16 @@ public class HRegion implements HeapSize { // , Writable{ boolean initialized = false; + for (Mutation mutation : mutations) { + try { + checkRow(mutation.getRow(), "batchMutate"); + } catch (WrongRegionException ex) { + // TODO: this is temporary logging + LOG.warn("Wrong region is caught: " + " region:" + this.getRegionInfo() + " mutations:" + Arrays.toString(mutations)); + throw ex; + } + } + while (!batchOp.isDone()) { if (!isReplay) { checkReadOnly(); @@ -2070,7 +2076,7 @@ public class HRegion implements HeapSize { // , Writable{ lastIndexExclusive++; continue; } - + // If we haven't got any rows in our batch, we should block to // get the next one. boolean shouldBlock = numReadyToWrite == 0; @@ -2151,8 +2157,8 @@ public class HRegion implements HeapSize { // , Writable{ // calling the pre CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { - MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress(batchOp.operations, + MiniBatchOperationInProgress miniBatchOp = + new MiniBatchOperationInProgress(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; } @@ -2224,7 +2230,7 @@ public class HRegion implements HeapSize { // , Writable{ locked = false; } releaseRowLocks(acquiredRowLocks); - + // ------------------------- // STEP 7. Sync wal. // ------------------------- @@ -2234,8 +2240,8 @@ public class HRegion implements HeapSize { // , Writable{ walSyncSuccessful = true; // calling the post CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { - MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress(batchOp.operations, + MiniBatchOperationInProgress miniBatchOp = + new MiniBatchOperationInProgress(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutate(miniBatchOp); } @@ -3158,7 +3164,7 @@ public class HRegion implements HeapSize { // , Writable{ } } } - + // allocate new lock for this thread return rowLockContext.newLock(); } finally { @@ -4639,7 +4645,7 @@ public class HRegion implements HeapSize { // , Writable{ Store store = stores.get(family.getKey()); List kvs = new ArrayList(family.getValue().size()); - + Collections.sort(family.getValue(), store.getComparator()); // Get previous values for all columns in this family Get get = new Get(row); @@ -4648,10 +4654,10 @@ public class HRegion implements HeapSize { // , Writable{ get.addColumn(family.getKey(), kv.getQualifier()); } List results = get(get, false); - + // Iterate the input columns and update existing values if they were // found, otherwise add new column initialized to the append value - + // Avoid as much copying as possible. Every byte is copied at most // once. // Would be nice if KeyValue had scatter/gather logic @@ -4694,10 +4700,10 @@ public class HRegion implements HeapSize { // , Writable{ System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(), newKV.getBuffer(), newKV.getQualifierOffset(), kv.getQualifierLength()); - + newKV.setMvccVersion(w.getWriteNumber()); kvs.add(newKV); - + // Append update to WAL if (writeToWAL) { if (walEdits == null) { @@ -4706,11 +4712,11 @@ public class HRegion implements HeapSize { // , Writable{ walEdits.add(newKV); } } - + //store the kvs to the temporary memstore before writing HLog tempMemstore.put(store, kvs); } - + // Actually write to WAL now if (writeToWAL) { // Using default cluster id, as this can only happen in the orginating @@ -4722,7 +4728,7 @@ public class HRegion implements HeapSize { // , Writable{ } else { recordMutationWithoutWal(append.getFamilyCellMap()); } - + //Actually write to Memstore now for (Map.Entry> entry : tempMemstore.entrySet()) { Store store = entry.getKey(); @@ -4814,7 +4820,7 @@ public class HRegion implements HeapSize { // , Writable{ Store store = stores.get(family.getKey()); List kvs = new ArrayList(family.getValue().size()); - + // Get previous values for all columns in this family Get get = new Get(row); for (Cell cell: family.getValue()) { @@ -4823,7 +4829,7 @@ public class HRegion implements HeapSize { // , Writable{ } get.setTimeRange(tr.getMin(), tr.getMax()); List results = get(get, false); - + // Iterate the input columns and update existing values if they were // found, otherwise add new column initialized to the increment amount int idx = 0; @@ -4840,13 +4846,13 @@ public class HRegion implements HeapSize { // , Writable{ } idx++; } - + // Append new incremented KeyValue to list KeyValue newKV = new KeyValue(row, family.getKey(), CellUtil.cloneQualifier(kv), now, Bytes.toBytes(amount)); newKV.setMvccVersion(w.getWriteNumber()); kvs.add(newKV); - + // Prepare WAL updates if (writeToWAL) { if (walEdits == null) { @@ -4855,11 +4861,11 @@ public class HRegion implements HeapSize { // , Writable{ walEdits.add(newKV); } } - + //store the kvs to the temporary memstore before writing HLog tempMemstore.put(store, kvs); } - + // Actually write to WAL now if (writeToWAL) { // Using default cluster id, as this can only happen in the orginating @@ -5544,7 +5550,7 @@ public class HRegion implements HeapSize { // , Writable{ */ void failedBulkLoad(byte[] family, String srcPath) throws IOException; } - + @VisibleForTesting class RowLockContext { private final HashedBytes row; private final CountDownLatch latch = new CountDownLatch(1); @@ -5555,16 +5561,16 @@ public class HRegion implements HeapSize { // , Writable{ this.row = row; this.thread = Thread.currentThread(); } - + boolean ownedByCurrentThread() { return thread == Thread.currentThread(); } - + RowLock newLock() { lockCount++; return new RowLock(this); } - + void releaseLock() { if (!ownedByCurrentThread()) { throw new IllegalArgumentException("Lock held by thread: " + thread @@ -5582,7 +5588,7 @@ public class HRegion implements HeapSize { // , Writable{ } } } - + /** * Row lock held by a given thread. * One thread may acquire multiple locks on the same row simultaneously. @@ -5591,11 +5597,11 @@ public class HRegion implements HeapSize { // , Writable{ public class RowLock { @VisibleForTesting final RowLockContext context; private boolean released = false; - + @VisibleForTesting RowLock(RowLockContext context) { this.context = context; } - + /** * Release the given lock. If there are no remaining locks held by the current thread * then unlock the row and allow other threads to acquire the lock. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 96d4b41..dfed7d5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -4043,8 +4043,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa try { int i = 0; for (ClientProtos.Action action: mutations) { - ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = - ClientProtos.ResultOrException.newBuilder(); MutationProto m = action.getMutation(); Mutation mutation; if (m.getMutateType() == MutationType.PUT) { @@ -4088,9 +4086,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } } } catch (IOException ie) { - ResultOrException resultOrException = ResponseConverter.buildActionResult(ie).build(); for (int i = 0; i < mutations.size(); i++) { - builder.addResultOrException(resultOrException); + builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex())); } } long after = EnvironmentEdgeManager.currentTimeMillis();