diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java index 44c0616..d19b1d2 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java @@ -63,6 +63,7 @@ public class IntegrationTestDataIngestWithChaosMonkey extends IngestIntegrationT // smaller enough in order for the replay can complete before ChaosMonkey kills another region // server conf.setInt("hbase.log.replay.retries.number", 2); + conf.setInt("hbase.log.replay.rpc.timeout", 2000); conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true); } if(!util.isDistributedCluster()) { diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java index 2fad51a..e31753a 100644 --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java @@ -558,6 +558,10 @@ public final class AdminProtos { // optional .GetRegionInfoResponse.CompactionState compaction_state = 2; boolean hasCompactionState(); org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState getCompactionState(); + + // optional bool isRecovering = 3; + boolean hasIsRecovering(); + boolean getIsRecovering(); } public static final class GetRegionInfoResponse extends com.google.protobuf.GeneratedMessage @@ -686,9 +690,20 @@ public final class AdminProtos { return compactionState_; } + // optional bool isRecovering = 3; + public static final int ISRECOVERING_FIELD_NUMBER = 3; + private boolean isRecovering_; + public boolean hasIsRecovering() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public boolean getIsRecovering() { + return isRecovering_; + } + private void initFields() { regionInfo_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo.getDefaultInstance(); compactionState_ = org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState.NONE; + isRecovering_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -716,6 +731,9 @@ public final class AdminProtos { if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeEnum(2, compactionState_.getNumber()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeBool(3, isRecovering_); + } getUnknownFields().writeTo(output); } @@ -733,6 +751,10 @@ public final class AdminProtos { size += com.google.protobuf.CodedOutputStream .computeEnumSize(2, compactionState_.getNumber()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(3, isRecovering_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -766,6 +788,11 @@ public final class AdminProtos { result = result && (getCompactionState() == other.getCompactionState()); } + result = result && (hasIsRecovering() == other.hasIsRecovering()); + if (hasIsRecovering()) { + result = result && (getIsRecovering() + == other.getIsRecovering()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -783,6 +810,10 @@ public final class AdminProtos { hash = (37 * hash) + COMPACTION_STATE_FIELD_NUMBER; hash = (53 * hash) + hashEnum(getCompactionState()); } + if (hasIsRecovering()) { + hash = (37 * hash) + ISRECOVERING_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getIsRecovering()); + } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -908,6 +939,8 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000001); compactionState_ = org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState.NONE; bitField0_ = (bitField0_ & ~0x00000002); + isRecovering_ = false; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -958,6 +991,10 @@ public final class AdminProtos { to_bitField0_ |= 0x00000002; } result.compactionState_ = compactionState_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.isRecovering_ = isRecovering_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -980,6 +1017,9 @@ public final class AdminProtos { if (other.hasCompactionState()) { setCompactionState(other.getCompactionState()); } + if (other.hasIsRecovering()) { + setIsRecovering(other.getIsRecovering()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1039,6 +1079,11 @@ public final class AdminProtos { } break; } + case 24: { + bitField0_ |= 0x00000004; + isRecovering_ = input.readBool(); + break; + } } } } @@ -1159,6 +1204,27 @@ public final class AdminProtos { return this; } + // optional bool isRecovering = 3; + private boolean isRecovering_ ; + public boolean hasIsRecovering() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public boolean getIsRecovering() { + return isRecovering_; + } + public Builder setIsRecovering(boolean value) { + bitField0_ |= 0x00000004; + isRecovering_ = value; + onChanged(); + return this; + } + public Builder clearIsRecovering() { + bitField0_ = (bitField0_ & ~0x00000004); + isRecovering_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:GetRegionInfoResponse) } @@ -15389,78 +15455,79 @@ public final class AdminProtos { "\n\013Admin.proto\032\014Client.proto\032\013hbase.proto" + "\032\tWAL.proto\"R\n\024GetRegionInfoRequest\022 \n\006r" + "egion\030\001 \002(\0132\020.RegionSpecifier\022\030\n\020compact" + - "ion_state\030\002 \001(\010\"\303\001\n\025GetRegionInfoRespons" + + "ion_state\030\002 \001(\010\"\331\001\n\025GetRegionInfoRespons" + "e\022 \n\013region_info\030\001 \002(\0132\013.RegionInfo\022@\n\020c" + "ompaction_state\030\002 \001(\0162&.GetRegionInfoRes" + - "ponse.CompactionState\"F\n\017CompactionState" + - "\022\010\n\004NONE\020\000\022\t\n\005MINOR\020\001\022\t\n\005MAJOR\020\002\022\023\n\017MAJO" + - "R_AND_MINOR\020\003\"G\n\023GetStoreFileRequest\022 \n\006" + - "region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006family", - "\030\002 \003(\014\"*\n\024GetStoreFileResponse\022\022\n\nstore_" + - "file\030\001 \003(\t\"\030\n\026GetOnlineRegionRequest\";\n\027" + - "GetOnlineRegionResponse\022 \n\013region_info\030\001" + - " \003(\0132\013.RegionInfo\"\275\001\n\021OpenRegionRequest\022" + - "4\n\topen_info\030\001 \003(\0132!.OpenRegionRequest.R" + - "egionOpenInfo\032r\n\016RegionOpenInfo\022\033\n\006regio" + - "n\030\001 \002(\0132\013.RegionInfo\022\037\n\027version_of_offli" + - "ne_node\030\002 \001(\r\022\"\n\rfavored_nodes\030\003 \003(\0132\013.S" + - "erverName\"\235\001\n\022OpenRegionResponse\022=\n\ropen" + - "ing_state\030\001 \003(\0162&.OpenRegionResponse.Reg", - "ionOpeningState\"H\n\022RegionOpeningState\022\n\n" + - "\006OPENED\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016FAILED_" + - "OPENING\020\002\"\240\001\n\022CloseRegionRequest\022 \n\006regi" + - "on\030\001 \002(\0132\020.RegionSpecifier\022\037\n\027version_of" + - "_closing_node\030\002 \001(\r\022\036\n\020transition_in_ZK\030" + - "\003 \001(\010:\004true\022\'\n\022destination_server\030\004 \001(\0132" + - "\013.ServerName\"%\n\023CloseRegionResponse\022\016\n\006c" + - "losed\030\001 \002(\010\"P\n\022FlushRegionRequest\022 \n\006reg" + - "ion\030\001 \002(\0132\020.RegionSpecifier\022\030\n\020if_older_" + - "than_ts\030\002 \001(\004\"?\n\023FlushRegionResponse\022\027\n\017", - "last_flush_time\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010\"K" + - "\n\022SplitRegionRequest\022 \n\006region\030\001 \002(\0132\020.R" + - "egionSpecifier\022\023\n\013split_point\030\002 \001(\014\"\025\n\023S" + - "plitRegionResponse\"W\n\024CompactRegionReque" + - "st\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\r\n\005" + - "major\030\002 \001(\010\022\016\n\006family\030\003 \001(\014\"\027\n\025CompactRe" + - "gionResponse\"v\n\023MergeRegionsRequest\022\"\n\010r" + - "egion_a\030\001 \002(\0132\020.RegionSpecifier\022\"\n\010regio" + - "n_b\030\002 \002(\0132\020.RegionSpecifier\022\027\n\010forcible\030" + - "\003 \001(\010:\005false\"\026\n\024MergeRegionsResponse\"X\n\010", - "WALEntry\022\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key_va" + - "lue_bytes\030\002 \003(\014\022\035\n\025associated_cell_count" + - "\030\003 \001(\005\"4\n\030ReplicateWALEntryRequest\022\030\n\005en" + - "try\030\001 \003(\0132\t.WALEntry\"\033\n\031ReplicateWALEntr" + - "yResponse\"\026\n\024RollWALWriterRequest\"0\n\025Rol" + - "lWALWriterResponse\022\027\n\017region_to_flush\030\001 " + - "\003(\014\"#\n\021StopServerRequest\022\016\n\006reason\030\001 \002(\t" + - "\"\024\n\022StopServerResponse\"\026\n\024GetServerInfoR" + - "equest\"B\n\nServerInfo\022 \n\013server_name\030\001 \002(" + - "\0132\013.ServerName\022\022\n\nwebui_port\030\002 \001(\r\"9\n\025Ge", - "tServerInfoResponse\022 \n\013server_info\030\001 \002(\013" + - "2\013.ServerInfo2\337\006\n\014AdminService\022>\n\rGetReg" + - "ionInfo\022\025.GetRegionInfoRequest\032\026.GetRegi" + - "onInfoResponse\022;\n\014GetStoreFile\022\024.GetStor" + - "eFileRequest\032\025.GetStoreFileResponse\022D\n\017G" + - "etOnlineRegion\022\027.GetOnlineRegionRequest\032" + - "\030.GetOnlineRegionResponse\0225\n\nOpenRegion\022" + - "\022.OpenRegionRequest\032\023.OpenRegionResponse" + - "\0228\n\013CloseRegion\022\023.CloseRegionRequest\032\024.C" + - "loseRegionResponse\0228\n\013FlushRegion\022\023.Flus", - "hRegionRequest\032\024.FlushRegionResponse\0228\n\013" + - "SplitRegion\022\023.SplitRegionRequest\032\024.Split" + - "RegionResponse\022>\n\rCompactRegion\022\025.Compac" + - "tRegionRequest\032\026.CompactRegionResponse\022;" + - "\n\014MergeRegions\022\024.MergeRegionsRequest\032\025.M" + - "ergeRegionsResponse\022J\n\021ReplicateWALEntry" + - "\022\031.ReplicateWALEntryRequest\032\032.ReplicateW" + - "ALEntryResponse\022\'\n\006Replay\022\r.MultiRequest" + - "\032\016.MultiResponse\022>\n\rRollWALWriter\022\025.Roll" + - "WALWriterRequest\032\026.RollWALWriterResponse", - "\022>\n\rGetServerInfo\022\025.GetServerInfoRequest" + - "\032\026.GetServerInfoResponse\0225\n\nStopServer\022\022" + - ".StopServerRequest\032\023.StopServerResponseB" + - "A\n*org.apache.hadoop.hbase.protobuf.gene" + - "ratedB\013AdminProtosH\001\210\001\001\240\001\001" + "ponse.CompactionState\022\024\n\014isRecovering\030\003 " + + "\001(\010\"F\n\017CompactionState\022\010\n\004NONE\020\000\022\t\n\005MINO" + + "R\020\001\022\t\n\005MAJOR\020\002\022\023\n\017MAJOR_AND_MINOR\020\003\"G\n\023G" + + "etStoreFileRequest\022 \n\006region\030\001 \002(\0132\020.Reg", + "ionSpecifier\022\016\n\006family\030\002 \003(\014\"*\n\024GetStore" + + "FileResponse\022\022\n\nstore_file\030\001 \003(\t\"\030\n\026GetO" + + "nlineRegionRequest\";\n\027GetOnlineRegionRes" + + "ponse\022 \n\013region_info\030\001 \003(\0132\013.RegionInfo\"" + + "\275\001\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" + + "2!.OpenRegionRequest.RegionOpenInfo\032r\n\016R" + + "egionOpenInfo\022\033\n\006region\030\001 \002(\0132\013.RegionIn" + + "fo\022\037\n\027version_of_offline_node\030\002 \001(\r\022\"\n\rf" + + "avored_nodes\030\003 \003(\0132\013.ServerName\"\235\001\n\022Open" + + "RegionResponse\022=\n\ropening_state\030\001 \003(\0162&.", + "OpenRegionResponse.RegionOpeningState\"H\n" + + "\022RegionOpeningState\022\n\n\006OPENED\020\000\022\022\n\016ALREA" + + "DY_OPENED\020\001\022\022\n\016FAILED_OPENING\020\002\"\240\001\n\022Clos" + + "eRegionRequest\022 \n\006region\030\001 \002(\0132\020.RegionS" + + "pecifier\022\037\n\027version_of_closing_node\030\002 \001(" + + "\r\022\036\n\020transition_in_ZK\030\003 \001(\010:\004true\022\'\n\022des" + + "tination_server\030\004 \001(\0132\013.ServerName\"%\n\023Cl" + + "oseRegionResponse\022\016\n\006closed\030\001 \002(\010\"P\n\022Flu" + + "shRegionRequest\022 \n\006region\030\001 \002(\0132\020.Region" + + "Specifier\022\030\n\020if_older_than_ts\030\002 \001(\004\"?\n\023F", + "lushRegionResponse\022\027\n\017last_flush_time\030\001 " + + "\002(\004\022\017\n\007flushed\030\002 \001(\010\"K\n\022SplitRegionReque" + + "st\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\023\n\013" + + "split_point\030\002 \001(\014\"\025\n\023SplitRegionResponse" + + "\"W\n\024CompactRegionRequest\022 \n\006region\030\001 \002(\013" + + "2\020.RegionSpecifier\022\r\n\005major\030\002 \001(\010\022\016\n\006fam" + + "ily\030\003 \001(\014\"\027\n\025CompactRegionResponse\"v\n\023Me" + + "rgeRegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Re" + + "gionSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.Region" + + "Specifier\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024Me", + "rgeRegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 " + + "\002(\0132\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n" + + "\025associated_cell_count\030\003 \001(\005\"4\n\030Replicat" + + "eWALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEnt" + + "ry\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollW" + + "ALWriterRequest\"0\n\025RollWALWriterResponse" + + "\022\027\n\017region_to_flush\030\001 \003(\014\"#\n\021StopServerR" + + "equest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerResp" + + "onse\"\026\n\024GetServerInfoRequest\"B\n\nServerIn" + + "fo\022 \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\n", + "webui_port\030\002 \001(\r\"9\n\025GetServerInfoRespons" + + "e\022 \n\013server_info\030\001 \002(\0132\013.ServerInfo2\337\006\n\014" + + "AdminService\022>\n\rGetRegionInfo\022\025.GetRegio" + + "nInfoRequest\032\026.GetRegionInfoResponse\022;\n\014" + + "GetStoreFile\022\024.GetStoreFileRequest\032\025.Get" + + "StoreFileResponse\022D\n\017GetOnlineRegion\022\027.G" + + "etOnlineRegionRequest\032\030.GetOnlineRegionR" + + "esponse\0225\n\nOpenRegion\022\022.OpenRegionReques" + + "t\032\023.OpenRegionResponse\0228\n\013CloseRegion\022\023." + + "CloseRegionRequest\032\024.CloseRegionResponse", + "\0228\n\013FlushRegion\022\023.FlushRegionRequest\032\024.F" + + "lushRegionResponse\0228\n\013SplitRegion\022\023.Spli" + + "tRegionRequest\032\024.SplitRegionResponse\022>\n\r" + + "CompactRegion\022\025.CompactRegionRequest\032\026.C" + + "ompactRegionResponse\022;\n\014MergeRegions\022\024.M" + + "ergeRegionsRequest\032\025.MergeRegionsRespons" + + "e\022J\n\021ReplicateWALEntry\022\031.ReplicateWALEnt" + + "ryRequest\032\032.ReplicateWALEntryResponse\022\'\n" + + "\006Replay\022\r.MultiRequest\032\016.MultiResponse\022>" + + "\n\rRollWALWriter\022\025.RollWALWriterRequest\032\026", + ".RollWALWriterResponse\022>\n\rGetServerInfo\022" + + "\025.GetServerInfoRequest\032\026.GetServerInfoRe" + + "sponse\0225\n\nStopServer\022\022.StopServerRequest" + + "\032\023.StopServerResponseBA\n*org.apache.hado" + + "op.hbase.protobuf.generatedB\013AdminProtos" + + "H\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -15480,7 +15547,7 @@ public final class AdminProtos { internal_static_GetRegionInfoResponse_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_GetRegionInfoResponse_descriptor, - new java.lang.String[] { "RegionInfo", "CompactionState", }, + new java.lang.String[] { "RegionInfo", "CompactionState", "IsRecovering", }, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.class, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.Builder.class); internal_static_GetStoreFileRequest_descriptor = diff --git hbase-protocol/src/main/protobuf/Admin.proto hbase-protocol/src/main/protobuf/Admin.proto index 8d0398f..7311314 100644 --- hbase-protocol/src/main/protobuf/Admin.proto +++ hbase-protocol/src/main/protobuf/Admin.proto @@ -36,6 +36,7 @@ message GetRegionInfoRequest { message GetRegionInfoResponse { required RegionInfo region_info = 1; optional CompactionState compaction_state = 2; + optional bool isRecovering = 3; enum CompactionState { NONE = 0; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index fcc99ca..9ec88c0 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -337,6 +337,30 @@ public class MasterFileSystem { /** * Mark regions in recovering state when distributedLogReplay are set true + * @param serverNames Set of ServerNames to be replayed wals in order to recover changes contained + * in them + * @throws IOException + */ + public void prepareLogReplay(Set serverNames) throws IOException { + if (!this.distributedLogReplay) { + return; + } + // mark regions in recovering state + for (ServerName serverName : serverNames) { + NavigableMap regions = this.getServerUserRegions(serverName); + if (regions == null) { + continue; + } + try { + this.splitLogManager.markRegionsRecoveringInZK(serverName, regions.keySet()); + } catch (KeeperException e) { + throw new IOException(e); + } + } + } + + /** + * Mark regions in recovering state when distributedLogReplay are set true * @param serverName Failed region server whose wals to be replayed * @param regions Set of regions to be recovered * @throws IOException diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java index 8762092..cc74096 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java @@ -55,9 +55,15 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler { try { AssignmentManager am = this.services.getAssignmentManager(); try { - if (this.shouldSplitHlog && !this.distributedLogReplay) { + if (this.shouldSplitHlog) { LOG.info("Splitting META logs for " + serverName); - this.services.getMasterFileSystem().splitMetaLog(serverName); + if (this.distributedLogReplay) { + Set regions = new HashSet(); + regions.add(HRegionInfo.FIRST_META_REGIONINFO); + this.services.getMasterFileSystem().prepareLogReplay(serverName, regions); + } else { + this.services.getMasterFileSystem().splitMetaLog(serverName); + } } } catch (IOException ioe) { this.services.getExecutorService().submit(this); @@ -155,21 +161,6 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler { long waitTime = this.server.getConfiguration().getLong( "hbase.catalog.verification.timeout", 1000); - if (this.shouldSplitHlog && this.distributedLogReplay) { - LOG.info("Splitting META logs for " + serverName - + ". Mark META region in recovery before assignment."); - Set regions = new HashSet(); - regions.add(HRegionInfo.FIRST_META_REGIONINFO); - try { - this.services.getMasterFileSystem().prepareLogReplay(serverName, regions); - } catch (IOException ioe) { - this.services.getExecutorService().submit(this); - this.deadServers.add(serverName); - throw new IOException("failed to mark META region in recovery on " + serverName - + ", will retry", ioe); - } - } - int iFlag = 0; while (true) { try { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java index 5187c01..548f2f0 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java @@ -182,9 +182,16 @@ public class ServerShutdownHandler extends EventHandler { } try { - if (this.shouldSplitHlog && !this.distributedLogReplay) { + if (this.shouldSplitHlog) { LOG.info("Splitting logs for " + serverName + " before assignment."); - this.services.getMasterFileSystem().splitLog(serverName); + if (this.distributedLogReplay) { + LOG.info("Mark regions in recovery before assignment."); + Set serverNames = new HashSet(); + serverNames.add(serverName); + this.services.getMasterFileSystem().prepareLogReplay(serverNames); + } else { + this.services.getMasterFileSystem().splitLog(serverName); + } } else { LOG.info("Skipping log splitting for " + serverName); } @@ -259,18 +266,6 @@ public class ServerShutdownHandler extends EventHandler { } } } - - if (this.shouldSplitHlog && this.distributedLogReplay) { - try { - LOG.info("Splitting logs for " + serverName - + ". Mark regions in recovery before assignment."); - Set toAssignRegionSet = new HashSet(); - toAssignRegionSet.addAll(toAssignRegions); - this.services.getMasterFileSystem().prepareLogReplay(serverName, toAssignRegionSet); - } catch (IOException ioe) { - resubmit(serverName, ioe); - } - } try { am.assign(toAssignRegions); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9e3f7c6..b9af631 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -213,7 +213,7 @@ public class HRegion implements HeapSize { // , Writable{ */ protected enum Operation { ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE, - REPLAY_BATCH_MUTATE + REPLAY_BATCH_MUTATE, COMPACT_REGION } ////////////////////////////////////////////////////////////////////////////// @@ -5287,6 +5287,7 @@ public class HRegion implements HeapSize { // , Writable{ case PUT: case DELETE: case BATCH_MUTATE: + case COMPACT_REGION: // when a region is in recovering state, no read, split or merge is allowed if (this.isRecovering() && (this.disallowWritesInRecovering || (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) { @@ -5296,8 +5297,10 @@ public class HRegion implements HeapSize { // , Writable{ default: break; } - if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION) { - // split or merge region doesn't need to check the closing/closed state or lock the region + if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION + || op == Operation.COMPACT_REGION) { + // split, merge or compact region doesn't need to check the closing/closed state or lock the + // region return; } if (this.closing.get()) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 423a1fd..5015ade 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3368,6 +3368,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa if (request.hasCompactionState() && request.getCompactionState()) { builder.setCompactionState(region.getCompactionState()); } + builder.setIsRecovering(region.isRecovering()); return builder.build(); } catch (IOException ie) { throw new ServiceException(ie); @@ -3743,6 +3744,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa checkOpen(); requestCount.increment(); HRegion region = getRegion(request.getRegion()); + region.startRegionOperation(Operation.COMPACT_REGION); LOG.info("Compacting " + region.getRegionNameAsString()); boolean major = false; byte [] family = null; @@ -4028,7 +4030,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa cacheFlusher.reclaimMemStoreMemory(); } - OperationStatus codes[] = region.batchMutate(mArray); + OperationStatus codes[] = region.batchMutate(mArray, isReplay); for (i = 0; i < codes.length; i++) { switch (codes[i].getOperationStatusCode()) { case BAD_FAMILY: diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index a3a292a..e473f2f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -44,6 +44,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -74,6 +75,9 @@ import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId; @@ -97,6 +101,7 @@ import org.apache.zookeeper.KeeperException; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.protobuf.ServiceException; /** * This class is responsible for splitting up a bunch of regionserver commit log @@ -1541,43 +1546,37 @@ public class HLogSplitter { } Long lastFlushedSequenceId = -1l; - loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut); - Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo() - .getEncodedName()); - - // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will - // update the value for the region - RegionStoreSequenceIds ids = - SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc.getRegionInfo() - .getEncodedName()); - if(ids != null) { - lastFlushedSequenceId = ids.getLastFlushedSequenceId(); - Map storeIds = new TreeMap(Bytes.BYTES_COMPARATOR); - List maxSeqIdInStores = ids.getStoreSequenceIdList(); - for (StoreSequenceId id : maxSeqIdInStores) { - storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId()); + AtomicBoolean isRecovering = new AtomicBoolean(true); + loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering); + if (!isRecovering.get()) { + // region isn't in recovering at all because WAL file may contain a region that has + // been moved to somewhere before hosting RS fails + lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE); + LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName() + + " because it's not in recovering."); + } else { + Long cachedLastFlushedSequenceId = + lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName()); + + // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will + // update the value for the region + RegionStoreSequenceIds ids = + SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc + .getRegionInfo().getEncodedName()); + if (ids != null) { + lastFlushedSequenceId = ids.getLastFlushedSequenceId(); + Map storeIds = new TreeMap(Bytes.BYTES_COMPARATOR); + List maxSeqIdInStores = ids.getStoreSequenceIdList(); + for (StoreSequenceId id : maxSeqIdInStores) { + storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId()); + } + regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds); } - regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds); - } - - if (cachedLastFlushedSequenceId == null - || lastFlushedSequenceId > cachedLastFlushedSequenceId) { - lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId); - } - // check if the region to be recovered is marked as recovering in ZK - try { - if (SplitLogManager.isRegionMarkedRecoveringInZK(watcher, loc.getRegionInfo() - .getEncodedName()) == false) { - // region isn't in recovering at all because WAL file may contain a region that has - // been moved to somewhere before hosting RS fails - lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE); - LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName() - + " because it's not in recovering."); + if (cachedLastFlushedSequenceId == null + || lastFlushedSequenceId > cachedLastFlushedSequenceId) { + lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId); } - } catch (KeeperException e) { - throw new IOException("Failed to retrieve recovering state of region " - + loc.getRegionInfo().getEncodedName(), e); } onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc); @@ -1608,11 +1607,12 @@ public class HLogSplitter { * @param loc * @param row * @param timeout How long to wait + * @param isRecovering Recovering state of the region interested on destination region server. * @return True when region is online on the destination region server * @throws InterruptedException */ private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row, - final long timeout) + final long timeout, AtomicBoolean isRecovering) throws IOException { final long endTime = EnvironmentEdgeManager.currentTimeMillis() + timeout; final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, @@ -1630,8 +1630,16 @@ public class HLogSplitter { } BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName()); HRegionInfo region = loc.getRegionInfo(); - if((region =ProtobufUtil.getRegionInfo(remoteSvr, region.getRegionName())) != null) { - return loc; + try { + GetRegionInfoRequest request = + RequestConverter.buildGetRegionInfoRequest(region.getRegionName()); + GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request); + if (HRegionInfo.convert(response.getRegionInfo()) != null) { + isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true); + return loc; + } + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } } catch (IOException e) { cause = e.getCause(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index d308e3d..9c05ba4 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -634,7 +634,7 @@ public class TestDistributedLogSplitting { @Test(timeout = 300000) public void testLogReplayForDisablingTable() throws Exception { - LOG.info("testLogReplayWithNonMetaRSDown"); + LOG.info("testLogReplayForDisablingTable"); Configuration curConf = HBaseConfiguration.create(); curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); startCluster(NUM_RS, curConf);