diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java index 64a4968..2ebddff 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java @@ -19,6 +19,14 @@ package org.apache.hadoop.hbase.client; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.Cell; @@ -32,14 +40,6 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.UUID; - @InterfaceAudience.Public @InterfaceStability.Evolving public abstract class Mutation extends OperationWithAttributes implements Row, CellScannable, @@ -54,6 +54,8 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C 1 * Bytes.SIZEOF_LONG + // writeToWAL Bytes.SIZEOF_BOOLEAN + + // isReplay + Bytes.SIZEOF_BOOLEAN + // familyMap ClassSize.REFERENCE + // familyMap @@ -65,6 +67,9 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C protected byte [] row = null; protected long ts = HConstants.LATEST_TIMESTAMP; protected boolean writeToWAL = true; + // make current mutation as a distributed log replay change + protected boolean isReplay = false; + // A Map sorted by column family. protected NavigableMap> familyMap = new TreeMap>(Bytes.BYTES_COMPARATOR); @@ -181,6 +186,21 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C public void setWriteToWAL(boolean write) { this.writeToWAL = write; } + + /** + * @return true if current change is for distributed log replay + */ + public boolean getIsReplay() { + return this.isReplay; + } + + /** + * Set whether current change is in replay or not. + * @param replaySwitch + */ + public void setIsReplay(boolean replaySwitch) { + this.isReplay = replaySwitch; + } /** * Method for retrieving the put's familyMap diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/RegionInRecoveryException.java hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/RegionInRecoveryException.java index 426203e..66e0fa0 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/RegionInRecoveryException.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/RegionInRecoveryException.java @@ -21,14 +21,12 @@ package org.apache.hadoop.hbase.exceptions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import java.io.IOException; - /** * Thrown when a read request issued against a region which is in recovering state. 
*/ @InterfaceAudience.Public @InterfaceStability.Evolving -public class RegionInRecoveryException extends IOException { +public class RegionInRecoveryException extends NotServingRegionException { private static final long serialVersionUID = 327302071153799L; /** default constructor */ diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index e79ab5f..71a59f8 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -781,6 +781,7 @@ public final class HConstants { public static final int QOS_THRESHOLD = 10; public static final int HIGH_QOS = 100; public static final int REPLICATION_QOS = 5; // normal_QOS < replication_QOS < high_QOS + public static final int REPLAY_QOS = 6; // REPLICATION_QOS < REPLAY_QOS < high_QOS /** Directory under /hbase where archived hfiles are stored */ public static final String HFILE_ARCHIVE_DIRECTORY = ".archive"; diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java index a23c498..073d4f2 100644 --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java @@ -16383,6 +16383,11 @@ public final class AdminProtos { org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request, com.google.protobuf.RpcCallback done); + public abstract void replay( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request, + com.google.protobuf.RpcCallback done); + public abstract void rollWALWriter( com.google.protobuf.RpcController controller, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest request, @@ -16484,6 +16489,14 @@ public final class AdminProtos { } @java.lang.Override + public void replay( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request, + com.google.protobuf.RpcCallback done) { + impl.replay(controller, request, done); + } + + @java.lang.Override public void rollWALWriter( com.google.protobuf.RpcController controller, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest request, @@ -16550,10 +16563,12 @@ public final class AdminProtos { case 9: return impl.replicateWALEntry(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest)request); case 10: - return impl.rollWALWriter(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest)request); + return impl.replay(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest)request); case 11: - return impl.getServerInfo(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest)request); + return impl.rollWALWriter(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest)request); case 12: + return impl.getServerInfo(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest)request); + case 13: return impl.stopServer(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest)request); default: throw new java.lang.AssertionError("Can't get here."); @@ -16590,10 
+16605,12 @@ public final class AdminProtos { case 9: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest.getDefaultInstance(); case 10: - return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest.getDefaultInstance(); + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest.getDefaultInstance(); case 11: - return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest.getDefaultInstance(); + return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest.getDefaultInstance(); case 12: + return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest.getDefaultInstance(); + case 13: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); @@ -16630,10 +16647,12 @@ public final class AdminProtos { case 9: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance(); case 10: - return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse.getDefaultInstance(); + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance(); case 11: - return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse.getDefaultInstance(); + return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse.getDefaultInstance(); case 12: + return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse.getDefaultInstance(); + case 13: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); @@ -16693,6 +16712,11 @@ public final class AdminProtos { org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request, com.google.protobuf.RpcCallback done); + public abstract void replay( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request, + com.google.protobuf.RpcCallback done); + public abstract void rollWALWriter( com.google.protobuf.RpcController controller, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest request, @@ -16781,16 +16805,21 @@ public final class AdminProtos { done)); return; case 10: + this.replay(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest)request, + com.google.protobuf.RpcUtil.specializeCallback( + done)); + return; + case 11: this.rollWALWriter(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest)request, com.google.protobuf.RpcUtil.specializeCallback( done)); return; - case 11: + case 12: this.getServerInfo(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest)request, com.google.protobuf.RpcUtil.specializeCallback( done)); return; - case 12: + case 13: this.stopServer(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest)request, com.google.protobuf.RpcUtil.specializeCallback( done)); @@ -16830,10 +16859,12 @@ public final class AdminProtos { case 9: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest.getDefaultInstance(); case 10: - return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest.getDefaultInstance(); + return 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest.getDefaultInstance(); case 11: - return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest.getDefaultInstance(); + return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest.getDefaultInstance(); case 12: + return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest.getDefaultInstance(); + case 13: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); @@ -16870,10 +16901,12 @@ public final class AdminProtos { case 9: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance(); case 10: - return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse.getDefaultInstance(); + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance(); case 11: - return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse.getDefaultInstance(); + return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse.getDefaultInstance(); case 12: + return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse.getDefaultInstance(); + case 13: return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); @@ -17046,12 +17079,27 @@ public final class AdminProtos { org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance())); } + public void replay( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request, + com.google.protobuf.RpcCallback done) { + channel.callMethod( + getDescriptor().getMethods().get(10), + controller, + request, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance(), + com.google.protobuf.RpcUtil.generalizeCallback( + done, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.class, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance())); + } + public void rollWALWriter( com.google.protobuf.RpcController controller, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest request, com.google.protobuf.RpcCallback done) { channel.callMethod( - getDescriptor().getMethods().get(10), + getDescriptor().getMethods().get(11), controller, request, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse.getDefaultInstance(), @@ -17066,7 +17114,7 @@ public final class AdminProtos { org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest request, com.google.protobuf.RpcCallback done) { channel.callMethod( - getDescriptor().getMethods().get(11), + getDescriptor().getMethods().get(12), controller, request, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse.getDefaultInstance(), @@ -17081,7 +17129,7 @@ public final class AdminProtos { org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest request, com.google.protobuf.RpcCallback done) { channel.callMethod( - getDescriptor().getMethods().get(12), + getDescriptor().getMethods().get(13), controller, request, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse.getDefaultInstance(), @@ -17148,6 +17196,11 @@ public final 
class AdminProtos { org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request) throws com.google.protobuf.ServiceException; + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse replay( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request) + throws com.google.protobuf.ServiceException; + public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse rollWALWriter( com.google.protobuf.RpcController controller, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest request) @@ -17291,12 +17344,24 @@ public final class AdminProtos { } + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse replay( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request) + throws com.google.protobuf.ServiceException { + return (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse) channel.callBlockingMethod( + getDescriptor().getMethods().get(10), + controller, + request, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance()); + } + + public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse rollWALWriter( com.google.protobuf.RpcController controller, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest request) throws com.google.protobuf.ServiceException { return (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse) channel.callBlockingMethod( - getDescriptor().getMethods().get(10), + getDescriptor().getMethods().get(11), controller, request, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse.getDefaultInstance()); @@ -17308,7 +17373,7 @@ public final class AdminProtos { org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest request) throws com.google.protobuf.ServiceException { return (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse) channel.callBlockingMethod( - getDescriptor().getMethods().get(11), + getDescriptor().getMethods().get(12), controller, request, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse.getDefaultInstance()); @@ -17320,7 +17385,7 @@ public final class AdminProtos { org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest request) throws com.google.protobuf.ServiceException { return (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse) channel.callBlockingMethod( - getDescriptor().getMethods().get(12), + getDescriptor().getMethods().get(13), controller, request, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse.getDefaultInstance()); @@ -17503,88 +17568,89 @@ public final class AdminProtos { descriptor; static { java.lang.String[] descriptorData = { - "\n\013Admin.proto\032\013hbase.proto\"Q\n\024GetRegionI" + - "nfoRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpeci" + - "fier\022\027\n\017compactionState\030\002 \001(\010\"\301\001\n\025GetReg" + - "ionInfoResponse\022\037\n\nregionInfo\030\001 \002(\0132\013.Re" + - "gionInfo\022?\n\017compactionState\030\002 \001(\0162&.GetR" + - "egionInfoResponse.CompactionState\"F\n\017Com" + - "pactionState\022\010\n\004NONE\020\000\022\t\n\005MINOR\020\001\022\t\n\005MAJ" + - "OR\020\002\022\023\n\017MAJOR_AND_MINOR\020\003\"G\n\023GetStoreFil" + - "eRequest\022 \n\006region\030\001 
\002(\0132\020.RegionSpecifi" + - "er\022\016\n\006family\030\002 \003(\014\")\n\024GetStoreFileRespon", - "se\022\021\n\tstoreFile\030\001 \003(\t\"\030\n\026GetOnlineRegion" + - "Request\":\n\027GetOnlineRegionResponse\022\037\n\nre" + - "gionInfo\030\001 \003(\0132\013.RegionInfo\"\225\001\n\021OpenRegi" + - "onRequest\0223\n\010openInfo\030\001 \003(\0132!.OpenRegion" + - "Request.RegionOpenInfo\032K\n\016RegionOpenInfo" + - "\022\033\n\006region\030\001 \002(\0132\013.RegionInfo\022\034\n\024version" + - "OfOfflineNode\030\002 \001(\r\"\234\001\n\022OpenRegionRespon" + - "se\022<\n\014openingState\030\001 \003(\0162&.OpenRegionRes" + - "ponse.RegionOpeningState\"H\n\022RegionOpenin" + - "gState\022\n\n\006OPENED\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022", - "\n\016FAILED_OPENING\020\002\"\232\001\n\022CloseRegionReques" + - "t\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\024v" + - "ersionOfClosingNode\030\002 \001(\r\022\034\n\016transitionI" + - "nZK\030\003 \001(\010:\004true\022&\n\021destinationServer\030\004 \001" + - "(\0132\013.ServerName\"%\n\023CloseRegionResponse\022\016" + - "\n\006closed\030\001 \002(\010\"M\n\022FlushRegionRequest\022 \n\006" + - "region\030\001 \002(\0132\020.RegionSpecifier\022\025\n\rifOlde" + - "rThanTs\030\002 \001(\004\"=\n\023FlushRegionResponse\022\025\n\r" + - "lastFlushTime\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010\"J\n\022" + - "SplitRegionRequest\022 \n\006region\030\001 \002(\0132\020.Reg", - "ionSpecifier\022\022\n\nsplitPoint\030\002 \001(\014\"\025\n\023Spli" + - "tRegionResponse\"W\n\024CompactRegionRequest\022" + - " \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\r\n\005maj" + - "or\030\002 \001(\010\022\016\n\006family\030\003 \001(\014\"\027\n\025CompactRegio" + - "nResponse\"t\n\023MergeRegionsRequest\022!\n\007regi" + - "onA\030\001 \002(\0132\020.RegionSpecifier\022!\n\007regionB\030\002" + - " \002(\0132\020.RegionSpecifier\022\027\n\010forcible\030\003 \001(\010" + - ":\005false\"\026\n\024MergeRegionsResponse\"1\n\004UUID\022" + - "\024\n\014leastSigBits\030\001 \002(\004\022\023\n\013mostSigBits\030\002 \002" + - "(\004\"\270\003\n\010WALEntry\022\035\n\003key\030\001 \002(\0132\020.WALEntry.", - "WALKey\022\037\n\004edit\030\002 \002(\0132\021.WALEntry.WALEdit\032" + - "~\n\006WALKey\022\031\n\021encodedRegionName\030\001 \002(\014\022\021\n\t" + - "tableName\030\002 \002(\014\022\031\n\021logSequenceNumber\030\003 \002" + - "(\004\022\021\n\twriteTime\030\004 \002(\004\022\030\n\tclusterId\030\005 \001(\013" + - "2\005.UUID\032\353\001\n\007WALEdit\022\025\n\rkeyValueBytes\030\001 \003" + - "(\014\0222\n\013familyScope\030\002 \003(\0132\035.WALEntry.WALEd" + - "it.FamilyScope\032M\n\013FamilyScope\022\016\n\006family\030" + - "\001 \002(\014\022.\n\tscopeType\030\002 \002(\0162\033.WALEntry.WALE" + - "dit.ScopeType\"F\n\tScopeType\022\033\n\027REPLICATIO" + - "N_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLO", - "BAL\020\001\"4\n\030ReplicateWALEntryRequest\022\030\n\005ent" + - "ry\030\001 \003(\0132\t.WALEntry\"\033\n\031ReplicateWALEntry" + - "Response\"\026\n\024RollWALWriterRequest\".\n\025Roll" + - "WALWriterResponse\022\025\n\rregionToFlush\030\001 \003(\014" + - "\"#\n\021StopServerRequest\022\016\n\006reason\030\001 \002(\t\"\024\n" + - "\022StopServerResponse\"\026\n\024GetServerInfoRequ" + - "est\"@\n\nServerInfo\022\037\n\nserverName\030\001 \002(\0132\013." 
+ - "ServerName\022\021\n\twebuiPort\030\002 \001(\r\"8\n\025GetServ" + - "erInfoResponse\022\037\n\nserverInfo\030\001 \002(\0132\013.Ser" + - "verInfo2\266\006\n\014AdminService\022>\n\rgetRegionInf", - "o\022\025.GetRegionInfoRequest\032\026.GetRegionInfo" + - "Response\022;\n\014getStoreFile\022\024.GetStoreFileR" + - "equest\032\025.GetStoreFileResponse\022D\n\017getOnli" + - "neRegion\022\027.GetOnlineRegionRequest\032\030.GetO" + - "nlineRegionResponse\0225\n\nopenRegion\022\022.Open" + - "RegionRequest\032\023.OpenRegionResponse\0228\n\013cl" + - "oseRegion\022\023.CloseRegionRequest\032\024.CloseRe" + - "gionResponse\0228\n\013flushRegion\022\023.FlushRegio" + - "nRequest\032\024.FlushRegionResponse\0228\n\013splitR" + - "egion\022\023.SplitRegionRequest\032\024.SplitRegion", - "Response\022>\n\rcompactRegion\022\025.CompactRegio" + - "nRequest\032\026.CompactRegionResponse\022;\n\014merg" + - "eRegions\022\024.MergeRegionsRequest\032\025.MergeRe" + - "gionsResponse\022J\n\021replicateWALEntry\022\031.Rep" + - "licateWALEntryRequest\032\032.ReplicateWALEntr" + - "yResponse\022>\n\rrollWALWriter\022\025.RollWALWrit" + - "erRequest\032\026.RollWALWriterResponse\022>\n\rget" + - "ServerInfo\022\025.GetServerInfoRequest\032\026.GetS" + - "erverInfoResponse\0225\n\nstopServer\022\022.StopSe" + - "rverRequest\032\023.StopServerResponseBA\n*org.", - "apache.hadoop.hbase.protobuf.generatedB\013" + - "AdminProtosH\001\210\001\001\240\001\001" + "\n\013Admin.proto\032\014Client.proto\032\013hbase.proto" + + "\"Q\n\024GetRegionInfoRequest\022 \n\006region\030\001 \002(\013" + + "2\020.RegionSpecifier\022\027\n\017compactionState\030\002 " + + "\001(\010\"\301\001\n\025GetRegionInfoResponse\022\037\n\nregionI" + + "nfo\030\001 \002(\0132\013.RegionInfo\022?\n\017compactionStat" + + "e\030\002 \001(\0162&.GetRegionInfoResponse.Compacti" + + "onState\"F\n\017CompactionState\022\010\n\004NONE\020\000\022\t\n\005" + + "MINOR\020\001\022\t\n\005MAJOR\020\002\022\023\n\017MAJOR_AND_MINOR\020\003\"" + + "G\n\023GetStoreFileRequest\022 \n\006region\030\001 \002(\0132\020" + + ".RegionSpecifier\022\016\n\006family\030\002 \003(\014\")\n\024GetS", + "toreFileResponse\022\021\n\tstoreFile\030\001 \003(\t\"\030\n\026G" + + "etOnlineRegionRequest\":\n\027GetOnlineRegion" + + "Response\022\037\n\nregionInfo\030\001 \003(\0132\013.RegionInf" + + "o\"\225\001\n\021OpenRegionRequest\0223\n\010openInfo\030\001 \003(" + + "\0132!.OpenRegionRequest.RegionOpenInfo\032K\n\016" + + "RegionOpenInfo\022\033\n\006region\030\001 \002(\0132\013.RegionI" + + "nfo\022\034\n\024versionOfOfflineNode\030\002 \001(\r\"\234\001\n\022Op" + + "enRegionResponse\022<\n\014openingState\030\001 \003(\0162&" + + ".OpenRegionResponse.RegionOpeningState\"H" + + "\n\022RegionOpeningState\022\n\n\006OPENED\020\000\022\022\n\016ALRE", + "ADY_OPENED\020\001\022\022\n\016FAILED_OPENING\020\002\"\232\001\n\022Clo" + + "seRegionRequest\022 \n\006region\030\001 \002(\0132\020.Region" + + "Specifier\022\034\n\024versionOfClosingNode\030\002 \001(\r\022" + + "\034\n\016transitionInZK\030\003 \001(\010:\004true\022&\n\021destina" + + "tionServer\030\004 \001(\0132\013.ServerName\"%\n\023CloseRe" + + "gionResponse\022\016\n\006closed\030\001 \002(\010\"M\n\022FlushReg" + + "ionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpeci" + + "fier\022\025\n\rifOlderThanTs\030\002 \001(\004\"=\n\023FlushRegi" + + "onResponse\022\025\n\rlastFlushTime\030\001 \002(\004\022\017\n\007flu" + + "shed\030\002 \001(\010\"J\n\022SplitRegionRequest\022 
\n\006regi", + "on\030\001 \002(\0132\020.RegionSpecifier\022\022\n\nsplitPoint" + + "\030\002 \001(\014\"\025\n\023SplitRegionResponse\"W\n\024Compact" + + "RegionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSp" + + "ecifier\022\r\n\005major\030\002 \001(\010\022\016\n\006family\030\003 \001(\014\"\027" + + "\n\025CompactRegionResponse\"t\n\023MergeRegionsR" + + "equest\022!\n\007regionA\030\001 \002(\0132\020.RegionSpecifie" + + "r\022!\n\007regionB\030\002 \002(\0132\020.RegionSpecifier\022\027\n\010" + + "forcible\030\003 \001(\010:\005false\"\026\n\024MergeRegionsRes" + + "ponse\"1\n\004UUID\022\024\n\014leastSigBits\030\001 \002(\004\022\023\n\013m" + + "ostSigBits\030\002 \002(\004\"\270\003\n\010WALEntry\022\035\n\003key\030\001 \002", + "(\0132\020.WALEntry.WALKey\022\037\n\004edit\030\002 \002(\0132\021.WAL" + + "Entry.WALEdit\032~\n\006WALKey\022\031\n\021encodedRegion" + + "Name\030\001 \002(\014\022\021\n\ttableName\030\002 \002(\014\022\031\n\021logSequ" + + "enceNumber\030\003 \002(\004\022\021\n\twriteTime\030\004 \002(\004\022\030\n\tc" + + "lusterId\030\005 \001(\0132\005.UUID\032\353\001\n\007WALEdit\022\025\n\rkey" + + "ValueBytes\030\001 \003(\014\0222\n\013familyScope\030\002 \003(\0132\035." + + "WALEntry.WALEdit.FamilyScope\032M\n\013FamilySc" + + "ope\022\016\n\006family\030\001 \002(\014\022.\n\tscopeType\030\002 \002(\0162\033" + + ".WALEntry.WALEdit.ScopeType\"F\n\tScopeType" + + "\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICA", + "TION_SCOPE_GLOBAL\020\001\"4\n\030ReplicateWALEntry" + + "Request\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031Rep" + + "licateWALEntryResponse\"\026\n\024RollWALWriterR" + + "equest\".\n\025RollWALWriterResponse\022\025\n\rregio" + + "nToFlush\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006r" + + "eason\030\001 \002(\t\"\024\n\022StopServerResponse\"\026\n\024Get" + + "ServerInfoRequest\"@\n\nServerInfo\022\037\n\nserve" + + "rName\030\001 \002(\0132\013.ServerName\022\021\n\twebuiPort\030\002 " + + "\001(\r\"8\n\025GetServerInfoResponse\022\037\n\nserverIn" + + "fo\030\001 \002(\0132\013.ServerInfo2\337\006\n\014AdminService\022>", + "\n\rgetRegionInfo\022\025.GetRegionInfoRequest\032\026" + + ".GetRegionInfoResponse\022;\n\014getStoreFile\022\024" + + ".GetStoreFileRequest\032\025.GetStoreFileRespo" + + "nse\022D\n\017getOnlineRegion\022\027.GetOnlineRegion" + + "Request\032\030.GetOnlineRegionResponse\0225\n\nope" + + "nRegion\022\022.OpenRegionRequest\032\023.OpenRegion" + + "Response\0228\n\013closeRegion\022\023.CloseRegionReq" + + "uest\032\024.CloseRegionResponse\0228\n\013flushRegio" + + "n\022\023.FlushRegionRequest\032\024.FlushRegionResp" + + "onse\0228\n\013splitRegion\022\023.SplitRegionRequest", + "\032\024.SplitRegionResponse\022>\n\rcompactRegion\022" + + "\025.CompactRegionRequest\032\026.CompactRegionRe" + + "sponse\022;\n\014mergeRegions\022\024.MergeRegionsReq" + + "uest\032\025.MergeRegionsResponse\022J\n\021replicate" + + "WALEntry\022\031.ReplicateWALEntryRequest\032\032.Re" + + "plicateWALEntryResponse\022\'\n\006replay\022\r.Mult" + + "iRequest\032\016.MultiResponse\022>\n\rrollWALWrite" + + "r\022\025.RollWALWriterRequest\032\026.RollWALWriter" + + "Response\022>\n\rgetServerInfo\022\025.GetServerInf" + + "oRequest\032\026.GetServerInfoResponse\0225\n\nstop", + "Server\022\022.StopServerRequest\032\023.StopServerR" + + "esponseBA\n*org.apache.hadoop.hbase.proto" + + "buf.generatedB\013AdminProtosH\001\210\001\001\240\001\001" }; 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -17861,6 +17927,7 @@ public final class AdminProtos { com.google.protobuf.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, new com.google.protobuf.Descriptors.FileDescriptor[] { + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.getDescriptor(), org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.getDescriptor(), }, assigner); } diff --git hbase-protocol/src/main/protobuf/Admin.proto hbase-protocol/src/main/protobuf/Admin.proto index b56ea1d..26db352 100644 --- hbase-protocol/src/main/protobuf/Admin.proto +++ hbase-protocol/src/main/protobuf/Admin.proto @@ -24,6 +24,7 @@ option java_generic_services = true; option java_generate_equals_and_hash = true; option optimize_for = SPEED; +import "Client.proto"; import "hbase.proto"; message GetRegionInfoRequest { @@ -260,6 +261,9 @@ service AdminService { rpc replicateWALEntry(ReplicateWALEntryRequest) returns(ReplicateWALEntryResponse); + + rpc replay(MultiRequest) + returns(MultiResponse); rpc rollWALWriter(RollWALWriterRequest) returns(RollWALWriterResponse); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 25e4859..fd77acc 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.exceptions.DroppedSnapshotException; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.exceptions.NotServingRegionException; +import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; import org.apache.hadoop.hbase.exceptions.RegionTooBusyException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; import org.apache.hadoop.hbase.exceptions.UnknownScannerException; @@ -200,6 +201,16 @@ public class HRegion implements HeapSize { // , Writable{ protected long completeSequenceId = -1L; + /** + * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for + * startRegionOperation to possibly invoke different checks before any region operations. Not all + * operations have to be defined here. It's only needed when a special check is need in + * startRegionOperation + */ + protected enum Operation { + ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT + } + ////////////////////////////////////////////////////////////////////////////// // Members ////////////////////////////////////////////////////////////////////////////// @@ -281,6 +292,11 @@ public class HRegion implements HeapSize { // , Writable{ private final AtomicInteger minorInProgress = new AtomicInteger(0); /** + * When a regon is in recovering state, it can only accept writes not reads + */ + private volatile boolean recovering = false; + + /** * @return The smallest mvcc readPoint across all the scanners in this * region. Writes older than this readPoint, are included in every * read operation. 
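For orientation, here is a minimal client-side sketch (illustrative only, not part of the patch) of what the new recovering state means for callers; the table name "t1" and row key "row1" are hypothetical. Because RegionInRecoveryException now extends NotServingRegionException, a read against a recovering region can go through the ordinary client retry path instead of failing outright, while writes (such as replayed WAL edits) are still accepted.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

public class RecoveringRegionReadSketch {
  static void readDuringRecovery(Configuration conf) throws IOException {
    HTable table = new HTable(conf, "t1");          // hypothetical table
    try {
      table.get(new Get(Bytes.toBytes("row1")));    // hypothetical row
    } catch (IOException ioe) {
      // While the region is replaying edits, the server rejects the read with
      // RegionInRecoveryException (a NotServingRegionException subtype), so the
      // client retries; only after retries are exhausted does the failure surface here.
    } finally {
      table.close();
    }
  }
}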
@@ -775,6 +791,14 @@ public class HRegion implements HeapSize { // , Writable{ return this.closing.get(); } + /** + * Reset recovering state of current region + * @param newState + */ + public void setRecovering(boolean newState) { + this.recovering = newState; + } + /** @return true if region is available (not closed and not closing) */ public boolean isAvailable() { return !isClosed() && !isClosing(); @@ -1870,6 +1894,52 @@ public class HRegion implements HeapSize { // , Writable{ } return batchOp.retCodeDetails; } + + /** + * Perform a batch of mutations. + * It supports only Put and Delete mutations and will ignore other types passed. + * @param mutationsAndLocks + * the list of mutations paired with their requested lock IDs. + * @return an array of OperationStatus which internally contains the + * OperationStatusCode and the exceptionMessage if any. + * @throws IOException + */ + OperationStatus[] batchMutate( + Pair[] mutationsAndLocks, boolean inReplay) throws IOException { + BatchOperationInProgress> batchOp = + new BatchOperationInProgress>(mutationsAndLocks); + + boolean initialized = false; + + while (!batchOp.isDone()) { + if(!inReplay) { + checkReadOnly(); + } + checkResources(); + + long newSize; + startRegionOperation(); + + try { + if (!initialized) { + this.writeRequestsCount.increment(); + if(!inReplay) { + doPreMutationHook(batchOp); + } + initialized = true; + } + long addedSize = doMiniBatchMutation(batchOp); + newSize = this.addAndGetGlobalMemstoreSize(addedSize); + } finally { + closeRegionOperation(); + } + if (isFlushSize(newSize)) { + requestFlush(); + } + } + return batchOp.retCodeDetails; + } + private void doPreMutationHook(BatchOperationInProgress> batchOp) throws IOException { @@ -1879,6 +1949,10 @@ public class HRegion implements HeapSize { // , Writable{ for (int i = 0 ; i < batchOp.operations.length; i++) { Pair nextPair = batchOp.operations[i]; Mutation m = nextPair.getFirst(); + if(m.getIsReplay()) { + // skip replay changes pre coprocessor hook + continue; + } if (m instanceof Put) { if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) { // pre hook says skip this Put @@ -1905,8 +1979,7 @@ public class HRegion implements HeapSize { // , Writable{ } } } - - + @SuppressWarnings("unchecked") private long doMiniBatchMutation( BatchOperationInProgress> batchOp) throws IOException { @@ -1925,6 +1998,7 @@ public class HRegion implements HeapSize { // , Writable{ long txid = 0; boolean walSyncSuccessful = false; boolean locked = false; + boolean isInReplay = false; /** Keep track of the locks we hold so we can release them in finally clause */ List acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); @@ -1935,7 +2009,13 @@ public class HRegion implements HeapSize { // , Writable{ int lastIndexExclusive = firstIndex; boolean success = false; int noOfPuts = 0, noOfDeletes = 0; - try { + try { + // check if current batch is from distributedLogReplay + if(firstIndex < batchOp.operations.length) { + Mutation mutation = batchOp.operations[firstIndex].getFirst(); + isInReplay = mutation.getIsReplay(); + } + // ------------------------------------ // STEP 1. Try to acquire as many locks as we can, and ensure // we acquire at least one. 
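To show how the new isReplay flag and the inReplay overload of batchMutate fit together, a small sketch follows (illustrative only). The Pair type parameters are reconstructed as <Mutation, Integer> (mutation plus optional lock id) because generics were stripped from this copy of the patch; the sketch sits in the org.apache.hadoop.hbase.regionserver package since batchMutate(Pair[], boolean) is package-private, and the helper name and cell values are hypothetical.

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

class ReplayBatchSketch {
  @SuppressWarnings("unchecked")
  static OperationStatus[] replayOneEdit(HRegion region, byte[] row) throws IOException {
    Put put = new Put(row);
    put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")); // hypothetical cell
    put.setIsReplay(true);  // mark the edit as a distributed-log-replay change
    Pair<Mutation, Integer>[] ops = new Pair[] { new Pair<Mutation, Integer>(put, null) };
    // inReplay = true skips checkReadOnly() and the pre-mutation coprocessor hook
    return region.batchMutate(ops, true);
  }
}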
@@ -2021,7 +2101,7 @@ public class HRegion implements HeapSize { // , Writable{ } } } - + // we should record the timestamp only after we have acquired the rowLock, // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp now = EnvironmentEdgeManager.currentTimeMillis(); @@ -2060,7 +2140,7 @@ public class HRegion implements HeapSize { // , Writable{ w = mvcc.beginMemstoreInsert(); // calling the pre CP hook for batch mutation - if (coprocessorHost != null) { + if (!isInReplay && coprocessorHost != null) { MiniBatchOperationInProgress> miniBatchOp = new MiniBatchOperationInProgress>(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); @@ -2142,7 +2222,7 @@ public class HRegion implements HeapSize { // , Writable{ } walSyncSuccessful = true; // calling the post CP hook for batch mutation - if (coprocessorHost != null) { + if (!isInReplay && coprocessorHost != null) { MiniBatchOperationInProgress> miniBatchOp = new MiniBatchOperationInProgress>(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); @@ -2161,7 +2241,7 @@ public class HRegion implements HeapSize { // , Writable{ // STEP 9. Run coprocessor post hooks. This should be done after the wal is // synced so that the coprocessor contract is adhered to. // ------------------------------------ - if (coprocessorHost != null) { + if (!isInReplay && coprocessorHost != null) { for (int i = firstIndex; i < lastIndexExclusive; i++) { // only for successful puts if (batchOp.retCodeDetails[i].getOperationStatusCode() @@ -3431,7 +3511,7 @@ public class HRegion implements HeapSize { // , Writable{ "after we renewed it. Could be caused by a very slow scanner " + "or a lengthy garbage collection"); } - startRegionOperation(); + startRegionOperation(Operation.SCAN); readRequestsCount.increment(); try { @@ -4645,7 +4725,7 @@ public class HRegion implements HeapSize { // , Writable{ checkReadOnly(); // Lock row - startRegionOperation(); + startRegionOperation(Operation.APPEND); this.writeRequestsCount.increment(); WriteEntry w = null; try { @@ -5228,6 +5308,31 @@ public class HRegion implements HeapSize { // , Writable{ */ public void startRegionOperation() throws NotServingRegionException, RegionTooBusyException, InterruptedIOException { + startRegionOperation(Operation.ANY); + } + + /** + * @param op The operation is about to be taken on the region + * @throws NotServingRegionException + * @throws RegionTooBusyException + * @throws InterruptedIOException + */ + protected void startRegionOperation(Operation op) throws NotServingRegionException, + RegionTooBusyException, InterruptedIOException { + switch (op) { + case INCREMENT: + case APPEND: + case GET: + case SCAN: + // when a region in recovering state, no read is allowed + if (this.recovering) { + throw new RegionInRecoveryException(this.getRegionNameAsString() + + " is recovering"); + } + break; + default: + break; + } if (this.closing.get()) { throw new NotServingRegionException(getRegionNameAsString() + " is closing"); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 8093930..5598c26 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -95,7 +95,6 @@ import 
org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.exceptions.NotServingRegionException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.RegionAlreadyInTransitionException; -import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; import org.apache.hadoop.hbase.exceptions.RegionServerRunningException; @@ -291,8 +290,8 @@ public class HRegionServer implements ClientProtocol, * Set of regions currently being in recovering state which means it can accept writes(edits from * previous failed region server) but not reads. A recovering region is also an online region. */ - protected final Set recoveringRegions = Collections - .synchronizedSet(new HashSet()); + protected final Map recoveringRegions = Collections + .synchronizedMap(new HashMap()); // Leases protected Leases leases; @@ -1941,7 +1940,7 @@ public class HRegionServer implements ClientProtocol, return this.stopping; } - public Set getRecoveringRegions() { + public Map getRecoveringRegions() { return this.recoveringRegions; } @@ -2627,9 +2626,6 @@ public class HRegionServer implements ClientProtocol, requestCount.increment(); HRegion region = getRegion(request.getRegion()); - // check if current region is in recovering phase - checkRegionIsInRecovering(region); - GetResponse.Builder builder = GetResponse.newBuilder(); ClientProtos.Get get = request.getGet(); Boolean existence = null; @@ -2980,9 +2976,6 @@ public class HRegionServer implements ClientProtocol, } if (!done) { - // check if current region is in recovering phase - this.checkRegionIsInRecovering(region); - long maxResultSize = scanner.getMaxResultSize(); if (maxResultSize <= 0) { maxResultSize = maxScannerResultSize; @@ -3436,7 +3429,7 @@ public class HRegionServer implements ClientProtocol, if (previous == null) { // check if the region to be opened is marked in recovering state in ZK if (isRegionMarkedRecoveringInZK(region.getEncodedName())) { - this.recoveringRegions.add(region.getEncodedName()); + this.recoveringRegions.put(region.getEncodedName(), null); } // If there is no action in progress, we can submit a specific handler. // Need to pass the expected version in the constructor. @@ -3681,8 +3674,60 @@ public class HRegionServer implements ClientProtocol, } /** + * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is + * that the given mutations will be durable on the receiving RS if this method returns without any + * exception. + * @param rpcc the RPC controller + * @param request the request + * @throws ServiceException + */ + @Override + @QosPriority(priority = HConstants.REPLAY_QOS) + public MultiResponse replay(final RpcController rpcc, final MultiRequest request) + throws ServiceException { + PayloadCarryingRpcController controller = (PayloadCarryingRpcController) rpcc; + CellScanner cellScanner = controller != null ? controller.cellScanner() : null; + // Clear scanner so we are not holding on to reference across call. 
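+    // The mutations collected below are passed to doBatchOp(builder, region, mutates, cellScanner, true),
+    // which marks each Put/Delete with setIsReplay(true); HRegion#doMiniBatchMutation then reads that
+    // flag from the first mutation of the batch and skips coprocessor pre/post hooks during replay.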
+ controller.setCellScanner(null); + try { + HRegion region = getRegion(request.getRegion()); + MultiResponse.Builder builder = MultiResponse.newBuilder(); + List mutates = new ArrayList(); + for (ClientProtos.MultiAction actionUnion : request.getActionList()) { + requestCount.increment(); + try { + if (actionUnion.hasMutation()) { + MutationProto mutate = actionUnion.getMutation(); + MutationType type = mutate.getMutateType(); + switch (type) { + case PUT: + case DELETE: + mutates.add(mutate); + break; + default: + throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); + } + } else { + LOG.warn("Error: invalid action: " + actionUnion + ". " + + "it must be a Mutation."); + throw new DoNotRetryIOException("Invalid action, " + + "it must be a Mutation."); + } + } catch (IOException ie) { + builder.addResult(ResponseConverter.buildActionResult(ie)); + } + } + if (!mutates.isEmpty()) { + doBatchOp(builder, region, mutates, cellScanner, true); + } + return MultiResponse.newBuilder().build(); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + /** * Roll the WAL writer of the region server. - * * @param controller the RPC controller * @param request the request * @throws ServiceException @@ -3810,13 +3855,23 @@ public class HRegionServer implements ClientProtocol, /** * Execute a list of Put/Delete mutations. + * + @@ -3882,6 +3925,18 @@ public class HRegionServer implements ClientProtocol, + */ + protected void doBatchOp(final MultiResponse.Builder builder, + final HRegion region, final List mutates, final CellScanner cells) { + doBatchOp(builder, region, mutates, cells, false); + } + + /** + * Execute a list of Put/Delete mutations. * * @param builder * @param region * @param mutations */ protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region, - final List mutations, final CellScanner cells) { + final List mutations, final CellScanner cells, boolean inReplay) { @SuppressWarnings("unchecked") Pair[] mutationsWithLocks = new Pair[mutations.size()]; long before = EnvironmentEdgeManager.currentTimeMillis(); @@ -3835,6 +3890,7 @@ public class HRegionServer implements ClientProtocol, mutation = ProtobufUtil.toDelete(m, cells); batchContainsDelete = true; } + mutation.setIsReplay(inReplay); mutationsWithLocks[i++] = new Pair(mutation, null); builder.addResult(result); } @@ -4079,18 +4135,4 @@ public class HRegionServer implements ClientProtocol, return result; } - - /** - * Throws RegionInRecoveryException when a region is in recoverying state to reject read requests - * @param region - * @throws RegionInRecoveryException - */ - private void checkRegionIsInRecovering(HRegion region) throws RegionInRecoveryException { - // check if current region is in recovering phase - if (!this.recoveringRegions.isEmpty() - && this.recoveringRegions.contains(region.getRegionInfo().getEncodedName())) { - throw new RegionInRecoveryException(region.getRegionInfo().getRegionNameAsString() - + " is recovering"); - } - } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index cdcd7f3..a343d9a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.Map; import 
java.util.concurrent.ConcurrentMap; import org.apache.hadoop.classification.InterfaceAudience; @@ -107,4 +108,9 @@ public interface RegionServerServices extends OnlineRegions { * @return The RegionServer's CatalogTracker */ public CatalogTracker getCatalogTracker(); + + /** + * @return set of recovering regions on the hosting region server + */ + public Map getRecoveringRegions(); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 66fb052..4cdd3eb 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -22,7 +22,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.List; -import java.util.Set; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -214,11 +214,11 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { taskReadyLock.wait(checkInterval); if (paths.isEmpty() && this.server != null) { // check to see if we have stale recovering regions in our internal memory state - Set recoveringRegions = this.server.getRecoveringRegions(); + Map recoveringRegions = this.server.getRecoveringRegions(); if (!recoveringRegions.isEmpty()) { // Make a local copy to prevent ConcurrentModificationException when other threads // modify recoveringRegions - List tmpCopy = new ArrayList(recoveringRegions); + List tmpCopy = new ArrayList(recoveringRegions.keySet()); for (String region : tmpCopy) { String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region); try { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java index 9ac9a71..53e2ba7 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.handler; import java.io.IOException; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -130,6 +131,16 @@ public class OpenRegionHandler extends EventHandler { if (region == null) { return; } + + // check if we need set current region in recovering state + region.setRecovering(false); + Map recoveringRegions = this.rsServices.getRecoveringRegions(); + if (!recoveringRegions.isEmpty() + && recoveringRegions.containsKey(region.getRegionInfo().getEncodedName())) { + region.setRecovering(true); + recoveringRegions.put(region.getRegionInfo().getEncodedName(), region); + } + boolean failed = true; if (tickleOpening("post_region_open")) { if (updateMeta(region)) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 0393081..a8f9ce6 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -38,9 +38,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import 
java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -60,7 +58,8 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException; @@ -78,6 +77,7 @@ import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.io.MultipleIOException; @@ -131,12 +131,6 @@ public class HLogSplitter { final boolean distributedLogReplay; - /** - * used to construct HTable instance which is replaying WAL edits to assigned region servers. - * Regions of those WAL edits will be in recovering mode and can't accept reads. - */ - private final ExecutorService threadPoolForLogReplay; - // Number of writer threads private final int numWriterThreads; @@ -205,17 +199,10 @@ public class HLogSplitter { if (this.distributedLogReplay) { this.numWriterThreads = conf.getInt("hbase.regionserver.wal.logreplay.writer.threads", 3); - this.threadPoolForLogReplay = new ThreadPoolExecutor(this.numWriterThreads, conf.getInt( - "hbase.htable.threads.max", Integer.MAX_VALUE), conf.getLong( - "hbase.htable.threads.keepalivetime", 60), TimeUnit.SECONDS, - new SynchronousQueue(), Threads.newDaemonThreadFactory("hbase-recover")); - ((ThreadPoolExecutor) this.threadPoolForLogReplay).allowCoreThreadTimeOut(true); - - outputSink = new LogReplayOutputSink(numWriterThreads, this.threadPoolForLogReplay); + outputSink = new LogReplayOutputSink(numWriterThreads); } else { this.numWriterThreads = conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); outputSink = new LogSplittingOutputSink(numWriterThreads); - this.threadPoolForLogReplay = null; } } @@ -1599,23 +1586,21 @@ public class HLogSplitter { private final Map writers = new ConcurrentHashMap(); - private final ExecutorService sharedThreadPool; - - private Map tableNameToHTableMap; + private Map tableNameToHConnectionMap; /** * Map key -> value layout : * * -> Queue */ - private Map> serverToBufferQueueMap; + private Map>> serverToBufferQueueMap; private List thrown; - public LogReplayOutputSink(int numWriters, ExecutorService pool) { + public LogReplayOutputSink(int numWriters) { super(numWriters); - this.sharedThreadPool = pool; - this.tableNameToHTableMap = Collections.synchronizedMap(new TreeMap( - Bytes.BYTES_COMPARATOR)); - this.serverToBufferQueueMap = new ConcurrentHashMap>(); + this.tableNameToHConnectionMap = Collections.synchronizedMap( + new TreeMap(Bytes.BYTES_COMPARATOR)); + this.serverToBufferQueueMap = + new ConcurrentHashMap>>(); this.thrown = new ArrayList(); } @@ -1635,31 +1620,36 @@ public class HLogSplitter { for 
(HLog.Entry entry : entries) { WALEdit edit = entry.getEdit(); byte[] table = entry.getKey().getTablename(); - HTable htable = this.getHTable(table); + HConnection hconn = this.getConnectionByTableName(table); Put put = null; Delete del = null; KeyValue lastKV = null; HRegionLocation loc = null; Row preRow = null; + HRegionLocation preLoc = null; Row lastAddedRow = null; // it is not really needed here just be conservative String preKey = null; List kvs = edit.getKeyValues(); for (KeyValue kv : kvs) { + // filtering HLog meta entries + if (HLogUtil.isMetaFamily(kv.getFamily())) continue; + if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) { if (preRow != null) { synchronized (serverToBufferQueueMap) { - List queue = serverToBufferQueueMap.get(preKey); + List> queue = serverToBufferQueueMap.get(preKey); if (queue == null) { - queue = Collections.synchronizedList(new ArrayList()); + queue = Collections.synchronizedList(new ArrayList>()); serverToBufferQueueMap.put(preKey, queue); } - queue.add(preRow); + queue.add(new Pair(preLoc, preRow)); lastAddedRow = preRow; } } - loc = htable.getRegionLocation(kv.getRow(), false); + + loc = hconn.getRegionLocation(table, kv.getRow(), false); if (kv.isDelete()) { del = new Delete(kv.getRow()); del.setClusterId(entry.getKey().getClusterId()); @@ -1674,6 +1664,7 @@ public class HLogSplitter { + " of table:" + Bytes.toString(table)); } preKey = loc.getHostnamePort() + KEY_DELIMITER + Bytes.toString(table); + preLoc = loc; } if (kv.isDelete()) { del.addDeleteMarker(kv); @@ -1686,12 +1677,12 @@ public class HLogSplitter { // add the last row if (preRow != null && lastAddedRow != preRow) { synchronized (serverToBufferQueueMap) { - List queue = serverToBufferQueueMap.get(preKey); + List> queue = serverToBufferQueueMap.get(preKey); if (queue == null) { - queue = Collections.synchronizedList(new ArrayList()); + queue = Collections.synchronizedList(new ArrayList>()); serverToBufferQueueMap.put(preKey, queue); } - queue.add(preRow); + queue.add(new Pair(preLoc, preRow)); } } } @@ -1699,10 +1690,10 @@ public class HLogSplitter { // process workitems String maxLocKey = null; int maxSize = 0; - List maxQueue = null; + List> maxQueue = null; synchronized (this.serverToBufferQueueMap) { for (String key : this.serverToBufferQueueMap.keySet()) { - List curQueue = this.serverToBufferQueueMap.get(key); + List> curQueue = this.serverToBufferQueueMap.get(key); if (curQueue.size() > maxSize) { maxSize = curQueue.size(); maxQueue = curQueue; @@ -1723,7 +1714,8 @@ public class HLogSplitter { } } - private void processWorkItems(String key, List actions) throws IOException { + private void processWorkItems(String key, List> actions) + throws IOException { RegionServerWriter rsw = null; long startTime = System.nanoTime(); @@ -1745,7 +1737,7 @@ public class HLogSplitter { protected boolean flush() throws IOException { String curLoc = null; int curSize = 0; - List curQueue = null; + List> curQueue = null; synchronized (this.serverToBufferQueueMap) { for (String locationKey : this.serverToBufferQueueMap.keySet()) { curQueue = this.serverToBufferQueueMap.get(locationKey); @@ -1820,18 +1812,17 @@ public class HLogSplitter { } } - // close tables - synchronized (this.tableNameToHTableMap) { - for (byte[] tableName : this.tableNameToHTableMap.keySet()) { - HTable htable = this.tableNameToHTableMap.get(tableName); + // close connections + synchronized (this.tableNameToHConnectionMap) { + for (byte[] tableName : this.tableNameToHConnectionMap.keySet()) { + 
HConnection hconn = this.tableNameToHConnectionMap.get(tableName); try { - htable.close(); + hconn.close(); } catch (IOException ioe) { result.add(ioe); } } } - this.sharedThreadPool.shutdown(); writersClosed = true; } } @@ -1852,18 +1843,6 @@ public class HLogSplitter { return this.recoveredRegions; } - private String getTableFromLocationStr(String loc) { - /** - * location key is in fomrat # - *
@@ -1852,18 +1843,6 @@ public class HLogSplitter {
      return this.recoveredRegions;
    }
 
-    private String getTableFromLocationStr(String loc) {
-      /**
-       * location key is in fomrat <server name:port>#<table name>
-       *
-       */
-      String[] splits = loc.split(KEY_DELIMITER);
-      if (splits.length != 2) {
-        return "";
-      }
-      return splits[1];
-    }
-
    /**
     * Get a writer and path for a log starting at the given entry. This function is threadsafe so
     * long as multiple threads are always acting on different regions.
@@ -1876,35 +1855,44 @@
      }
 
      String tableName = getTableFromLocationStr(loc);
-     if (tableName.isEmpty()) {
-       throw new IOException("Found invalid location string:" + loc);
+     if(tableName.isEmpty()){
+       LOG.warn("Invalid location string:" + loc + " found.");
      }
-     HTable table = this.tableNameToHTableMap.get(Bytes.toBytes(tableName));
-
+
+     HConnection hconn = getConnectionByTableName(Bytes.toBytes(tableName));
      synchronized (writers) {
        ret = writers.get(loc);
        if (ret == null) {
-         ret = new RegionServerWriter(conf, table);
+         ret = new RegionServerWriter(conf, Bytes.toBytes(tableName), hconn);
          writers.put(loc, ret);
        }
      }
      return ret;
    }
 
-   private HTable getHTable(final byte[] table) throws IOException {
-     HTable htable = this.tableNameToHTableMap.get(table);
-     if (htable == null) {
-       synchronized (this.tableNameToHTableMap) {
-         htable = this.tableNameToHTableMap.get(table);
-         if (htable == null) {
-           htable = new HTable(conf, table, this.sharedThreadPool);
-           // htable.setRegionCachePrefetch(table, true);
-           htable.setAutoFlush(false, true);
-           this.tableNameToHTableMap.put(table, htable);
+   private HConnection getConnectionByTableName(final byte[] tableName) throws IOException {
+     HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
+     if (hconn == null) {
+       synchronized (this.tableNameToHConnectionMap) {
+         hconn = this.tableNameToHConnectionMap.get(tableName);
+         if (hconn == null) {
+           hconn = HConnectionManager.createConnection(conf);
+           this.tableNameToHConnectionMap.put(tableName, hconn);
         }
       }
     }
-    return htable;
+    return hconn;
+  }
+
+  private String getTableFromLocationStr(String loc) {
+    /**
+     * location key is in format <server name:port>#<table name>
+     */
+    String[] splits = loc.split(KEY_DELIMITER);
+    if (splits.length != 2) {
+      return "";
+    }
+    return splits[1];
+  }
 }
 
@@ -1915,8 +1903,9 @@ public class HLogSplitter {
  private final static class RegionServerWriter extends SinkWriter {
    final WALEditsReplaySink sink;
 
-   RegionServerWriter(final Configuration conf, final HTable table) throws IOException {
-     this.sink = new WALEditsReplaySink(conf, table);
+   RegionServerWriter(final Configuration conf, final byte[] tableName, final HConnection conn)
+       throws IOException {
+     this.sink = new WALEditsReplaySink(conf, tableName, conn);
    }
 
    void close() throws IOException {
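In the hunks just above, the splitter no longer builds an HTable per table; getConnectionByTableName lazily creates a single shared HConnection per table name, guarding creation with a check-lock-recheck on the map, and getTableFromLocationStr splits the `host:port#table` location key back into its table part. A rough stand-alone sketch of that caching and key-parsing pattern follows; the generic factory and the ConcurrentHashMap are assumptions of the sketch (the patch itself uses a plain map under a synchronized block), not the HBase API:

    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentHashMap;

    public class ConnectionCacheSketch<C> {
        private static final String KEY_DELIMITER = "#";

        // One shared connection per table name.
        private final Map<String, C> cache = new ConcurrentHashMap<String, C>();
        private final Callable<C> factory;

        public ConnectionCacheSketch(Callable<C> factory) {
            this.factory = factory;
        }

        // Check, lock, re-check: each connection is created at most once per table.
        public C get(String tableName) throws Exception {
            C conn = cache.get(tableName);
            if (conn == null) {
                synchronized (cache) {
                    conn = cache.get(tableName);
                    if (conn == null) {
                        conn = factory.call();
                        cache.put(tableName, conn);
                    }
                }
            }
            return conn;
        }

        // Parse "host:port#table" back into the table name; empty string when malformed.
        public static String tableFromLocationKey(String locationKey) {
            String[] splits = locationKey.split(KEY_DELIMITER);
            return splits.length == 2 ? splits[1] : "";
        }
    }

Using a concurrent map for the first, unlocked read keeps the sketch safe under the Java memory model while still creating each connection only once.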
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index c4ce6f5..7bebb72 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -12,7 +12,10 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -20,9 +23,22 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.Action;
+import org.apache.hadoop.hbase.client.AdminProtocol;
+import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.MultiResponse;
 import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.protobuf.ServiceException;
 
 /**
  * This class is responsible for replaying the edits coming from a failed region server.
@@ -36,7 +52,8 @@ public class WALEditsReplaySink {
   private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
 
   private final Configuration conf;
-  private final HTable table;
+  private final HConnection conn;
+  private final byte[] tableName;
   private final MetricsWALEditsReplay metrics;
   private final AtomicLong totalReplayedEdits = new AtomicLong();
 
@@ -46,11 +63,12 @@ public class WALEditsReplaySink {
    * @param table a HTable instance managed by caller
    * @throws IOException thrown when HDFS goes bad or bad file name
    */
-  public WALEditsReplaySink(Configuration conf, HTable table)
+  public WALEditsReplaySink(Configuration conf, byte[] tableName, HConnection conn)
      throws IOException {
-    this.conf = HBaseConfiguration.create(conf);
+    this.conf = conf;
     this.metrics = new MetricsWALEditsReplay();
-    this.table = table;
+    this.conn = conn;
+    this.tableName = tableName;
   }
 
   /**
@@ -58,14 +76,38 @@ public class WALEditsReplaySink {
    * @param actions
    * @throws IOException
    */
-  public void replayEntries(List<Row> actions) throws IOException {
+  public void replayEntries(List<Pair<HRegionLocation, Row>> actions) throws IOException {
     if (actions.size() == 0) {
       return;
     }
-
+
+    Map<HRegionInfo, List<Action<Row>>> actionsByRegion =
+        new HashMap<HRegionInfo, List<Action<Row>>>();
+    HRegionLocation loc = null;
+    Row row = null;
+    List<Action<Row>> regionActions = null;
+    // Build the action list.
+    for (int i = 0; i < actions.size(); i++) {
+      loc = actions.get(i).getFirst();
+      row = actions.get(i).getSecond();
+      if (actionsByRegion.containsKey(loc.getRegionInfo())) {
+        regionActions = actionsByRegion.get(loc.getRegionInfo());
+      } else {
+        regionActions = new ArrayList<Action<Row>>();
+        actionsByRegion.put(loc.getRegionInfo(), regionActions);
+      }
+      Action<Row> action = new Action<Row>(row, i);
+      regionActions.add(action);
+    }
+
     try {
       long startTime = EnvironmentEdgeManager.currentTimeMillis();
-      table.batch(actions);
+
+      // replaying edits by region
+      for (HRegionInfo curRegion : actionsByRegion.keySet()) {
+        replayEdits(loc, curRegion, actionsByRegion.get(curRegion));
+      }
+
       long endTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
       LOG.debug("number of rows:" + actions.size() + " are sent by batch! spent " + endTime
           + "(ms)!");
@@ -73,8 +115,8 @@ public class WALEditsReplaySink {
        * TODO: Add more metricis.
        */
       this.totalReplayedEdits.addAndGet(actions.size());
-    } catch (InterruptedException ix) {
-      throw new InterruptedIOException("Interrupted when replaying wal edits.");
+    } catch (RuntimeException rx) {
+      throw new IOException(rx);
     }
   }
 
@@ -86,4 +128,27 @@ public class WALEditsReplaySink {
     return this.totalReplayedEdits.get() == 0 ? "" : "Sink: total replayed edits: " + this.totalReplayedEdits;
   }
+
+  private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
+      final List<Action<Row>> actions)
+      throws IOException, RuntimeException {
+    new ServerCallable<MultiResponse>(this.conn, this.tableName, null) {
+      public MultiResponse call() throws IOException {
+        try {
+          AdminProtocol remoteSvr = connection.getAdmin(regionLoc.getServerName());
+          MultiRequest request = RequestConverter.buildMultiRequest(regionInfo.getRegionName(),
+            actions);
+          remoteSvr.replay(null, request);
+        } catch (ServiceException se) {
+          throw ProtobufUtil.getRemoteException(se);
+        }
+        return null;
+      }
+
+      @Override
+      public void connect(boolean reload) throws IOException {
+        this.location = connection.locateRegion(regionInfo.getRegionName());
+      }
+    }.withRetries();
+  }
 }
Region: " + regionName + " completes recovery."); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index e0b8548..5192387 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; @@ -519,4 +520,17 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer public ExecutorService getExecutorService() { return null; } + + @Override + public MultiResponse replay(RpcController controller, MultiRequest request) + throws ServiceException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Map getRecoveringRegions() { + // TODO Auto-generated method stub + return null; + } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 5a41603..a8ed685 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; @@ -296,6 +297,13 @@ public class TestDistributedLogSplitting { private void abortRSAndVerifyRecovery(HRegionServer hrs, HTable ht, final ZooKeeperWatcher zkw, final int numRegions, final int numofLines) throws Exception { + + abortRSAndWaitForRecovery(hrs, zkw, numRegions); + assertEquals(numofLines, TEST_UTIL.countRows(ht)); + } + + private void abortRSAndWaitForRecovery(HRegionServer hrs, final ZooKeeperWatcher zkw, + final int numRegions) throws Exception { final MiniHBaseCluster tmpCluster = this.cluster; // abort RS @@ -327,8 +335,6 @@ public class TestDistributedLogSplitting { return (recoveringRegions != null && recoveringRegions.size() == 0); } }); - - assertEquals(numofLines, TEST_UTIL.countRows(ht)); } @Test(timeout = 300000) @@ -613,6 +619,59 @@ public class TestDistributedLogSplitting { ht.close(); } + @Test(timeout = 300000) + public void testReplayCmd() throws Exception { + LOG.info("testReplayCmd"); + Configuration curConf = HBaseConfiguration.create(); + curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + startCluster(NUM_RS, curConf); + final int NUM_REGIONS_TO_CREATE = 40; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume 
@@ -613,6 +619,59 @@ public class TestDistributedLogSplitting {
     ht.close();
   }
 
+  @Test(timeout = 300000)
+  public void testReplayCmd() throws Exception {
+    LOG.info("testReplayCmd");
+    Configuration curConf = HBaseConfiguration.create();
+    curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    startCluster(NUM_RS, curConf);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    // turn off load balancing to prevent regions from moving around otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+
+    List<HRegionInfo> regions = null;
+    HRegionServer hrs = null;
+    for (int i = 0; i < NUM_RS; i++) {
+      boolean isCarryingMeta = false;
+      hrs = rsts.get(i).getRegionServer();
+      regions = ProtobufUtil.getOnlineRegions(hrs);
+      for (HRegionInfo region : regions) {
+        if (region.isMetaRegion()) {
+          isCarryingMeta = true;
+          break;
+        }
+      }
+      if (isCarryingMeta) {
+        continue;
+      }
+      break;
+    }
+
+    LOG.info("#regions = " + regions.size());
+    Iterator<HRegionInfo> it = regions.iterator();
+    while (it.hasNext()) {
+      HRegionInfo region = it.next();
+      if (region.isMetaTable()) {
+        it.remove();
+      }
+    }
+    this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1"));
+    String originalCheckSum = TEST_UTIL.checksumRows(ht);
+
+    // abort RS and trigger replay
+    abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
+
+    assertEquals("Data should remain after reopening of regions", originalCheckSum,
+      TEST_UTIL.checksumRows(ht));
+
+    ht.close();
+  }
+
   /**
    * The original intention of this test was to force an abort of a region
    * server and to make sure that the failure path in the region servers is
@@ -911,6 +970,38 @@ public class TestDistributedLogSplitting {
     }
   }
 
+  /**
+   * Load table with puts and deletes with expected values so that we can verify later
+   */
+  private void prepareData(final HTable t, final byte[] f, final byte[] column) throws IOException {
+    t.setAutoFlush(false);
+    byte[] k = new byte[3];
+
+    // add puts
+    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
+      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
+        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+          k[0] = b1;
+          k[1] = b2;
+          k[2] = b3;
+          Put put = new Put(k);
+          put.add(f, column, k);
+          t.put(put);
+        }
+      }
+    }
+    t.flushCommits();
+    // add deletes
+    for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+      k[0] = 'a';
+      k[1] = 'a';
+      k[2] = b3;
+      Delete del = new Delete(k);
+      t.delete(del);
+    }
+    t.flushCommits();
+  }
+
   private NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster) throws IOException {
     NavigableSet<String> online = new TreeSet<String>();
 
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
index 6bebc4a..aaa9e3d 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
@@ -191,4 +191,10 @@ public class MockRegionServerServices implements RegionServerServices {
   public ExecutorService getExecutorService() {
     return null;
   }
+
+  @Override
+  public Map<String, HRegion> getRecoveringRegions() {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }
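prepareData above writes one cell for every three-letter row key from "aaa" to "zzz" (26^3 = 17,576 puts) and then deletes the 26 keys "aaa" through "aaz", so 17,550 rows should survive; testReplayCmd does not count rows, it compares TEST_UTIL.checksumRows(ht) taken before the abort with the checksum taken after recovery. The small stand-alone sketch below only enumerates the same key space so those expected numbers can be sanity-checked; it does not touch HBase:

    import java.util.ArrayList;
    import java.util.List;

    public class PrepareDataKeysSketch {
        public static void main(String[] args) {
            List<String> keys = new ArrayList<String>();
            // Same key space as prepareData: every 3-letter key over 'a'..'z'.
            for (char b1 = 'a'; b1 <= 'z'; b1++) {
                for (char b2 = 'a'; b2 <= 'z'; b2++) {
                    for (char b3 = 'a'; b3 <= 'z'; b3++) {
                        keys.add("" + b1 + b2 + b3);
                    }
                }
            }
            // The deletes target "aaa".."aaz", i.e. 26 of those keys.
            int deleted = 0;
            for (char b3 = 'a'; b3 <= 'z'; b3++) {
                if (keys.contains("aa" + b3)) {
                    deleted++;
                }
            }
            System.out.println("puts=" + keys.size() + " deletes=" + deleted
                + " expected surviving rows=" + (keys.size() - deleted));
            // Prints: puts=17576 deletes=26 expected surviving rows=17550
        }
    }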