diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java index f5cd593..54e8f32 100644 --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java @@ -19599,12 +19599,12 @@ public final class AdminProtos { com.google.protobuf.RpcCallback done); /** - * rpc Replay(.MultiRequest) returns (.MultiResponse); + * rpc Replay(.ReplicateWALEntryRequest) returns (.ReplicateWALEntryResponse); */ public abstract void replay( com.google.protobuf.RpcController controller, - org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request, - com.google.protobuf.RpcCallback done); + org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request, + com.google.protobuf.RpcCallback done); /** * rpc RollWALWriter(.RollWALWriterRequest) returns (.RollWALWriterResponse); @@ -19726,8 +19726,8 @@ public final class AdminProtos { @java.lang.Override public void replay( com.google.protobuf.RpcController controller, - org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request, - com.google.protobuf.RpcCallback done) { + org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request, + com.google.protobuf.RpcCallback done) { impl.replay(controller, request, done); } @@ -19806,7 +19806,7 @@ public final class AdminProtos { case 9: return impl.replicateWALEntry(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest)request); case 10: - return impl.replay(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest)request); + return impl.replay(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest)request); case 11: return impl.rollWALWriter(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest)request); case 12: @@ -19850,7 +19850,7 @@ public final class AdminProtos { case 9: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest.getDefaultInstance(); case 10: - return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest.getDefaultInstance(); + return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest.getDefaultInstance(); case 11: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest.getDefaultInstance(); case 12: @@ -19894,7 +19894,7 @@ public final class AdminProtos { case 9: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance(); case 10: - return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance(); + return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance(); case 11: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse.getDefaultInstance(); case 12: @@ -19992,12 +19992,12 @@ public final class AdminProtos { com.google.protobuf.RpcCallback done); /** - * rpc Replay(.MultiRequest) returns (.MultiResponse); + * rpc Replay(.ReplicateWALEntryRequest) returns (.ReplicateWALEntryResponse); */ public abstract void replay( com.google.protobuf.RpcController controller, - org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request, - com.google.protobuf.RpcCallback 
done); + org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request, + com.google.protobuf.RpcCallback done); /** * rpc RollWALWriter(.RollWALWriterRequest) returns (.RollWALWriterResponse); @@ -20104,8 +20104,8 @@ public final class AdminProtos { done)); return; case 10: - this.replay(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest)request, - com.google.protobuf.RpcUtil.specializeCallback( + this.replay(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest)request, + com.google.protobuf.RpcUtil.specializeCallback( done)); return; case 11: @@ -20163,7 +20163,7 @@ public final class AdminProtos { case 9: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest.getDefaultInstance(); case 10: - return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest.getDefaultInstance(); + return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest.getDefaultInstance(); case 11: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest.getDefaultInstance(); case 12: @@ -20207,7 +20207,7 @@ public final class AdminProtos { case 9: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance(); case 10: - return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance(); + return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance(); case 11: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse.getDefaultInstance(); case 12: @@ -20389,17 +20389,17 @@ public final class AdminProtos { public void replay( com.google.protobuf.RpcController controller, - org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request, - com.google.protobuf.RpcCallback done) { + org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request, + com.google.protobuf.RpcCallback done) { channel.callMethod( getDescriptor().getMethods().get(10), controller, request, - org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance(), + org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance(), com.google.protobuf.RpcUtil.generalizeCallback( done, - org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.class, - org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance())); + org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.class, + org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance())); } public void rollWALWriter( @@ -20519,9 +20519,9 @@ public final class AdminProtos { org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request) throws com.google.protobuf.ServiceException; - public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse replay( + public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse replay( com.google.protobuf.RpcController controller, - org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request) + org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request) throws com.google.protobuf.ServiceException; public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse rollWALWriter( @@ -20672,15 
+20672,15 @@ public final class AdminProtos { } - public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse replay( + public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse replay( com.google.protobuf.RpcController controller, - org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request) + org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request) throws com.google.protobuf.ServiceException { - return (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse) channel.callBlockingMethod( + return (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse) channel.callBlockingMethod( getDescriptor().getMethods().get(10), controller, request, - org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance()); + org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance()); } @@ -20962,7 +20962,7 @@ public final class AdminProtos { "ServerInfoRequest\"B\n\nServerInfo\022 \n\013serve" + "r_name\030\001 \002(\0132\013.ServerName\022\022\n\nwebui_port\030" + "\002 \001(\r\"9\n\025GetServerInfoResponse\022 \n\013server" + - "_info\030\001 \002(\0132\013.ServerInfo2\256\007\n\014AdminServic" + + "_info\030\001 \002(\0132\013.ServerInfo2\306\007\n\014AdminServic" + "e\022>\n\rGetRegionInfo\022\025.GetRegionInfoReques" + "t\032\026.GetRegionInfoResponse\022;\n\014GetStoreFil", "e\022\024.GetStoreFileRequest\032\025.GetStoreFileRe" + @@ -20978,16 +20978,17 @@ public final class AdminProtos { "nResponse\022;\n\014MergeRegions\022\024.MergeRegions" + "Request\032\025.MergeRegionsResponse\022J\n\021Replic" + "ateWALEntry\022\031.ReplicateWALEntryRequest\032\032" + - ".ReplicateWALEntryResponse\022\'\n\006Replay\022\r.M" + - "ultiRequest\032\016.MultiResponse\022>\n\rRollWALWr" + - "iter\022\025.RollWALWriterRequest\032\026.RollWALWri" + - "terResponse\022>\n\rGetServerInfo\022\025.GetServer" + - "InfoRequest\032\026.GetServerInfoResponse\0225\n\nS" + - "topServer\022\022.StopServerRequest\032\023.StopServ" + - "erResponse\022M\n\022UpdateFavoredNodes\022\032.Updat", - "eFavoredNodesRequest\032\033.UpdateFavoredNode" + - "sResponseBA\n*org.apache.hadoop.hbase.pro" + - "tobuf.generatedB\013AdminProtosH\001\210\001\001\240\001\001" + ".ReplicateWALEntryResponse\022?\n\006Replay\022\031.R" + + "eplicateWALEntryRequest\032\032.ReplicateWALEn" + + "tryResponse\022>\n\rRollWALWriter\022\025.RollWALWr" + + "iterRequest\032\026.RollWALWriterResponse\022>\n\rG" + + "etServerInfo\022\025.GetServerInfoRequest\032\026.Ge" + + "tServerInfoResponse\0225\n\nStopServer\022\022.Stop" + + "ServerRequest\032\023.StopServerResponse\022M\n\022Up", + "dateFavoredNodes\022\032.UpdateFavoredNodesReq" + + "uest\032\033.UpdateFavoredNodesResponseBA\n*org" + + ".apache.hadoop.hbase.protobuf.generatedB" + + "\013AdminProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git hbase-protocol/src/main/protobuf/Admin.proto hbase-protocol/src/main/protobuf/Admin.proto index 2869342..c881851 100644 --- hbase-protocol/src/main/protobuf/Admin.proto +++ hbase-protocol/src/main/protobuf/Admin.proto @@ -253,8 +253,8 @@ service AdminService { rpc ReplicateWALEntry(ReplicateWALEntryRequest) returns(ReplicateWALEntryResponse); - rpc Replay(MultiRequest) - returns(MultiResponse); + rpc 
Replay(ReplicateWALEntryRequest) + returns(ReplicateWALEntryResponse); rpc RollWALWriter(RollWALWriterRequest) returns(RollWALWriterResponse); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 786dd97..bdc0940 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -66,6 +66,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.DroppedSnapshotException; @@ -112,10 +113,15 @@ import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -2206,8 +2212,7 @@ public class HRegion implements HeapSize { // , Writable{ Mutation mutation = batchOp.operations[firstIndex]; if (walEdit.size() > 0) { txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), - walEdit, mutation.getClusterIds(), now, this.htableDescriptor, - this.getCoprocessorHost()); + walEdit, mutation.getClusterIds(), now, this.htableDescriptor); } // ------------------------------- @@ -4484,8 +4489,7 @@ public class HRegion implements HeapSize { // , Writable{ // 7. Append no sync if (!walEdit.isEmpty()) { txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), - walEdit, processor.getClusterIds(), now, this.htableDescriptor, - this.getCoprocessorHost()); + walEdit, processor.getClusterIds(), now, this.htableDescriptor); } // 8. Release region lock if (locked) { @@ -4713,7 +4717,7 @@ public class HRegion implements HeapSize { // , Writable{ // as a Put. txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), walEdits, new ArrayList(), EnvironmentEdgeManager.currentTimeMillis(), - this.htableDescriptor, this.getCoprocessorHost()); + this.htableDescriptor); } else { recordMutationWithoutWal(append.getFamilyCellMap()); } @@ -4862,7 +4866,7 @@ public class HRegion implements HeapSize { // , Writable{ // as a Put. 
         txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
           walEdits, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(),
-          this.htableDescriptor, this.getCoprocessorHost());
+          this.htableDescriptor);
       } else {
         recordMutationWithoutWal(increment.getFamilyCellMap());
       }
@@ -5597,4 +5601,81 @@ public class HRegion implements HeapSize { // , Writable{
       }
     }
   }
+
+  /**
+   * This function is used to construct replay mutations from WALEdits
+   * @param entries
+   * @param cells
+   * @param clusterId
+   * @param logEntries List of Pair constructed from its PB version - WALEntry
+   *          instances
+   * @return list of mutations to be replayed
+   * @throws IOException
+   */
+  public List<Pair<MutationType, Mutation>> getReplayMutations(List<WALEntry> entries,
+      CellScanner cells, UUID clusterId, List<Pair<HLogKey, WALEdit>> logEntries)
+      throws IOException {
+
+    List<Pair<MutationType, Mutation>> mutations = new ArrayList<Pair<MutationType, Mutation>>();
+
+    for (WALEntry entry : entries) {
+      HLogKey logKey = null;
+      WALEdit val = null;
+      Cell previousCell = null;
+      Mutation m = null;
+      List<Pair<MutationType, Mutation>> editMutations =
+          new ArrayList<Pair<MutationType, Mutation>>();
+      int count = entry.getAssociatedCellCount();
+      if (coprocessorHost != null) {
+        val = new WALEdit();
+      }
+
+      for (int i = 0; i < count; i++) {
+        // Throw index out of bounds if our cell count is off
+        if (!cells.advance()) {
+          throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
+        }
+        Cell cell = cells.current();
+        if (val != null) val.add(KeyValueUtil.ensureKeyValue(cell));
+
+        boolean isNewRowOrType =
+            previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
+                || !CellUtil.matchingRow(previousCell, cell);
+        if (isNewRowOrType) {
+          // Create new mutation
+          if (CellUtil.isDelete(cell)) {
+            m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+            editMutations.add(new Pair<MutationType, Mutation>(MutationType.DELETE, m));
+          } else {
+            m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+            editMutations.add(new Pair<MutationType, Mutation>(MutationType.PUT, m));
+          }
+        }
+        if (CellUtil.isDelete(cell)) {
+          ((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
+        } else {
+          ((Put) m).add(KeyValueUtil.ensureKeyValue(cell));
+        }
+        previousCell = cell;
+      }
+
+      // Start coprocessor replay here. The coprocessor is for each WALEdit
+      // instead of a KeyValue.
+      if (coprocessorHost != null) {
+        WALKey walKey = entry.getKey();
+        logKey =
+            new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
+                .getTableName().toByteArray()), walKey.getLogSequenceNumber(),
+                walKey.getWriteTime(), clusterId);
+        logEntries.add(new Pair<HLogKey, WALEdit>(logKey, val));
+        if (coprocessorHost.preWALRestore(this.getRegionInfo(), logKey, val)) {
+          // if bypass this log entry, ignore it ...
+ continue; + } + } + mutations.addAll(editMutations); + } + + return mutations; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 9e7bc84..fd60697 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -38,6 +38,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.Map.Entry; import java.util.Random; import java.util.Set; @@ -89,6 +90,7 @@ import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; @@ -118,6 +120,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; @@ -135,6 +138,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRespon import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; @@ -181,6 +185,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; @@ -191,8 +196,10 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import 
org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -3854,45 +3861,45 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
    * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
    * that the given mutations will be durable on the receiving RS if this method returns without any
    * exception.
-   * @param rpcc the RPC controller
+   * @param controller the RPC controller
    * @param request the request
    * @throws ServiceException
    */
   @Override
   @QosPriority(priority = HConstants.REPLAY_QOS)
-  public MultiResponse replay(final RpcController rpcc, final MultiRequest request)
-      throws ServiceException {
+  public ReplicateWALEntryResponse replay(final RpcController controller,
+      final ReplicateWALEntryRequest request) throws ServiceException {
     long before = EnvironmentEdgeManager.currentTimeMillis();
-    PayloadCarryingRpcController controller = (PayloadCarryingRpcController) rpcc;
-    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
-    // Clear scanner so we are not holding on to reference across call.
-    if (controller != null) controller.setCellScanner(null);
+    CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
     try {
       checkOpen();
-      HRegion region = getRegion(request.getRegion());
-      MultiResponse.Builder builder = MultiResponse.newBuilder();
-      List<MutationProto> mutates = new ArrayList<MutationProto>();
-      for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
-        if (actionUnion.hasMutation()) {
-          MutationProto mutate = actionUnion.getMutation();
-          MutationType type = mutate.getMutateType();
-          switch (type) {
-          case PUT:
-          case DELETE:
-            mutates.add(mutate);
-            break;
-          default:
-            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
+      List<WALEntry> entries = request.getEntryList();
+      if(entries == null || entries.isEmpty()) {
+        // empty input
+        return ReplicateWALEntryResponse.newBuilder().build();
+      }
+
+      HRegion region = this.getRegionByEncodedName(
+        entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
+      List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
+      List<Pair<MutationType, Mutation>> mutations = region.getReplayMutations(
+        request.getEntryList(), cells, UUID.fromString(this.clusterId), walEntries);
+      if (!mutations.isEmpty()) {
+        OperationStatus[] result = doBatchOp(region, mutations, true);
+        // check if it's a partial success
+        for (int i = 0; result != null && i < result.length; i++) {
+          if (result[i] != OperationStatus.SUCCESS) {
+            throw new IOException(result[i].getExceptionMsg());
           }
-        } else {
-          LOG.warn("Error: invalid action: " + actionUnion + ". " + "it must be a Mutation.");
-          throw new DoNotRetryIOException("Invalid action, " + "it must be a Mutation.");
         }
       }
-      if (!mutates.isEmpty()) {
-        doBatchOp(builder, region, mutates, cellScanner, true);
+      if (region.getCoprocessorHost() != null) {
+        for (Pair<HLogKey, WALEdit> wal : walEntries) {
+          region.getCoprocessorHost().postWALRestore(region.getRegionInfo(), wal.getFirst(),
+            wal.getSecond());
+        }
       }
-      return builder.build();
+      return ReplicateWALEntryResponse.newBuilder().build();
     } catch (IOException ie) {
       throw new ServiceException(ie);
     } finally {
@@ -4029,21 +4036,13 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
   /**
    * Execute a list of Put/Delete mutations.
- */ - protected void doBatchOp(final MultiResponse.Builder builder, - final HRegion region, final List mutates, final CellScanner cells) { - doBatchOp(builder, region, mutates, cells, false); - } - - /** - * Execute a list of Put/Delete mutations. * * @param builder * @param region * @param mutations */ protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region, - final List mutations, final CellScanner cells, boolean isReplay) { + final List mutations, final CellScanner cells) { Mutation[] mArray = new Mutation[mutations.size()]; long before = EnvironmentEdgeManager.currentTimeMillis(); boolean batchContainsPuts = false, batchContainsDelete = false; @@ -4070,7 +4069,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa cacheFlusher.reclaimMemStoreMemory(); } - OperationStatus codes[] = region.batchMutate(mArray, isReplay); + OperationStatus codes[] = region.batchMutate(mArray, false); for (i = 0; i < codes.length; i++) { switch (codes[i].getOperationStatusCode()) { case BAD_FAMILY: @@ -4094,21 +4093,11 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa case SUCCESS: break; } - if (isReplay && codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) { - // in replay mode, we only need to catpure the first error because we will retry the whole - // batch when an error happens - break; - } } } catch (IOException ie) { ActionResult result = ResponseConverter.buildActionResult(ie); for (int i = 0; i < mutations.size(); i++) { builder.setResult(i, result); - if (isReplay) { - // in replay mode, we only need to catpure the first error because we will retry the whole - // batch when an error happens - break; - } } } long after = EnvironmentEdgeManager.currentTimeMillis(); @@ -4121,6 +4110,46 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } /** + * Execute a list of Put/Delete mutations. + * @param region + * @param mutations + * @param isReplay + * @return an array of OperationStatus which internally contains the + * OperationStatusCode and the exceptionMessage if any + * @throws IOException + */ + protected OperationStatus[] doBatchOp(final HRegion region, + final List> mutations, boolean isReplay) throws IOException { + Mutation[] mArray = new Mutation[mutations.size()]; + long before = EnvironmentEdgeManager.currentTimeMillis(); + boolean batchContainsPuts = false, batchContainsDelete = false; + try { + int i = 0; + for (Pair m : mutations) { + if (m.getFirst() == MutationType.PUT) { + batchContainsPuts = true; + } else { + batchContainsDelete = true; + } + mArray[i++] = m.getSecond(); + } + requestCount.add(mutations.size()); + if (!region.getRegionInfo().isMetaTable()) { + cacheFlusher.reclaimMemStoreMemory(); + } + return region.batchMutate(mArray, isReplay); + } finally { + long after = EnvironmentEdgeManager.currentTimeMillis(); + if (batchContainsPuts) { + metricsRegionServer.updatePut(after - before); + } + if (batchContainsDelete) { + metricsRegionServer.updateDelete(after - before); + } + } + } + + /** * Mutate a list of rows atomically. 
* * @param region diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 180677e..78d03ad 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -841,7 +841,7 @@ class FSHLog implements HLog, Syncable { @Override public void append(HRegionInfo info, TableName tableName, WALEdit edits, final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException { - append(info, tableName, edits, new ArrayList(), now, htd, true, isInMemstore, null); + append(info, tableName, edits, new ArrayList(), now, htd, true, isInMemstore); } /** @@ -872,8 +872,7 @@ class FSHLog implements HLog, Syncable { */ @SuppressWarnings("deprecation") private long append(HRegionInfo info, TableName tableName, WALEdit edits, List clusterIds, - final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore, - RegionCoprocessorHost regionCoproHost) + final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore) throws IOException { if (edits.isEmpty()) return this.unflushedEntries.get(); if (this.closed) { @@ -894,7 +893,7 @@ class FSHLog implements HLog, Syncable { byte [] encodedRegionName = info.getEncodedNameAsBytes(); if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum); HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterIds); - doWrite(info, logKey, edits, htd, regionCoproHost); + doWrite(info, logKey, edits, htd); this.numEntries.incrementAndGet(); txid = this.unflushedEntries.incrementAndGet(); if (htd.isDeferredLogFlush()) { @@ -917,10 +916,9 @@ class FSHLog implements HLog, Syncable { @Override public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, - List clusterIds, final long now, HTableDescriptor htd, - RegionCoprocessorHost regionCoproHost) + List clusterIds, final long now, HTableDescriptor htd) throws IOException { - return append(info, tableName, edits, clusterIds, now, htd, false, true, regionCoproHost); + return append(info, tableName, edits, clusterIds, now, htd, false, true); } /** @@ -1205,7 +1203,7 @@ class FSHLog implements HLog, Syncable { // TODO: Remove info. Unused. protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit, - HTableDescriptor htd, RegionCoprocessorHost regionCoproHost) + HTableDescriptor htd) throws IOException { if (!this.enabled) { return; @@ -1222,18 +1220,12 @@ class FSHLog implements HLog, Syncable { if (logEdit.isReplay()) { // set replication scope null so that this won't be replicated logKey.setScopes(null); - if(regionCoproHost != null) { - regionCoproHost.preWALRestore(info, logKey, logEdit); - } } // write to our buffer for the Hlog file. 
logSyncer.append(new FSHLog.Entry(logKey, logEdit)); } long took = EnvironmentEdgeManager.currentTimeMillis() - now; coprocessorHost.postWALWrite(info, logKey, logEdit); - if(logEdit.isReplay() && regionCoproHost != null ) { - regionCoproHost.postWALRestore(info, logKey, logEdit); - } long len = 0; for (KeyValue kv : logEdit.getKeyValues()) { len += kv.getLength(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 51ed0d4..262fb02 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -301,8 +301,7 @@ public interface HLog { * @throws IOException */ public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, - List clusterIds, final long now, HTableDescriptor htd, - RegionCoprocessorHost regionCoproHost) throws IOException; + List clusterIds, final long now, HTableDescriptor htd) throws IOException; // TODO: Do we need all these versions of sync? void hsync() throws IOException; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 7faa4c5..782c0b8 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -166,7 +166,9 @@ public class HLogSplitter { conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128*1024*1024)); - this.minBatchSize = conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 512); + // a larger minBatchSize may slow down recovery because replay writer has to wait for + // enough edits before replaying them + this.minBatchSize = conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64); this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG); @@ -1316,8 +1318,8 @@ public class HLogSplitter { * Map key -> value layout * : -> Queue */ - private Map>> serverToBufferQueueMap = - new ConcurrentHashMap>>(); + private Map>> serverToBufferQueueMap = + new ConcurrentHashMap>>(); private List thrown = new ArrayList(); // The following sink is used in distrubitedLogReplay mode for entries of regions in a disabling @@ -1358,10 +1360,10 @@ public class HLogSplitter { // process workitems String maxLocKey = null; int maxSize = 0; - List> maxQueue = null; + List> maxQueue = null; synchronized (this.serverToBufferQueueMap) { for (String key : this.serverToBufferQueueMap.keySet()) { - List> curQueue = this.serverToBufferQueueMap.get(key); + List> curQueue = this.serverToBufferQueueMap.get(key); if (curQueue.size() > maxSize) { maxSize = curQueue.size(); maxQueue = curQueue; @@ -1398,6 +1400,8 @@ public class HLogSplitter { for (HLog.Entry entry : entries) { WALEdit edit = entry.getEdit(); TableName table = entry.getKey().getTablename(); + // clear scopes which isn't needed for recovery + entry.getKey().setScopes(null); String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName()); // skip edits of non-existent tables if (nonExistentTables != null && nonExistentTables.contains(table)) { @@ -1407,15 +1411,10 @@ public class HLogSplitter { Map maxStoreSequenceIds = null; boolean needSkip = false; - Put put = null; - Delete del = null; - KeyValue lastKV = 
null; HRegionLocation loc = null; - Row preRow = null; - HRegionLocation preLoc = null; - Row lastAddedRow = null; // it is not really needed here just be conservative - String preKey = null; + String locKey = null; List kvs = edit.getKeyValues(); + List skippedKVs = new ArrayList(); HConnection hconn = this.getConnectionByTableName(table); for (KeyValue kv : kvs) { @@ -1423,98 +1422,71 @@ public class HLogSplitter { // We don't handle HBASE-2231 because we may or may not replay a compaction event. // Details at https://issues.apache.org/jira/browse/HBASE-2231?focusedCommentId=13647143& // page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13647143 - if (kv.matchingFamily(WALEdit.METAFAMILY)) continue; - - if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) { - if (preRow != null) { - synchronized (serverToBufferQueueMap) { - List> queue = serverToBufferQueueMap.get(preKey); - if (queue == null) { - queue = Collections.synchronizedList(new ArrayList>()); - serverToBufferQueueMap.put(preKey, queue); - } - queue.add(new Pair(preLoc, preRow)); - lastAddedRow = preRow; - } - // store regions we have recovered so far - addToRecoveredRegions(preLoc.getRegionInfo().getEncodedName()); - } + if (kv.matchingFamily(WALEdit.METAFAMILY)) { + skippedKVs.add(kv); + continue; + } - try { - loc = locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow(), - encodeRegionNameStr); - } catch (TableNotFoundException ex) { - // table has been deleted so skip edits of the table - LOG.info("Table " + table - + " doesn't exist. Skip log replay for region " + encodeRegionNameStr); - lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE); - if (nonExistentTables == null) { - nonExistentTables = new TreeSet(); - } - nonExistentTables.add(table); - this.skippedEdits.incrementAndGet(); - needSkip = true; - break; + try { + loc = + locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow(), + encodeRegionNameStr); + } catch (TableNotFoundException ex) { + // table has been deleted so skip edits of the table + LOG.info("Table " + table + " doesn't exist. 
Skip log replay for region " + + encodeRegionNameStr); + lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE); + if (nonExistentTables == null) { + nonExistentTables = new TreeSet(); } + nonExistentTables.add(table); + this.skippedEdits.incrementAndGet(); + needSkip = true; + break; + } - cachedLastFlushedSequenceId = - lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName()); - if (cachedLastFlushedSequenceId != null - && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) { - // skip the whole HLog entry - this.skippedEdits.incrementAndGet(); - needSkip = true; - break; - } else { - if (maxStoreSequenceIds == null) { - maxStoreSequenceIds = - regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName()); - } - if (maxStoreSequenceIds != null) { - Long maxStoreSeqId = maxStoreSequenceIds.get(kv.getFamily()); - if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) { - // skip current kv if column family doesn't exist anymore or already flushed - continue; - } - } + cachedLastFlushedSequenceId = + lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName()); + if (cachedLastFlushedSequenceId != null + && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) { + // skip the whole HLog entry + this.skippedEdits.incrementAndGet(); + needSkip = true; + break; + } else { + if (maxStoreSequenceIds == null) { + maxStoreSequenceIds = + regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName()); } - - if (kv.isDelete()) { - del = new Delete(kv.getRow()); - del.setClusterIds(entry.getKey().getClusterIds()); - preRow = del; - } else { - put = new Put(kv.getRow()); - put.setClusterIds(entry.getKey().getClusterIds()); - preRow = put; + if (maxStoreSequenceIds != null) { + Long maxStoreSeqId = maxStoreSequenceIds.get(kv.getFamily()); + if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) { + // skip current kv if column family doesn't exist anymore or already flushed + skippedKVs.add(kv); + continue; + } } - preKey = loc.getHostnamePort() + KEY_DELIMITER + table; - preLoc = loc; } - if (kv.isDelete()) { - del.addDeleteMarker(kv); - } else { - put.add(kv); - } - lastKV = kv; } // skip the edit - if(needSkip) continue; - - // add the last row - if (preRow != null && lastAddedRow != preRow) { - synchronized (serverToBufferQueueMap) { - List> queue = serverToBufferQueueMap.get(preKey); - if (queue == null) { - queue = Collections.synchronizedList(new ArrayList>()); - serverToBufferQueueMap.put(preKey, queue); - } - queue.add(new Pair(preLoc, preRow)); + if (needSkip) continue; + + if (!skippedKVs.isEmpty()) { + kvs.removeAll(skippedKVs); + } + synchronized (serverToBufferQueueMap) { + locKey = loc.getHostnamePort() + KEY_DELIMITER + table; + List> queue = serverToBufferQueueMap.get(locKey); + if (queue == null) { + queue = + Collections.synchronizedList(new ArrayList>()); + serverToBufferQueueMap.put(locKey, queue); } - // store regions we have recovered so far - addToRecoveredRegions(preLoc.getRegionInfo().getEncodedName()); + queue.add(new Pair(loc, entry)); } + // store regions we have recovered so far + addToRecoveredRegions(loc.getRegionInfo().getEncodedName()); } } @@ -1580,7 +1552,7 @@ public class HLogSplitter { return loc; } - private void processWorkItems(String key, List> actions) + private void processWorkItems(String key, List> actions) throws IOException { RegionServerWriter rsw = null; @@ -1663,7 +1635,7 @@ public class HLogSplitter { protected boolean flush() throws IOException { String curLoc = 
null; int curSize = 0; - List> curQueue = null; + List> curQueue = null; synchronized (this.serverToBufferQueueMap) { for (String locationKey : this.serverToBufferQueueMap.keySet()) { curQueue = this.serverToBufferQueueMap.get(locationKey); @@ -1792,8 +1764,8 @@ public class HLogSplitter { } TableName tableName = getTableFromLocationStr(loc); - if(tableName != null){ - LOG.warn("Invalid location string:" + loc + " found."); + if(tableName == null){ + throw new IOException("Invalid location string:" + loc + " found. Replay aborted."); } HConnection hconn = getConnectionByTableName(tableName); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java index ff1ae85..4368a17 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java @@ -23,12 +23,16 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NavigableMap; +import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; @@ -40,8 +44,14 @@ import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; @@ -50,6 +60,7 @@ import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; +import com.google.protobuf.ByteString; import com.google.protobuf.ServiceException; /** @@ -62,7 +73,7 @@ import com.google.protobuf.ServiceException; public class WALEditsReplaySink { private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class); - private static final int MAX_BATCH_SIZE = 3000; + private static final int MAX_BATCH_SIZE = 1024; private final Configuration conf; private final HConnection conn; @@ -93,41 +104,38 @@ public class WALEditsReplaySink { /** * Replay an array of actions of the same region directly into the newly assigned Region Server - * @param actions + * @param entries * @throws IOException */ - public void replayEntries(List> actions) throws IOException { - 
if (actions.size() == 0) { + public void replayEntries(List> entries) throws IOException { + if (entries.size() == 0) { return; } - int batchSize = actions.size(); - int dataSize = 0; - Map>> actionsByRegion = - new HashMap>>(); + int batchSize = entries.size(); + Map> entriesByRegion = + new HashMap>(); HRegionLocation loc = null; - Row row = null; - List> regionActions = null; + HLog.Entry entry = null; + List regionEntries = null; // Build the action list. for (int i = 0; i < batchSize; i++) { - loc = actions.get(i).getFirst(); - row = actions.get(i).getSecond(); - if (actionsByRegion.containsKey(loc.getRegionInfo())) { - regionActions = actionsByRegion.get(loc.getRegionInfo()); + loc = entries.get(i).getFirst(); + entry = entries.get(i).getSecond(); + if (entriesByRegion.containsKey(loc.getRegionInfo())) { + regionEntries = entriesByRegion.get(loc.getRegionInfo()); } else { - regionActions = new ArrayList>(); - actionsByRegion.put(loc.getRegionInfo(), regionActions); + regionEntries = new ArrayList(); + entriesByRegion.put(loc.getRegionInfo(), regionEntries); } - Action action = new Action(row, i); - regionActions.add(action); - dataSize += row.getRow().length; + regionEntries.add(entry); } long startTime = EnvironmentEdgeManager.currentTimeMillis(); // replaying edits by region - for (HRegionInfo curRegion : actionsByRegion.keySet()) { - List> allActions = actionsByRegion.get(curRegion); + for (HRegionInfo curRegion : entriesByRegion.keySet()) { + List allActions = entriesByRegion.get(curRegion); // send edits in chunks int totalActions = allActions.size(); int replayedActions = 0; @@ -142,12 +150,11 @@ public class WALEditsReplaySink { } long endTime = EnvironmentEdgeManager.currentTimeMillis() - startTime; - LOG.debug("number of rows:" + actions.size() + " are sent by batch! spent " + endTime + LOG.debug("number of rows:" + entries.size() + " are sent by batch! spent " + endTime + "(ms)!"); metrics.updateReplayTime(endTime); metrics.updateReplayBatchSize(batchSize); - metrics.updateReplayDataSize(dataSize); this.totalReplayedEdits.addAndGet(batchSize); } @@ -162,12 +169,13 @@ public class WALEditsReplaySink { } private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo, - final List> actions) throws IOException { + final List entries) throws IOException { try { RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf); - ReplayServerCallable callable = new ReplayServerCallable( - this.conn, this.tableName, regionLoc, regionInfo, actions); - factory. newCaller().callWithRetries(callable, this.replayTimeout); + ReplayServerCallable callable = + new ReplayServerCallable(this.conn, this.tableName, regionLoc, + regionInfo, entries); + factory. 
newCaller().callWithRetries(callable, this.replayTimeout); } catch (IOException ie) { if (skipErrors) { LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS @@ -182,52 +190,44 @@ public class WALEditsReplaySink { * Callable that handles the replay method call going against a single regionserver * @param */ - class ReplayServerCallable extends RegionServerCallable { + class ReplayServerCallable extends RegionServerCallable { private HRegionInfo regionInfo; - private List> actions; + private List entries; ReplayServerCallable(final HConnection connection, final TableName tableName, final HRegionLocation regionLoc, final HRegionInfo regionInfo, - final List> actions) { + final List entries) { super(connection, tableName, null); - this.actions = actions; + this.entries = entries; this.regionInfo = regionInfo; setLocation(regionLoc); } @Override - public MultiResponse call() throws IOException { + public ReplicateWALEntryResponse call() throws IOException { try { - replayToServer(this.regionInfo, this.actions); + replayToServer(this.regionInfo, this.entries); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } return null; } - private void replayToServer(HRegionInfo regionInfo, List> actions) + private void replayToServer(HRegionInfo regionInfo, List entries) throws IOException, ServiceException { + if (entries.isEmpty()) return; + + HLog.Entry[] entriesArray = new HLog.Entry[entries.size()]; + entriesArray = entries.toArray(entriesArray); AdminService.BlockingInterface remoteSvr = conn.getAdmin(getLocation().getServerName()); - MultiRequest request = RequestConverter.buildMultiRequest(regionInfo.getRegionName(), - actions); - MultiResponse protoResults = remoteSvr.replay(null, request); - // check if it's a partial success - List resultList = protoResults.getResultList(); - for (int i = 0, n = resultList.size(); i < n; i++) { - ActionResult result = resultList.get(i); - if (result.hasException()) { - Throwable t = ProtobufUtil.toException(result.getException()); - if (!skipErrors) { - IOException ie = new IOException(); - ie.initCause(t); - // retry - throw ie; - } else { - LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS - + "=true so continuing replayToServer with error:" + t.getMessage()); - return; - } - } + + Pair p = + ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray); + try { + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond()); + remoteSvr.replay(controller, p.getFirst()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } } @@ -237,10 +237,20 @@ public class WALEditsReplaySink { // relocate regions in case we have a new dead server or network hiccup // if not due to connection issue, the following code should run fast because it uses // cached location - for (Action action : actions) { - // use first row to relocate region because all actions are for one region - setLocation(conn.locateRegion(tableName, action.getAction().getRow())); - break; + boolean skip = false; + for (HLog.Entry entry : this.entries) { + WALEdit edit = entry.getEdit(); + List kvs = edit.getKeyValues(); + for (KeyValue kv : kvs) { + // filtering HLog meta entries + if (kv.matchingFamily(WALEdit.METAFAMILY)) continue; + + setLocation(conn.locateRegion(tableName, kv.getRow())); + skip = true; + break; + } + // use first log entry to relocate region because all entries are for one region + if (skip) break; } } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java 
hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 5f7e541..fbd5b69 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -547,7 +547,8 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { } @Override - public MultiResponse replay(RpcController controller, MultiRequest request) + public ReplicateWALEntryResponse + replay(RpcController controller, ReplicateWALEntryRequest request) throws ServiceException { // TODO Auto-generated method stub return null; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 86f716e..55425a4 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -261,7 +261,6 @@ public class TestDistributedLogSplitting { // they will consume recovered.edits master.balanceSwitch(false); - List rsts = cluster.getLiveRegionServerThreads(); final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); @@ -286,7 +285,6 @@ public class TestDistributedLogSplitting { // they will consume recovered.edits master.balanceSwitch(false); - List rsts = cluster.getLiveRegionServerThreads(); final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 6c15238..8a7a096 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -3835,7 +3835,7 @@ public class TestHRegion extends HBaseTestCase { //verify append called or not verify(log, expectAppend ? 
times(1) : never())
      .appendNoSync((HRegionInfo)any(), eq(tableName), (WALEdit)any(), (List)any(),
-        anyLong(), (HTableDescriptor)any(), (RegionCoprocessorHost)any());
+        anyLong(), (HTableDescriptor)any());
 
     //verify sync called or not
     if (expectSync || expectSyncFromLogSyncer) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
index 7156dac..597f9d5 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
@@ -105,8 +105,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
           addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
           HRegionInfo hri = region.getRegionInfo();
           if (this.noSync) {
-            hlog.appendNoSync(hri, hri.getTable(), walEdit,
-              new ArrayList(), now, htd, null);
+            hlog.appendNoSync(hri, hri.getTable(), walEdit, new ArrayList(), now, htd);
           } else {
             hlog.append(hri, hri.getTable(), walEdit, now, htd);
           }
@@ -200,7 +199,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
             LOG.info("Rolling after " + appends + " edits");
             rollWriter();
           }
-          super.doWrite(info, logKey, logEdit, htd, null);
+          super.doWrite(info, logKey, logEdit, htd);
         };
       };
     hlog.rollWriter();
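
For reference, below is a minimal sketch of how a caller drives the reworked Replay endpoint after this patch, modeled on the new ReplayServerCallable.replayToServer() in WALEditsReplaySink above. The wrapper class and the HConnection/ServerName wiring are illustrative only and not part of the patch; the Pair type arguments on buildReplicateWALEntryRequest are inferred from the call sites shown in this diff.

import java.io.IOException;

import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Pair;

import com.google.protobuf.ServiceException;

/** Illustrative helper only; not part of the patch. */
public class ReplayRpcSketch {

  /**
   * Replays a batch of WAL entries on the region server hosting the recovering region.
   * With this patch the edits travel as a ReplicateWALEntryRequest whose cells ride in the
   * PayloadCarryingRpcController payload, instead of as a MultiRequest of mutations.
   */
  static void replay(HConnection conn, ServerName server, HLog.Entry[] entries)
      throws IOException {
    AdminService.BlockingInterface admin = conn.getAdmin(server);
    // Convert the WAL entries to the PB request; the second half of the pair carries the cells.
    Pair<ReplicateWALEntryRequest, CellScanner> p =
        ReplicationProtbufUtil.buildReplicateWALEntryRequest(entries);
    PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
    try {
      admin.replay(controller, p.getFirst());
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }
}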