From 192b3cae39b148517d136dd5131f6bf5fce02da5 Mon Sep 17 00:00:00 2001 From: Rajeshbabu Chintaguntla Date: Fri, 5 Sep 2014 01:33:50 +0800 Subject: [PATCH] HBASE-10576 Custom load balancer to co-locate the regions of two tables which are having same split keys --- .../java/org/apache/hadoop/hbase/HRegionInfo.java | 56 ++- .../org/apache/hadoop/hbase/MetaTableAccessor.java | 16 + .../apache/hadoop/hbase/master/RegionState.java | 32 +- .../protobuf/generated/ClusterStatusProtos.java | 110 ++++-- .../hbase/protobuf/generated/HBaseProtos.java | 163 +++++++-- .../generated/RegionServerStatusProtos.java | 80 +++-- .../src/main/protobuf/ClusterStatus.proto | 2 + hbase-protocol/src/main/protobuf/HBase.proto | 1 + .../src/main/protobuf/RegionServerStatus.proto | 5 + .../hadoop/hbase/master/AssignmentManager.java | 67 +++- .../hadoop/hbase/master/RegionStateStore.java | 4 + .../apache/hadoop/hbase/master/RegionStates.java | 12 + .../master/handler/ServerShutdownHandler.java | 1 + .../hadoop/hbase/regionserver/HRegionServer.java | 2 +- .../hbase/regionserver/RegionShadowRequest.java | 132 ++++++++ .../regionserver/RegionShadowTransaction.java | 376 +++++++++++++++++++++ .../regionserver/handler/OpenRegionHandler.java | 2 +- .../TestRegionShadowTranscationOnCluster.java | 182 ++++++++++ 18 files changed, 1127 insertions(+), 116 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionShadowRequest.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionShadowTransaction.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionShadowTranscationOnCluster.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java index 5518f3a..183a81f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java @@ -219,6 +219,8 @@ public class HRegionInfo implements Comparable { private String encodedName = null; private byte [] encodedNameAsBytes = null; private int replicaId = DEFAULT_REPLICA_ID; + private boolean shadow = false; + // Current TableName private TableName tableName = null; @@ -238,6 +240,7 @@ public class HRegionInfo implements Comparable { result ^= Boolean.valueOf(this.offLine).hashCode(); result ^= Arrays.hashCode(this.tableName.getName()); result ^= this.replicaId; + result ^= Boolean.valueOf(this.isShadow()).hashCode(); this.hashCode = result; } @@ -329,7 +332,27 @@ public class HRegionInfo implements Comparable { public HRegionInfo(final TableName tableName, final byte[] startKey, final byte[] endKey, final boolean split, final long regionid, final int replicaId) - throws IllegalArgumentException { + throws IllegalArgumentException { + this(tableName, startKey, endKey, split, regionid, replicaId, false); + } + + /** + * Construct HRegionInfo with explicit parameters + * + * @param tableName the table descriptor + * @param startKey first key in region + * @param endKey end of key range + * @param split true if this region has split and we have daughter regions + * regions that may or may not hold references to this region. + * @param regionid Region id to use. + * @param replicaId the replicaId to use + * @param shadow true if this region need to skip assign from master. + * @throws IllegalArgumentException + */ + public HRegionInfo(final TableName tableName, final byte[] startKey, + final byte[] endKey, final boolean split, final long regionid, + final int replicaId, final boolean shadow) + throws IllegalArgumentException { super(); if (tableName == null) { throw new IllegalArgumentException("TableName cannot be null"); @@ -345,9 +368,9 @@ public class HRegionInfo implements Comparable { this.regionName = createRegionName(this.tableName, startKey, regionId, replicaId, true); this.split = split; - this.endKey = endKey == null? HConstants.EMPTY_END_ROW: endKey.clone(); - this.startKey = startKey == null? - HConstants.EMPTY_START_ROW: startKey.clone(); + this.shadow = shadow; + this.endKey = endKey == null ? HConstants.EMPTY_END_ROW : endKey.clone(); + this.startKey = startKey == null ? HConstants.EMPTY_START_ROW : startKey.clone(); this.tableName = tableName; setHashCode(); } @@ -364,6 +387,7 @@ public class HRegionInfo implements Comparable { this.regionId = other.getRegionId(); this.regionName = other.getRegionName(); this.split = other.isSplit(); + this.shadow = other.isShadow(); this.startKey = other.getStartKey(); this.hashCode = other.hashCode(); this.encodedName = other.getEncodedName(); @@ -801,7 +825,8 @@ public class HRegionInfo implements Comparable { Bytes.toStringBinary(this.endKey) + "'" + (isOffline()? ", OFFLINE => true": "") + (isSplit()? ", SPLIT => true": "") + - ((replicaId > 0)? ", REPLICA_ID => " + replicaId : "") + "}"; + ((replicaId > 0)? ", REPLICA_ID => " + replicaId : "") + + (isShadow()? ", SHADOW => true": "") +"}"; } /** @@ -957,8 +982,12 @@ public class HRegionInfo implements Comparable { int replicaDiff = this.getReplicaId() - o.getReplicaId(); if (replicaDiff != 0) return replicaDiff; - if (this.offLine == o.offLine) - return 0; + if (this.offLine == o.offLine) { + if(this.shadow == o.shadow) return 0; + if (this.shadow == true) return -1; + return 1; + } + if (this.offLine == true) return -1; return 1; @@ -1000,6 +1029,7 @@ public class HRegionInfo implements Comparable { } builder.setOffline(info.isOffline()); builder.setSplit(info.isSplit()); + builder.setShadow(info.isShadow()); builder.setReplicaId(info.getReplicaId()); return builder.build(); } @@ -1031,6 +1061,7 @@ public class HRegionInfo implements Comparable { if (proto.hasSplit()) { split = proto.getSplit(); } + HRegionInfo hri = new HRegionInfo( tableName, startKey, @@ -1038,6 +1069,9 @@ public class HRegionInfo implements Comparable { if (proto.hasOffline()) { hri.setOffline(proto.getOffline()); } + if (proto.hasShadow()) { + hri.setShadow(proto.getShadow()); + } return hri; } @@ -1452,4 +1486,12 @@ public class HRegionInfo implements Comparable { } return false; } + + public boolean isShadow() { + return shadow; + } + + public void setShadow(boolean shadow) { + this.shadow = shadow; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index c63e4c6..54d01e8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -1232,6 +1232,22 @@ public class MetaTableAccessor { } } + public static void shadowRegion(final HConnection hConnection, HRegionInfo region, ServerName sn) + throws IOException { + HTable meta = getMetaHTable(hConnection); + try { + HRegionInfo copyOfParent = new HRegionInfo(region); + copyOfParent.setShadow(true); + + // Put for parent + Put putParent = makePutFromRegionInfo(copyOfParent); + byte[] tableRow = Bytes.toBytes(region.getRegionNameAsString() + HConstants.DELIMITER); + multiMutate(meta, tableRow, putParent); + } finally { + meta.close(); + } + } + /** * Performs an atomic multi-Mutate operation against the given table. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java index 0a9c123..a15ffc0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java @@ -51,9 +51,12 @@ public class RegionState { SPLITTING_NEW, // new region to be created when RS splits a parent // region but hasn't be created yet, or master doesn't // know it's already created - MERGING_NEW; // new region to be created when RS merges two + MERGING_NEW, // new region to be created when RS merges two // daughter regions but hasn't be created yet, or // master doesn't know it's already created + SHADOWING, // server started shadow of a region. + SHADOWED; // server completed shadow of a region. + /** * Convert to protobuf ClusterStatusProtos.RegionState.State @@ -106,6 +109,12 @@ public class RegionState { case MERGING_NEW: rs = ClusterStatusProtos.RegionState.State.MERGING_NEW; break; + case SHADOWING: + rs = ClusterStatusProtos.RegionState.State.SHADOWING; + break; + case SHADOWED: + rs = ClusterStatusProtos.RegionState.State.SHADOWED; + break; default: throw new IllegalStateException(""); } @@ -165,6 +174,13 @@ public class RegionState { case MERGING_NEW: state = MERGING_NEW; break; + case SHADOWING: + state = SHADOWING; + break; + case SHADOWED: + state = SHADOWED; + break; + default: throw new IllegalStateException(""); } @@ -269,6 +285,14 @@ public class RegionState { public boolean isMergingNew() { return state == State.MERGING_NEW; } + + public boolean isShadowing() { + return state == State.SHADOWING; + } + + public boolean isShadowed() { + return state == State.SHADOWED; + } public boolean isOpenOrMergingOnServer(final ServerName sn) { return isOnServer(sn) && (isOpened() || isMerging()); @@ -278,6 +302,10 @@ public class RegionState { return isOnServer(sn) && (isOpened() || isMergingNew()); } + public boolean isOpenOrShadowingOnServer(final ServerName sn) { + return isOnServer(sn) && (isOpened() || isShadowing()); + } + public boolean isOpenOrSplittingOnServer(final ServerName sn) { return isOnServer(sn) && (isOpened() || isSplitting()); } @@ -336,7 +364,7 @@ public class RegionState { * can't transition to pending_close/closing (unassign/offline) */ public static boolean isUnassignable(State state) { - return state == State.MERGED || state == State.SPLIT || state == State.OFFLINE + return state == State.MERGED || state == State.SPLIT || state == State.SHADOWED || state == State.OFFLINE || state == State.SPLITTING_NEW || state == State.MERGING_NEW; } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java index c558485..913f8b4 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java @@ -290,6 +290,23 @@ public final class ClusterStatusProtos { * */ MERGING_NEW(14, 14), + /** + * SHADOWING = 16; + * + *
+       * daughter regions but hasn't be created yet, or
+       * master doesn't know it's already created
+       * 
+ */ + SHADOWING(15, 16), + /** + * SHADOWED = 17; + * + *
+       * server completed shadow of a region
+       * 
+ */ + SHADOWED(16, 17), ; /** @@ -413,6 +430,23 @@ public final class ClusterStatusProtos { * */ public static final int MERGING_NEW_VALUE = 14; + /** + * SHADOWING = 16; + * + *
+       * daughter regions but hasn't be created yet, or
+       * master doesn't know it's already created
+       * 
+ */ + public static final int SHADOWING_VALUE = 16; + /** + * SHADOWED = 17; + * + *
+       * server completed shadow of a region
+       * 
+ */ + public static final int SHADOWED_VALUE = 17; public final int getNumber() { return value; } @@ -434,6 +468,8 @@ public final class ClusterStatusProtos { case 12: return MERGED; case 13: return SPLITTING_NEW; case 14: return MERGING_NEW; + case 16: return SHADOWING; + case 17: return SHADOWED; default: return null; } } @@ -10339,49 +10375,49 @@ public final class ClusterStatusProtos { static { java.lang.String[] descriptorData = { "\n\023ClusterStatus.proto\032\013HBase.proto\032\017Clus" + - "terId.proto\032\010FS.proto\"\307\002\n\013RegionState\022 \n" + + "terId.proto\032\010FS.proto\"\344\002\n\013RegionState\022 \n" + "\013region_info\030\001 \002(\0132\013.RegionInfo\022!\n\005state" + "\030\002 \002(\0162\022.RegionState.State\022\r\n\005stamp\030\003 \001(" + - "\004\"\343\001\n\005State\022\013\n\007OFFLINE\020\000\022\020\n\014PENDING_OPEN" + + "\004\"\200\002\n\005State\022\013\n\007OFFLINE\020\000\022\020\n\014PENDING_OPEN" + "\020\001\022\013\n\007OPENING\020\002\022\010\n\004OPEN\020\003\022\021\n\rPENDING_CLO" + "SE\020\004\022\013\n\007CLOSING\020\005\022\n\n\006CLOSED\020\006\022\r\n\tSPLITTI" + "NG\020\007\022\t\n\005SPLIT\020\010\022\017\n\013FAILED_OPEN\020\t\022\020\n\014FAIL" + "ED_CLOSE\020\n\022\013\n\007MERGING\020\013\022\n\n\006MERGED\020\014\022\021\n\rS" + - "PLITTING_NEW\020\r\022\017\n\013MERGING_NEW\020\016\"X\n\022Regio", - "nInTransition\022\036\n\004spec\030\001 \002(\0132\020.RegionSpec" + - "ifier\022\"\n\014region_state\030\002 \002(\0132\014.RegionStat" + - "e\"\320\003\n\nRegionLoad\022*\n\020region_specifier\030\001 \002" + - "(\0132\020.RegionSpecifier\022\016\n\006stores\030\002 \001(\r\022\022\n\n" + - "storefiles\030\003 \001(\r\022\"\n\032store_uncompressed_s" + - "ize_MB\030\004 \001(\r\022\031\n\021storefile_size_MB\030\005 \001(\r\022" + - "\030\n\020memstore_size_MB\030\006 \001(\r\022\037\n\027storefile_i" + - "ndex_size_MB\030\007 \001(\r\022\033\n\023read_requests_coun" + - "t\030\010 \001(\004\022\034\n\024write_requests_count\030\t \001(\004\022\034\n" + - "\024total_compacting_KVs\030\n \001(\004\022\035\n\025current_c", - "ompacted_KVs\030\013 \001(\004\022\032\n\022root_index_size_KB" + - "\030\014 \001(\r\022\"\n\032total_static_index_size_KB\030\r \001" + - "(\r\022\"\n\032total_static_bloom_size_KB\030\016 \001(\r\022\034" + - "\n\024complete_sequence_id\030\017 \001(\004\"\212\002\n\nServerL" + - "oad\022\032\n\022number_of_requests\030\001 \001(\r\022 \n\030total" + - "_number_of_requests\030\002 \001(\r\022\024\n\014used_heap_M" + - "B\030\003 \001(\r\022\023\n\013max_heap_MB\030\004 \001(\r\022!\n\014region_l" + - "oads\030\005 \003(\0132\013.RegionLoad\022\"\n\014coprocessors\030" + - "\006 \003(\0132\014.Coprocessor\022\031\n\021report_start_time" + - "\030\007 \001(\004\022\027\n\017report_end_time\030\010 \001(\004\022\030\n\020info_", - "server_port\030\t \001(\r\"O\n\016LiveServerInfo\022\033\n\006s" + - "erver\030\001 \002(\0132\013.ServerName\022 \n\013server_load\030" + - "\002 \002(\0132\013.ServerLoad\"\340\002\n\rClusterStatus\022/\n\r" + - "hbase_version\030\001 \001(\0132\030.HBaseVersionFileCo" + - "ntent\022%\n\014live_servers\030\002 \003(\0132\017.LiveServer" + - "Info\022!\n\014dead_servers\030\003 \003(\0132\013.ServerName\022" + - "2\n\025regions_in_transition\030\004 \003(\0132\023.RegionI" + - "nTransition\022\036\n\ncluster_id\030\005 \001(\0132\n.Cluste" + - "rId\022)\n\023master_coprocessors\030\006 \003(\0132\014.Copro" + - "cessor\022\033\n\006master\030\007 \001(\0132\013.ServerName\022#\n\016b", - "ackup_masters\030\010 \003(\0132\013.ServerName\022\023\n\013bala" + - "ncer_on\030\t \001(\010BF\n*org.apache.hadoop.hbase" + - ".protobuf.generatedB\023ClusterStatusProtos" + - "H\001\240\001\001" + "PLITTING_NEW\020\r\022\017\n\013MERGING_NEW\020\016\022\r\n\tSHADO", + "WING\020\020\022\014\n\010SHADOWED\020\021\"X\n\022RegionInTransiti" + + "on\022\036\n\004spec\030\001 \002(\0132\020.RegionSpecifier\022\"\n\014re" + + "gion_state\030\002 \002(\0132\014.RegionState\"\320\003\n\nRegio" + + "nLoad\022*\n\020region_specifier\030\001 \002(\0132\020.Region" + + "Specifier\022\016\n\006stores\030\002 \001(\r\022\022\n\nstorefiles\030" + + "\003 \001(\r\022\"\n\032store_uncompressed_size_MB\030\004 \001(" + + "\r\022\031\n\021storefile_size_MB\030\005 \001(\r\022\030\n\020memstore" + + "_size_MB\030\006 \001(\r\022\037\n\027storefile_index_size_M" + + "B\030\007 \001(\r\022\033\n\023read_requests_count\030\010 \001(\004\022\034\n\024" + + "write_requests_count\030\t \001(\004\022\034\n\024total_comp", + "acting_KVs\030\n \001(\004\022\035\n\025current_compacted_KV" + + "s\030\013 \001(\004\022\032\n\022root_index_size_KB\030\014 \001(\r\022\"\n\032t" + + "otal_static_index_size_KB\030\r \001(\r\022\"\n\032total" + + "_static_bloom_size_KB\030\016 \001(\r\022\034\n\024complete_" + + "sequence_id\030\017 \001(\004\"\212\002\n\nServerLoad\022\032\n\022numb" + + "er_of_requests\030\001 \001(\r\022 \n\030total_number_of_" + + "requests\030\002 \001(\r\022\024\n\014used_heap_MB\030\003 \001(\r\022\023\n\013" + + "max_heap_MB\030\004 \001(\r\022!\n\014region_loads\030\005 \003(\0132" + + "\013.RegionLoad\022\"\n\014coprocessors\030\006 \003(\0132\014.Cop" + + "rocessor\022\031\n\021report_start_time\030\007 \001(\004\022\027\n\017r", + "eport_end_time\030\010 \001(\004\022\030\n\020info_server_port" + + "\030\t \001(\r\"O\n\016LiveServerInfo\022\033\n\006server\030\001 \002(\013" + + "2\013.ServerName\022 \n\013server_load\030\002 \002(\0132\013.Ser" + + "verLoad\"\340\002\n\rClusterStatus\022/\n\rhbase_versi" + + "on\030\001 \001(\0132\030.HBaseVersionFileContent\022%\n\014li" + + "ve_servers\030\002 \003(\0132\017.LiveServerInfo\022!\n\014dea" + + "d_servers\030\003 \003(\0132\013.ServerName\0222\n\025regions_" + + "in_transition\030\004 \003(\0132\023.RegionInTransition" + + "\022\036\n\ncluster_id\030\005 \001(\0132\n.ClusterId\022)\n\023mast" + + "er_coprocessors\030\006 \003(\0132\014.Coprocessor\022\033\n\006m", + "aster\030\007 \001(\0132\013.ServerName\022#\n\016backup_maste" + + "rs\030\010 \003(\0132\013.ServerName\022\023\n\013balancer_on\030\t \001" + + "(\010BF\n*org.apache.hadoop.hbase.protobuf.g" + + "eneratedB\023ClusterStatusProtosH\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java index 1dbce4d..d506e7e 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java @@ -3697,6 +3697,16 @@ public final class HBaseProtos { * optional int32 replica_id = 7 [default = 0]; */ int getReplicaId(); + + // optional bool shadow = 8; + /** + * optional bool shadow = 8; + */ + boolean hasShadow(); + /** + * optional bool shadow = 8; + */ + boolean getShadow(); } /** * Protobuf type {@code RegionInfo} @@ -3797,6 +3807,11 @@ public final class HBaseProtos { replicaId_ = input.readInt32(); break; } + case 64: { + bitField0_ |= 0x00000080; + shadow_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -3955,6 +3970,22 @@ public final class HBaseProtos { return replicaId_; } + // optional bool shadow = 8; + public static final int SHADOW_FIELD_NUMBER = 8; + private boolean shadow_; + /** + * optional bool shadow = 8; + */ + public boolean hasShadow() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional bool shadow = 8; + */ + public boolean getShadow() { + return shadow_; + } + private void initFields() { regionId_ = 0L; tableName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName.getDefaultInstance(); @@ -3963,6 +3994,7 @@ public final class HBaseProtos { offline_ = false; split_ = false; replicaId_ = 0; + shadow_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -4009,6 +4041,9 @@ public final class HBaseProtos { if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeInt32(7, replicaId_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeBool(8, shadow_); + } getUnknownFields().writeTo(output); } @@ -4046,6 +4081,10 @@ public final class HBaseProtos { size += com.google.protobuf.CodedOutputStream .computeInt32Size(7, replicaId_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(8, shadow_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -4104,6 +4143,11 @@ public final class HBaseProtos { result = result && (getReplicaId() == other.getReplicaId()); } + result = result && (hasShadow() == other.hasShadow()); + if (hasShadow()) { + result = result && (getShadow() + == other.getShadow()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -4145,6 +4189,10 @@ public final class HBaseProtos { hash = (37 * hash) + REPLICA_ID_FIELD_NUMBER; hash = (53 * hash) + getReplicaId(); } + if (hasShadow()) { + hash = (37 * hash) + SHADOW_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getShadow()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -4278,6 +4326,8 @@ public final class HBaseProtos { bitField0_ = (bitField0_ & ~0x00000020); replicaId_ = 0; bitField0_ = (bitField0_ & ~0x00000040); + shadow_ = false; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -4338,6 +4388,10 @@ public final class HBaseProtos { to_bitField0_ |= 0x00000040; } result.replicaId_ = replicaId_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000080; + } + result.shadow_ = shadow_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -4375,6 +4429,9 @@ public final class HBaseProtos { if (other.hasReplicaId()) { setReplicaId(other.getReplicaId()); } + if (other.hasShadow()) { + setShadow(other.getShadow()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -4735,6 +4792,39 @@ public final class HBaseProtos { return this; } + // optional bool shadow = 8; + private boolean shadow_ ; + /** + * optional bool shadow = 8; + */ + public boolean hasShadow() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional bool shadow = 8; + */ + public boolean getShadow() { + return shadow_; + } + /** + * optional bool shadow = 8; + */ + public Builder setShadow(boolean value) { + bitField0_ |= 0x00000080; + shadow_ = value; + onChanged(); + return this; + } + /** + * optional bool shadow = 8; + */ + public Builder clearShadow() { + bitField0_ = (bitField0_ & ~0x00000080); + shadow_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:RegionInfo) } @@ -16324,43 +16414,44 @@ public final class HBaseProtos { "ingPair\"o\n\022ColumnFamilySchema\022\014\n\004name\030\001 " + "\002(\014\022#\n\nattributes\030\002 \003(\0132\017.BytesBytesPair" + "\022&\n\rconfiguration\030\003 \003(\0132\017.NameStringPair" + - "\"\232\001\n\nRegionInfo\022\021\n\tregion_id\030\001 \002(\004\022\036\n\nta", + "\"\252\001\n\nRegionInfo\022\021\n\tregion_id\030\001 \002(\004\022\036\n\nta", "ble_name\030\002 \002(\0132\n.TableName\022\021\n\tstart_key\030" + "\003 \001(\014\022\017\n\007end_key\030\004 \001(\014\022\017\n\007offline\030\005 \001(\010\022" + - "\r\n\005split\030\006 \001(\010\022\025\n\nreplica_id\030\007 \001(\005:\0010\"1\n" + - "\014FavoredNodes\022!\n\014favored_node\030\001 \003(\0132\013.Se" + - "rverName\"\225\001\n\017RegionSpecifier\0222\n\004type\030\001 \002" + - "(\0162$.RegionSpecifier.RegionSpecifierType" + - "\022\r\n\005value\030\002 \002(\014\"?\n\023RegionSpecifierType\022\017" + - "\n\013REGION_NAME\020\001\022\027\n\023ENCODED_REGION_NAME\020\002" + - "\"%\n\tTimeRange\022\014\n\004from\030\001 \001(\004\022\n\n\002to\030\002 \001(\004\"" + - "A\n\nServerName\022\021\n\thost_name\030\001 \002(\t\022\014\n\004port", - "\030\002 \001(\r\022\022\n\nstart_code\030\003 \001(\004\"\033\n\013Coprocesso" + - "r\022\014\n\004name\030\001 \002(\t\"-\n\016NameStringPair\022\014\n\004nam" + - "e\030\001 \002(\t\022\r\n\005value\030\002 \002(\t\",\n\rNameBytesPair\022" + - "\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \001(\014\"/\n\016BytesByt" + - "esPair\022\r\n\005first\030\001 \002(\014\022\016\n\006second\030\002 \002(\014\",\n" + - "\rNameInt64Pair\022\014\n\004name\030\001 \001(\t\022\r\n\005value\030\002 " + - "\001(\003\"\275\001\n\023SnapshotDescription\022\014\n\004name\030\001 \002(" + - "\t\022\r\n\005table\030\002 \001(\t\022\030\n\rcreation_time\030\003 \001(\003:" + - "\0010\022.\n\004type\030\004 \001(\0162\031.SnapshotDescription.T" + - "ype:\005FLUSH\022\017\n\007version\030\005 \001(\005\".\n\004Type\022\014\n\010D", - "ISABLED\020\000\022\t\n\005FLUSH\020\001\022\r\n\tSKIPFLUSH\020\002\"}\n\024P" + - "rocedureDescription\022\021\n\tsignature\030\001 \002(\t\022\020" + - "\n\010instance\030\002 \001(\t\022\030\n\rcreation_time\030\003 \001(\003:" + - "\0010\022&\n\rconfiguration\030\004 \003(\0132\017.NameStringPa" + - "ir\"\n\n\010EmptyMsg\"\033\n\007LongMsg\022\020\n\010long_msg\030\001 " + - "\002(\003\"\037\n\tDoubleMsg\022\022\n\ndouble_msg\030\001 \002(\001\"\'\n\r" + - "BigDecimalMsg\022\026\n\016bigdecimal_msg\030\001 \002(\014\"5\n" + - "\004UUID\022\026\n\016least_sig_bits\030\001 \002(\004\022\025\n\rmost_si" + - "g_bits\030\002 \002(\004\"K\n\023NamespaceDescriptor\022\014\n\004n" + - "ame\030\001 \002(\014\022&\n\rconfiguration\030\002 \003(\0132\017.NameS", - "tringPair\"$\n\020RegionServerInfo\022\020\n\010infoPor" + - "t\030\001 \001(\005*r\n\013CompareType\022\010\n\004LESS\020\000\022\021\n\rLESS" + - "_OR_EQUAL\020\001\022\t\n\005EQUAL\020\002\022\r\n\tNOT_EQUAL\020\003\022\024\n" + - "\020GREATER_OR_EQUAL\020\004\022\013\n\007GREATER\020\005\022\t\n\005NO_O" + - "P\020\006B>\n*org.apache.hadoop.hbase.protobuf." + - "generatedB\013HBaseProtosH\001\240\001\001" + "\r\n\005split\030\006 \001(\010\022\025\n\nreplica_id\030\007 \001(\005:\0010\022\016\n" + + "\006shadow\030\010 \001(\010\"1\n\014FavoredNodes\022!\n\014favored" + + "_node\030\001 \003(\0132\013.ServerName\"\225\001\n\017RegionSpeci" + + "fier\0222\n\004type\030\001 \002(\0162$.RegionSpecifier.Reg" + + "ionSpecifierType\022\r\n\005value\030\002 \002(\014\"?\n\023Regio" + + "nSpecifierType\022\017\n\013REGION_NAME\020\001\022\027\n\023ENCOD" + + "ED_REGION_NAME\020\002\"%\n\tTimeRange\022\014\n\004from\030\001 " + + "\001(\004\022\n\n\002to\030\002 \001(\004\"A\n\nServerName\022\021\n\thost_na", + "me\030\001 \002(\t\022\014\n\004port\030\002 \001(\r\022\022\n\nstart_code\030\003 \001" + + "(\004\"\033\n\013Coprocessor\022\014\n\004name\030\001 \002(\t\"-\n\016NameS" + + "tringPair\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\t\"," + + "\n\rNameBytesPair\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002" + + " \001(\014\"/\n\016BytesBytesPair\022\r\n\005first\030\001 \002(\014\022\016\n" + + "\006second\030\002 \002(\014\",\n\rNameInt64Pair\022\014\n\004name\030\001" + + " \001(\t\022\r\n\005value\030\002 \001(\003\"\275\001\n\023SnapshotDescript" + + "ion\022\014\n\004name\030\001 \002(\t\022\r\n\005table\030\002 \001(\t\022\030\n\rcrea" + + "tion_time\030\003 \001(\003:\0010\022.\n\004type\030\004 \001(\0162\031.Snaps" + + "hotDescription.Type:\005FLUSH\022\017\n\007version\030\005 ", + "\001(\005\".\n\004Type\022\014\n\010DISABLED\020\000\022\t\n\005FLUSH\020\001\022\r\n\t" + + "SKIPFLUSH\020\002\"}\n\024ProcedureDescription\022\021\n\ts" + + "ignature\030\001 \002(\t\022\020\n\010instance\030\002 \001(\t\022\030\n\rcrea" + + "tion_time\030\003 \001(\003:\0010\022&\n\rconfiguration\030\004 \003(" + + "\0132\017.NameStringPair\"\n\n\010EmptyMsg\"\033\n\007LongMs" + + "g\022\020\n\010long_msg\030\001 \002(\003\"\037\n\tDoubleMsg\022\022\n\ndoub" + + "le_msg\030\001 \002(\001\"\'\n\rBigDecimalMsg\022\026\n\016bigdeci" + + "mal_msg\030\001 \002(\014\"5\n\004UUID\022\026\n\016least_sig_bits\030" + + "\001 \002(\004\022\025\n\rmost_sig_bits\030\002 \002(\004\"K\n\023Namespac" + + "eDescriptor\022\014\n\004name\030\001 \002(\014\022&\n\rconfigurati", + "on\030\002 \003(\0132\017.NameStringPair\"$\n\020RegionServe" + + "rInfo\022\020\n\010infoPort\030\001 \001(\005*r\n\013CompareType\022\010" + + "\n\004LESS\020\000\022\021\n\rLESS_OR_EQUAL\020\001\022\t\n\005EQUAL\020\002\022\r" + + "\n\tNOT_EQUAL\020\003\022\024\n\020GREATER_OR_EQUAL\020\004\022\013\n\007G" + + "REATER\020\005\022\t\n\005NO_OP\020\006B>\n*org.apache.hadoop" + + ".hbase.protobuf.generatedB\013HBaseProtosH\001" + + "\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -16390,7 +16481,7 @@ public final class HBaseProtos { internal_static_RegionInfo_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RegionInfo_descriptor, - new java.lang.String[] { "RegionId", "TableName", "StartKey", "EndKey", "Offline", "Split", "ReplicaId", }); + new java.lang.String[] { "RegionId", "TableName", "StartKey", "EndKey", "Offline", "Split", "ReplicaId", "Shadow", }); internal_static_FavoredNodes_descriptor = getDescriptor().getMessageTypes().get(4); internal_static_FavoredNodes_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RegionServerStatusProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RegionServerStatusProtos.java index 19a8d74..53ed2f2 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RegionServerStatusProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RegionServerStatusProtos.java @@ -5212,6 +5212,22 @@ public final class RegionServerStatusProtos { * MERGE_REVERTED = 10; */ MERGE_REVERTED(10, 10), + /** + * READY_TO_SHADOW = 11; + */ + READY_TO_SHADOW(11, 11), + /** + * SHADOW_PONR = 12; + */ + SHADOW_PONR(12, 12), + /** + * SHADOWED = 13; + */ + SHADOWED(13, 13), + /** + * SHADOW_REVERTED = 14; + */ + SHADOW_REVERTED(14, 14), ; /** @@ -5266,6 +5282,22 @@ public final class RegionServerStatusProtos { * MERGE_REVERTED = 10; */ public static final int MERGE_REVERTED_VALUE = 10; + /** + * READY_TO_SHADOW = 11; + */ + public static final int READY_TO_SHADOW_VALUE = 11; + /** + * SHADOW_PONR = 12; + */ + public static final int SHADOW_PONR_VALUE = 12; + /** + * SHADOWED = 13; + */ + public static final int SHADOWED_VALUE = 13; + /** + * SHADOW_REVERTED = 14; + */ + public static final int SHADOW_REVERTED_VALUE = 14; public final int getNumber() { return value; } @@ -5283,6 +5315,10 @@ public final class RegionServerStatusProtos { case 8: return MERGED; case 9: return SPLIT_REVERTED; case 10: return MERGE_REVERTED; + case 11: return READY_TO_SHADOW; + case 12: return SHADOW_PONR; + case 13: return SHADOWED; + case 14: return SHADOW_REVERTED; default: return null; } } @@ -8412,34 +8448,36 @@ public final class RegionServerStatusProtos { "alErrorResponse\"6\n\037GetLastFlushedSequenc" + "eIdRequest\022\023\n\013region_name\030\001 \002(\014\"D\n GetLa" + "stFlushedSequenceIdResponse\022 \n\030last_flus" + - "hed_sequence_id\030\001 \002(\004\"\322\002\n\025RegionStateTra" + + "hed_sequence_id\030\001 \002(\004\"\233\003\n\025RegionStateTra" + "nsition\022>\n\017transition_code\030\001 \002(\0162%.Regio" + "nStateTransition.TransitionCode\022 \n\013regio" + "n_info\030\002 \003(\0132\013.RegionInfo\022\024\n\014open_seq_nu" + - "m\030\003 \001(\004\"\300\001\n\016TransitionCode\022\n\n\006OPENED\020\000\022\017" + + "m\030\003 \001(\004\"\211\002\n\016TransitionCode\022\n\n\006OPENED\020\000\022\017" + "\n\013FAILED_OPEN\020\001\022\n\n\006CLOSED\020\002\022\022\n\016READY_TO_", "SPLIT\020\003\022\022\n\016READY_TO_MERGE\020\004\022\016\n\nSPLIT_PON" + "R\020\005\022\016\n\nMERGE_PONR\020\006\022\t\n\005SPLIT\020\007\022\n\n\006MERGED" + "\020\010\022\022\n\016SPLIT_REVERTED\020\t\022\022\n\016MERGE_REVERTED" + - "\020\n\"m\n\"ReportRegionStateTransitionRequest" + - "\022\033\n\006server\030\001 \002(\0132\013.ServerName\022*\n\ntransit" + - "ion\030\002 \003(\0132\026.RegionStateTransition\"<\n#Rep" + - "ortRegionStateTransitionResponse\022\025\n\rerro" + - "r_message\030\001 \001(\t2\326\003\n\031RegionServerStatusSe" + - "rvice\022P\n\023RegionServerStartup\022\033.RegionSer" + - "verStartupRequest\032\034.RegionServerStartupR", - "esponse\022M\n\022RegionServerReport\022\032.RegionSe" + - "rverReportRequest\032\033.RegionServerReportRe" + - "sponse\022M\n\022ReportRSFatalError\022\032.ReportRSF" + - "atalErrorRequest\032\033.ReportRSFatalErrorRes" + - "ponse\022_\n\030GetLastFlushedSequenceId\022 .GetL" + - "astFlushedSequenceIdRequest\032!.GetLastFlu" + - "shedSequenceIdResponse\022h\n\033ReportRegionSt" + - "ateTransition\022#.ReportRegionStateTransit" + - "ionRequest\032$.ReportRegionStateTransition" + - "ResponseBN\n*org.apache.hadoop.hbase.prot", - "obuf.generatedB\030RegionServerStatusProtos" + - "H\001\210\001\001\240\001\001" + "\020\n\022\023\n\017READY_TO_SHADOW\020\013\022\017\n\013SHADOW_PONR\020\014" + + "\022\014\n\010SHADOWED\020\r\022\023\n\017SHADOW_REVERTED\020\016\"m\n\"R" + + "eportRegionStateTransitionRequest\022\033\n\006ser" + + "ver\030\001 \002(\0132\013.ServerName\022*\n\ntransition\030\002 \003" + + "(\0132\026.RegionStateTransition\"<\n#ReportRegi" + + "onStateTransitionResponse\022\025\n\rerror_messa" + + "ge\030\001 \001(\t2\326\003\n\031RegionServerStatusService\022P", + "\n\023RegionServerStartup\022\033.RegionServerStar" + + "tupRequest\032\034.RegionServerStartupResponse" + + "\022M\n\022RegionServerReport\022\032.RegionServerRep" + + "ortRequest\032\033.RegionServerReportResponse\022" + + "M\n\022ReportRSFatalError\022\032.ReportRSFatalErr" + + "orRequest\032\033.ReportRSFatalErrorResponse\022_" + + "\n\030GetLastFlushedSequenceId\022 .GetLastFlus" + + "hedSequenceIdRequest\032!.GetLastFlushedSeq" + + "uenceIdResponse\022h\n\033ReportRegionStateTran" + + "sition\022#.ReportRegionStateTransitionRequ", + "est\032$.ReportRegionStateTransitionRespons" + + "eBN\n*org.apache.hadoop.hbase.protobuf.ge" + + "neratedB\030RegionServerStatusProtosH\001\210\001\001\240\001" + + "\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/hbase-protocol/src/main/protobuf/ClusterStatus.proto b/hbase-protocol/src/main/protobuf/ClusterStatus.proto index dbf00dc..a2fdbbe 100644 --- a/hbase-protocol/src/main/protobuf/ClusterStatus.proto +++ b/hbase-protocol/src/main/protobuf/ClusterStatus.proto @@ -51,6 +51,8 @@ message RegionState { MERGING_NEW = 14; // new region to be created when RS merges two // daughter regions but hasn't be created yet, or // master doesn't know it's already created + SHADOWING = 16; // server started shadow of a region + SHADOWED = 17; // server completed shadow of a region } } diff --git a/hbase-protocol/src/main/protobuf/HBase.proto b/hbase-protocol/src/main/protobuf/HBase.proto index 24941ff..339fad7 100644 --- a/hbase-protocol/src/main/protobuf/HBase.proto +++ b/hbase-protocol/src/main/protobuf/HBase.proto @@ -65,6 +65,7 @@ message RegionInfo { optional bool offline = 5; optional bool split = 6; optional int32 replica_id = 7 [default = 0]; + optional bool shadow = 8; } /** diff --git a/hbase-protocol/src/main/protobuf/RegionServerStatus.proto b/hbase-protocol/src/main/protobuf/RegionServerStatus.proto index df9a521..cfde82d 100644 --- a/hbase-protocol/src/main/protobuf/RegionServerStatus.proto +++ b/hbase-protocol/src/main/protobuf/RegionServerStatus.proto @@ -104,6 +104,11 @@ message RegionStateTransition { MERGED = 8; SPLIT_REVERTED = 9; MERGE_REVERTED = 10; + + READY_TO_SHADOW = 11; + SHADOW_PONR = 12; + SHADOWED = 13; + SHADOW_REVERTED = 14; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 5ecbe98..411527b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -2202,6 +2202,53 @@ public class AssignmentManager { return null; } + private String onRegionShadow(ServerName sn, TransitionCode code, + final HRegionInfo region) { + final RegionState rs_r = regionStates.getRegionState(region); + if (!(rs_r.isOpenOrShadowingOnServer(sn))) { + return "Not in state good for split"; + } + + regionStates.updateRegionState(region, State.SHADOWING); + + if (code == TransitionCode.SHADOWED) { + if (TEST_SKIP_SPLIT_HANDLING) { + return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set"; + } + regionOffline(region, State.SHADOWED); + + // User could disable the table before master knows the new region. + if (getTableStateManager().isTableState(region.getTable(), + ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { + invokeUnAssign(region); + } else { + Callable splitReplicasCallable = new Callable() { + @Override + public Object call() { + //doSplittingOfReplicas(p, a, b); + return null; + } + }; + threadPoolExecutorService.submit(splitReplicasCallable); + } + } else if (code == TransitionCode.SHADOW_PONR) { + try { + regionStates.shadowRegion(region, sn); + } catch (IOException ioe) { + LOG.info("Failed to record split region " + region.getShortNameToLog()); + return "Failed to record the splitting in meta"; + } + } else if (code == TransitionCode.SPLIT_REVERTED) { + regionOnline(region, sn); + + if (getTableStateManager().isTableState(region.getTable(), + ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { + invokeUnAssign(region); + } + } + return null; + } + private String onRegionMerge(ServerName sn, TransitionCode code, final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) { RegionState rs_p = regionStates.getRegionState(p); @@ -2381,16 +2428,8 @@ public class AssignmentManager { // Tell our listeners that a region was closed sendRegionClosedNotification(regionInfo); // also note that all the replicas of the primary should be closed - if (state != null && state.equals(State.SPLIT)) { - Collection c = new ArrayList(1); - c.add(regionInfo); - Map> map = regionStates.getRegionAssignments(c); - Collection> allReplicas = map.values(); - for (List list : allReplicas) { - replicasToClose.addAll(list); - } - } - else if (state != null && state.equals(State.MERGED)) { + if (state != null + && (state.equals(State.SPLIT) || state.equals(State.MERGED) || state.equals(State.SHADOWED))) { Collection c = new ArrayList(1); c.add(regionInfo); Map> map = regionStates.getRegionAssignments(c); @@ -2527,7 +2566,13 @@ public class AssignmentManager { HRegionInfo.convert(transition.getRegionInfo(1)), HRegionInfo.convert(transition.getRegionInfo(2))); break; - + + case READY_TO_SHADOW: + case SHADOW_PONR: + case SHADOWED: + case SHADOW_REVERTED: + errorMsg = onRegionShadow(serverName, code, hri); + break; default: errorMsg = "Unexpected transition code " + code; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java index e5370c5..634fd65 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java @@ -247,4 +247,8 @@ public class RegionStateStore { HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException { MetaTableAccessor.mergeRegions(server.getShortCircuitConnection(), p, a, b, sn); } + + void shadowRegion(HRegionInfo region, ServerName sn) throws IOException { + MetaTableAccessor.shadowRegion(server.getShortCircuitConnection(), region, sn); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java index a29d675..6722c1e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java @@ -740,6 +740,18 @@ public class RegionStates { } } + void shadowRegion(HRegionInfo region, ServerName sn) throws IOException { + regionStateStore.shadowRegion(region, sn); + synchronized (this) { + // After PONR, split is considered to be done. + // Update server holdings to be aligned with the meta. + Set regions = serverHoldings.get(sn); + if (regions == null) { + throw new IllegalStateException(sn + " should host some regions"); + } + regions.remove(region); + } + } void mergeRegions(HRegionInfo p, HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException { regionStateStore.mergeRegions(p, a, b, sn); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java index 7898edc..64c0109 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java @@ -335,6 +335,7 @@ public class ServerShutdownHandler extends EventHandler { //to the dead server. We don't have to do anything. return false; } + if(hri.isShadow()) return false; boolean disabling = assignmentManager.getTableStateManager().isTableState(hri.getTable(), ZooKeeperProtos.Table.State.DISABLING); if (disabling) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index eab29e7..26d8209 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1722,7 +1722,7 @@ public class HRegionServer extends HasThread implements updateRecoveringRegionLastFlushedSequenceId(r); // Notify master - if (!reportRegionStateTransition( + if (!r.getRegionInfo().isShadow() && !reportRegionStateTransition( TransitionCode.OPENED, openSeqNum, r.getRegionInfo())) { throw new IOException("Failed to report opened region to master: " + r.getRegionNameAsString()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionShadowRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionShadowRequest.java new file mode 100644 index 0000000..c709b07 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionShadowRequest.java @@ -0,0 +1,132 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.base.Preconditions; + +/** + * Handles processing region shadow. Put in a queue, owned by HRegionServer. + */ +@InterfaceAudience.Private +class RegionShadowRequest implements Runnable { + static final Log LOG = LogFactory.getLog(RegionShadowRequest.class); + private final HRegion region; + private final HRegionServer server; + private final boolean openShadowRegion; + private TableLock tableLock; + + RegionShadowRequest(HRegion region, boolean openShadowRegion, HRegionServer hrs) { + Preconditions.checkNotNull(hrs); + this.region = region; + this.server = hrs; + this.openShadowRegion = openShadowRegion; + } + + @Override + public String toString() { + return "regionName=" + this.region; + } + + @Override + public void run() { + if (this.server.isStopping() || this.server.isStopped()) { + LOG.debug("Skipping shadow because server is stopping=" + + this.server.isStopping() + " or stopped=" + this.server.isStopped()); + return; + } + try { + final long startTime = System.currentTimeMillis(); + RegionShadowTransaction st = new RegionShadowTransaction(region, this.openShadowRegion); + + //acquire a shared read lock on the table, so that table schema modifications + //do not happen concurrently + tableLock = server.getTableLockManager().readLock(region.getTableDesc().getTableName() + , "SHADOW_REGION:" + region.getRegionNameAsString()); + try { + tableLock.acquire(); + } catch (IOException ex) { + tableLock = null; + throw ex; + } + + // If prepare does not return true, for some reason -- logged inside in + // the prepare call -- we are not ready to shadow just now. Just return. + if (!st.prepare()) return; + try { + st.execute(this.server, this.server); + } catch (Exception e) { + if (this.server.isStopping() || this.server.isStopped()) { + LOG.info( + "Skip rollback/cleanup of failed shadow of " + + region.getRegionNameAsString() + " because server is" + + (this.server.isStopping() ? " stopping" : " stopped"), e); + return; + } + try { + LOG.info("Running rollback/cleanup of failed split of " + + region.getRegionNameAsString() + "; " + e.getMessage(), e); + if (st.rollback(this.server, this.server)) { + LOG.info("Successful rollback of failed shadow of " + + region.getRegionNameAsString()); + } else { + this.server.abort("Abort; we got an error after point-of-no-return"); + } + } catch (RuntimeException ee) { + String msg = "Failed rollback of failed shadow of " + + region.getRegionNameAsString() + " -- aborting server"; + // If failed rollback, kill this server to avoid having a hole in table. + LOG.info(msg, ee); + this.server.abort(msg + " -- Cause: " + ee.getMessage()); + } + return; + } + LOG.info("Region shadow, hbase:meta updated, and report to master. Region=" + + region.getRegionNameAsString() + ". Shadow took " + + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); + } catch (IOException ex) { + ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; + LOG.error("Shadow failed " + this, ex); + server.checkFileSystem(); + } finally { + releaseTableLock(); + } + } + + protected void releaseTableLock() { + if (this.tableLock != null) { + try { + this.tableLock.release(); + } catch (IOException ex) { + LOG.error("Could not release the table lock (something is really wrong). " + + "Aborting this server to avoid holding the lock forever."); + this.server.abort("Abort; we got an error when releasing the table lock " + + "on " + region.getRegionNameAsString()); + } + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionShadowTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionShadowTransaction.java new file mode 100644 index 0000000..a29675f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionShadowTransaction.java @@ -0,0 +1,376 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.zookeeper.KeeperException; + + +/** + * Executes region shadow as a "transaction". Call {@link #prepare()} to setup + * the transaction, {@link #execute(Server, RegionServerServices)} to run the + * transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if execute fails. + * + *

Here is an example of how you would use this class: + *

+ *  RegionShadowTransaction st = new RegionShadowTransaction(this.conf, parent, midKey)
+ *  if (!st.prepare()) return;
+ *  try {
+ *    st.execute(server, services);
+ *  } catch (IOException ioe) {
+ *    try {
+ *      st.rollback(server, services);
+ *      return;
+ *    } catch (RuntimeException e) {
+ *      myAbortable.abort("Failed split, abort");
+ *    }
+ *  }
+ * 
+ *

This class is not thread safe. Caller needs ensure shadow is run by + * one thread only. + */ +@InterfaceAudience.Private +public class RegionShadowTransaction { + private static final Log LOG = LogFactory.getLog(RegionShadowTransaction.class); + + /* + * Region to split + */ + private final HRegion region; + + /* + * if true open shadowed region. + */ + private final boolean openShadowedRegion; + + /** + * Types to add to the transaction journal. + * Each enum is a step in the shadow transaction. Used to figure how much + * we need to rollback. + */ + enum JournalEntry { + /** + * Set region as in transition, set it into SHADOWING state. + */ + SET_SHADOWING, + /** + * Closed the region. + */ + CLOSED_REGION, + /** + * The the region has been taken out of the server's online regions list. + */ + OFFLINED_REGION, + /** + * Point of no return. + * If we got here, then transaction is not recoverable other than by + * crashing out the regionserver. + */ + PONR + } + + /* + * Journal of how far the split transaction has progressed. + */ + private final List journal = new ArrayList(); + + /** + * Constructor + * @param r Region to shadow + * @param openShadowedRegion true if the region need to be opened after shadow. + */ + public RegionShadowTransaction(final HRegion r, final boolean openShadowedRegion) { + this.region = r; + this.openShadowedRegion = openShadowedRegion; + } + + /** + * Does checks on shadow region inputs. + * @return true if the region can shadow else + * false if it is not (e.g. its already closed, etc.). + */ + public boolean prepare() { + if (this.region.isAvailable() || !this.region.hasReferences()) return true; + return false; + } + + private static IOException closedByOtherException = new IOException( + "Failed to close region: already closed by another thread"); + + /** + * Close region and make the region shadow. + * @param server Hosting server instance. Can be null when testing (won't try + * and update in zk if a null server) + * @param services Used to online/offline regions. + * @throws IOException If thrown, transaction failed. + * Call {@link #rollback(Server, RegionServerServices)} + */ + /* package */void makeRegionShadow(final Server server, + final RegionServerServices services) throws IOException { + LOG.info("Starting shadow of region " + this.region); + if ((server != null && server.isStopped()) || + (services != null && services.isStopping())) { + throw new IOException("Server is stopped or stopping"); + } + assert !this.region.lock.writeLock().isHeldByCurrentThread(): + "Unsafe to hold write lock while performing RPCs"; + + // If true, no cluster to write meta edits to or to update znodes in. + boolean testing = server == null? true: + server.getConfiguration().getBoolean("hbase.testing.nocluster", false); + + stepsBeforePONR(server, services, testing); + + // If we reach the PONR then subsequent failures need to crash out this regionserver; the + // server shutdown processing should be able to fix-up the incomplete shadow. + this.journal.add(JournalEntry.PONR); + + if (services != null && !services.reportRegionStateTransition(TransitionCode.SHADOW_PONR, + region.getRegionInfo())) { + // Passed PONR, let SSH clean it up + throw new IOException("Failed to notify master that shadow passed PONR: " + + region.getRegionInfo().getRegionNameAsString()); + } + } + + public void stepsBeforePONR(final Server server, + final RegionServerServices services, boolean testing) throws IOException { + if (services != null && !services.reportRegionStateTransition(TransitionCode.READY_TO_SHADOW, + region.getRegionInfo())) { + throw new IOException("Failed to get ok from master to shadow " + + region.getRegionNameAsString()); + } + this.journal.add(JournalEntry.SET_SHADOWING); + + Map> hstoreFiles = null; + Exception exceptionToThrow = null; + try{ + hstoreFiles = this.region.close(false); + } catch (Exception e) { + exceptionToThrow = e; + } + if (exceptionToThrow == null && hstoreFiles == null) { + // The region was closed by a concurrent thread. We can't continue + // with the split, instead we must just abandon the split. If we + // reopen or split this could cause problems because the region has + // probably already been moved to a different server, or is in the + // process of moving to a different server. + exceptionToThrow = closedByOtherException; + } + if (exceptionToThrow != closedByOtherException) { + this.journal.add(JournalEntry.CLOSED_REGION); + } + if (exceptionToThrow != null) { + if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow; + throw new IOException(exceptionToThrow); + } + if (!testing) { + services.removeFromOnlineRegions(this.region, null); + } + this.journal.add(JournalEntry.OFFLINED_REGION); + } + + /** + * Perform time consuming opening of the the shadowed region. + * @param server Hosting server instance. Can be null when testing + * @param services Used to online/offline regions. + * @throws IOException If thrown, transaction failed. + * Call {@link #rollback(Server, RegionServerServices)} + */ + /* package */void openShadowRegion(final Server server, + final RegionServerServices services) + throws IOException { + boolean stopped = server != null && server.isStopped(); + boolean stopping = services != null && services.isStopping(); + // TODO: Is this check needed here? + if (stopped || stopping) { + LOG.info("Not opening shadow region " + + this.region.getRegionInfo().getRegionNameAsString() + + " because stopping=" + stopping + ", stopped=" + stopped); + } else { + if(this.openShadowedRegion) { + try { + this.region.getRegionInfo().setShadow(true); + openShadowRegion(server, region); + } catch (KeeperException e) { + throw new IOException(e); + } + } + if (services != null) { + // Even if not opening the shadowed region still report region transition to master so that + // the region will be offline at master side. + if (!services.reportRegionStateTransition(TransitionCode.SHADOWED, + region.getRegionInfo())) { + throw new IOException("Failed to report shadow region to master: " + + region.getRegionInfo().getShortNameToLog()); + } + if (this.openShadowedRegion) { + // Should add it to OnlineRegions + services.addToOnlineRegions(this.region); + } + } + } + } + + /** + * Run the transaction. + * @param server Hosting server instance. Can be null when testing + * @param services Used to online/offline regions. + * @throws IOException If thrown, transaction failed. + * Call {@link #rollback(Server, RegionServerServices)} + * @return Regions created + * @throws IOException + * @see #rollback(Server, RegionServerServices) + */ + public void execute(final Server server, + final RegionServerServices services) + throws IOException { + makeRegionShadow(server, services); + stepsAfterPONR(server, services); + } + + public void stepsAfterPONR(final Server server, + final RegionServerServices services) + throws IOException { + openShadowRegion(server, services); + } + + public Put addLocation(final Put p, final ServerName sn, long openSeqNum) { + p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, + Bytes.toBytes(sn.getHostAndPort())); + p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, + Bytes.toBytes(sn.getStartcode())); + p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, + Bytes.toBytes(openSeqNum)); + return p; + } + + /** + * Open daughter regions, add them to online list and update meta. + * @param server + * @param region + * @throws IOException + * @throws KeeperException + */ + void openShadowRegion(final Server server, final HRegion region) + throws IOException, KeeperException { + HRegionInfo hri = region.getRegionInfo(); + LoggingProgressable reporter = server == null ? null + : new LoggingProgressable(hri, server.getConfiguration().getLong( + "hbase.regionserver.split.daughter.open.log.interval", 10000)); + region.openHRegion(reporter); + } + + static class LoggingProgressable implements CancelableProgressable { + private final HRegionInfo hri; + private long lastLog = -1; + private final long interval; + + LoggingProgressable(final HRegionInfo hri, final long interval) { + this.hri = hri; + this.interval = interval; + } + + @Override + public boolean progress() { + long now = System.currentTimeMillis(); + if (now - lastLog > this.interval) { + LOG.info("Opening " + this.hri.getRegionNameAsString()); + this.lastLog = now; + } + return true; + } + } + + /** + * @param server Hosting server instance (May be null when testing). + * @param services + * @throws IOException If thrown, rollback failed. Take drastic action. + * @return True if we successfully rolled back, false if we got to the point + * of no return and so now need to abort the server to minimize damage. + */ + @SuppressWarnings("deprecation") + public boolean rollback(final Server server, final RegionServerServices services) + throws IOException { + boolean result = true; + ListIterator iterator = + this.journal.listIterator(this.journal.size()); + // Iterate in reverse. + while (iterator.hasPrevious()) { + JournalEntry je = iterator.previous(); + switch(je) { + + case SET_SHADOWING: + if (services != null + && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED, + region.getRegionInfo())) { + return false; + } + break; + + case CLOSED_REGION: + try { + // So, this returns a seqid but if we just closed and then reopened, we + // should be ok. On close, we flushed using sequenceid obtained from + // hosting regionserver so no need to propagate the sequenceid returned + // out of initialize below up into regionserver as we normally do. + // TODO: Verify. + this.region.initialize(); + } catch (IOException e) { + LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " + + this.region.getRegionNameAsString(), e); + throw new RuntimeException(e); + } + break; + + case OFFLINED_REGION: + if (services != null) services.addToOnlineRegions(this.region); + break; + + case PONR: + // We got to the point-of-no-return so we need to just abort. Return + // immediately. Do not clean up created daughter regions. They need + // to be in place so we don't delete the parent region mistakenly. + // See HBASE-3872. + return false; + + default: + throw new RuntimeException("Unhandled journal entry: " + je); + } + } + return result; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java index ce3a4a7..d285316 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java @@ -156,7 +156,7 @@ public class OpenRegionHandler extends EventHandler { cleanupFailedOpen(region); } } finally { - rsServices.reportRegionStateTransition(TransitionCode.FAILED_OPEN, regionInfo); + if(!region.getRegionInfo().isShadow())rsServices.reportRegionStateTransition(TransitionCode.FAILED_OPEN, regionInfo); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionShadowTranscationOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionShadowTranscationOnCluster.java new file mode 100644 index 0000000..a3b221a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionShadowTranscationOnCluster.java @@ -0,0 +1,182 @@ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.protobuf.ServiceException; + +public class TestRegionShadowTranscationOnCluster { + + private final static byte[] FAMILY = Bytes.toBytes("FAMILY"); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + final static Configuration conf = TEST_UTIL.getConfiguration(); + final static byte[] CHILD_TABLE_KEY = Bytes.toBytes("CHILD_TABLE"); + private static Admin admin; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); + TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6); + TEST_UTIL.getConfiguration().setBoolean( + "hbase.master.enabletable.roundrobin", true); + TEST_UTIL.startMiniCluster(3); + admin = TEST_UTIL.getHBaseAdmin(); + } + + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if(admin != null) admin.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test(timeout = 300000) + public void testShadowRegion() throws IOException, InterruptedException { + TableName tableName = TableName.valueOf("testShadowRegion"); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(FAMILY)); + admin.createTable(desc); + HTable table = new HTable(conf,tableName); + Put p = new Put(Bytes.toBytes("row")); + p.add(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("value")); + table.put(p); + HRegionServer rsHoldingRegion = TEST_UTIL.getRSForFirstRegionInTable(tableName); + List regions = TEST_UTIL.getHBaseCluster().getRegions(tableName); + RegionShadowRequest rs = new RegionShadowRequest(regions.get(0), true, rsHoldingRegion); + Thread t = new Thread(rs); + t.start(); + t.join(); + HRegion onlineRegion = rsHoldingRegion.getOnlineRegion(regions.get(0).getRegionName()); + assertNotNull(onlineRegion); + p = new Put(Bytes.toBytes("row2")); + p.add(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("value")); + table.put(p); + ResultScanner scanner = table.getScanner(new Scan()); + int i = 0; + for(Result r:scanner){ + i++; + } + assertEquals(2,i); + } + + @Test(timeout = 300000) + public void testPutsAndScansOnShadowRegionsOpenedThroughCoprocessors() throws IOException, InterruptedException { + TableName tableName = + TableName.valueOf("normalTable"); + TableName shadowTable = + TableName.valueOf("shadowTable"); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(FAMILY)); + admin.createTable(desc); + HTableDescriptor shadowTabledesc = new HTableDescriptor(shadowTable); + shadowTabledesc.addFamily(new HColumnDescriptor(FAMILY)); + admin.createTable(shadowTabledesc); + /// admin.shadowTable(shadowTable); + desc.setValue(CHILD_TABLE_KEY, shadowTable.getName()); + desc.addCoprocessor(MockedRegionObserver.class.getName()); + admin.modifyTable(desc.getTableName(), desc); + HRegionServer rsHoldingRegion = TEST_UTIL.getRSForFirstRegionInTable(shadowTable); + List regions = TEST_UTIL.getHBaseCluster().getRegions(shadowTable); + RegionShadowRequest rs = new RegionShadowRequest(regions.get(0), false, rsHoldingRegion); + Thread t = new Thread(rs); + t.start(); + t.join(); + HRegion onlineRegion = rsHoldingRegion.getOnlineRegion(regions.get(0).getRegionName()); + assertNull(onlineRegion); + HTable table = new HTable(conf,tableName); + Put p = new Put(Bytes.toBytes("row")); + p.add(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("value")); + table.put(p); + HTable stable = new HTable(conf,shadowTable); + p = new Put(Bytes.toBytes("row")); + p.add(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("value")); + stable.put(p); + ResultScanner scanner = stable.getScanner(new Scan()); + int i = 0; + for(Result r:scanner) { + i++; + } + assertEquals(i,1); + } + + public static class MockedRegionObserver extends BaseRegionObserver { + @Override + public void preOpen(ObserverContext e) throws IOException { + byte[] childKey = e.getEnvironment().getRegion().getTableDesc().getValue(CHILD_TABLE_KEY); + if (childKey != null) { + RegionServerServices rsServices = e.getEnvironment().getRegionServerServices(); + TableName childName = TableName.valueOf(childKey); + HRegionServer rs = ((HRegionServer)rsServices); + AdminService.BlockingInterface admin = rs.getRSRpcServices(); + List tableRegions = + MetaTableAccessor.getTableRegions(rsServices.getZooKeeper(), + rs.getShortCircuitConnection(), childName); + AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest( + rsServices.getServerName(), tableRegions.get(0), null, Boolean.TRUE); + try { + OpenRegionResponse response = admin.openRegion(null, orr); + while (rs.getFromOnlineRegions(tableRegions.get(0).getEncodedName()) == null) { + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + } + } + MetaTableAccessor.updateRegionLocation(rs.getShortCircuitConnection(), tableRegions.get(0), rsServices.getServerName(), 1); + } catch (ServiceException e1) { + throw new IOException(e1); + } + + } + } + + @Override + public void preClose(ObserverContext e, boolean abortRequested) + throws IOException { + byte[] childKey = e.getEnvironment().getRegion().getTableDesc().getValue(CHILD_TABLE_KEY); + if (childKey != null) { + RegionServerServices rsServices = e.getEnvironment().getRegionServerServices(); + TableName childName = TableName.valueOf(childKey); + List onlineRegions = rsServices.getOnlineRegions(childName); + if(onlineRegions.isEmpty()) return; + new CloseRegionHandler(rsServices, rsServices, onlineRegions.get(0).getRegionInfo(), + abortRequested, rsServices.getServerName()).process(); + } + } + } + +} -- 1.9.4.msysgit.0