diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index cf34de5..bbb831d 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -945,10 +945,18 @@ public class MetaTableAccessor { * table */ public static Delete makeDeleteFromRegionInfo(HRegionInfo regionInfo) { + return makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP); + } + + /** + * Generates and returns a Delete containing the region info for the catalog + * table + */ + public static Delete makeDeleteFromRegionInfo(HRegionInfo regionInfo, long ts) { if (regionInfo == null) { throw new IllegalArgumentException("Can't make a delete for null region"); } - Delete delete = new Delete(regionInfo.getRegionName()); + Delete delete = new Delete(regionInfo.getRegionName(), ts); return delete; } @@ -1217,25 +1225,30 @@ public class MetaTableAccessor { * @param regionA * @param regionB * @param sn the location of the region + * @param masterSystemTime * @throws IOException */ public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion, - HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication) + HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication, + long masterSystemTime) throws IOException { Table meta = getMetaHTable(connection); try { HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion); + // use the maximum of what master passed us vs local time. + long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime); + // Put for parent - Put putOfMerged = makePutFromRegionInfo(copyOfMerged); + Put putOfMerged = makePutFromRegionInfo(copyOfMerged, time); putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, regionA.toByteArray()); putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, regionB.toByteArray()); // Deletes for merging regions - Delete deleteA = makeDeleteFromRegionInfo(regionA); - Delete deleteB = makeDeleteFromRegionInfo(regionB); + Delete deleteA = makeDeleteFromRegionInfo(regionA, time); + Delete deleteB = makeDeleteFromRegionInfo(regionB, time); // The merged is a new region, openSeqNum = 1 is fine. addLocation(putOfMerged, sn, 1, -1, mergedRegion.getReplicaId()); diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index a77c56a..6f726fe 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -943,6 +943,8 @@ public final class RequestConverter { builder.setRegionA(regionASpecifier); builder.setRegionB(regionBSpecifier); builder.setForcible(forcible); + // send the master's wall clock time as well, so that the RS can refer to it + builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime()); return builder.build(); } diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java index b6c511c..256f865 100644 --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java @@ -14642,6 +14642,24 @@ public final class AdminProtos { * optional bool forcible = 3 [default = false]; */ boolean getForcible(); + + // optional uint64 master_system_time = 4; + /** + * optional uint64 master_system_time = 4; + * + *
+     * wall clock time from master
+     * 
+ */ + boolean hasMasterSystemTime(); + /** + * optional uint64 master_system_time = 4; + * + *
+     * wall clock time from master
+     * 
+ */ + long getMasterSystemTime(); } /** * Protobuf type {@code MergeRegionsRequest} @@ -14732,6 +14750,11 @@ public final class AdminProtos { forcible_ = input.readBool(); break; } + case 32: { + bitField0_ |= 0x00000008; + masterSystemTime_ = input.readUInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -14832,10 +14855,35 @@ public final class AdminProtos { return forcible_; } + // optional uint64 master_system_time = 4; + public static final int MASTER_SYSTEM_TIME_FIELD_NUMBER = 4; + private long masterSystemTime_; + /** + * optional uint64 master_system_time = 4; + * + *
+     * wall clock time from master
+     * 
+ */ + public boolean hasMasterSystemTime() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional uint64 master_system_time = 4; + * + *
+     * wall clock time from master
+     * 
+ */ + public long getMasterSystemTime() { + return masterSystemTime_; + } + private void initFields() { regionA_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); regionB_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); forcible_ = false; + masterSystemTime_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -14874,6 +14922,9 @@ public final class AdminProtos { if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBool(3, forcible_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeUInt64(4, masterSystemTime_); + } getUnknownFields().writeTo(output); } @@ -14895,6 +14946,10 @@ public final class AdminProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(3, forcible_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(4, masterSystemTime_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -14933,6 +14988,11 @@ public final class AdminProtos { result = result && (getForcible() == other.getForcible()); } + result = result && (hasMasterSystemTime() == other.hasMasterSystemTime()); + if (hasMasterSystemTime()) { + result = result && (getMasterSystemTime() + == other.getMasterSystemTime()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -14958,6 +15018,10 @@ public final class AdminProtos { hash = (37 * hash) + FORCIBLE_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getForcible()); } + if (hasMasterSystemTime()) { + hash = (37 * hash) + MASTER_SYSTEM_TIME_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getMasterSystemTime()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -15090,6 +15154,8 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000002); forcible_ = false; bitField0_ = (bitField0_ & ~0x00000004); + masterSystemTime_ = 0L; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -15138,6 +15204,10 @@ public final class AdminProtos { to_bitField0_ |= 0x00000004; } result.forcible_ = forcible_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.masterSystemTime_ = masterSystemTime_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -15163,6 +15233,9 @@ public final class AdminProtos { if (other.hasForcible()) { setForcible(other.getForcible()); } + if (other.hasMasterSystemTime()) { + setMasterSystemTime(other.getMasterSystemTime()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -15473,6 +15546,55 @@ public final class AdminProtos { return this; } + // optional uint64 master_system_time = 4; + private long masterSystemTime_ ; + /** + * optional uint64 master_system_time = 4; + * + *
+       * wall clock time from master
+       * 
+ */ + public boolean hasMasterSystemTime() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional uint64 master_system_time = 4; + * + *
+       * wall clock time from master
+       * 
+ */ + public long getMasterSystemTime() { + return masterSystemTime_; + } + /** + * optional uint64 master_system_time = 4; + * + *
+       * wall clock time from master
+       * 
+ */ + public Builder setMasterSystemTime(long value) { + bitField0_ |= 0x00000008; + masterSystemTime_ = value; + onChanged(); + return this; + } + /** + * optional uint64 master_system_time = 4; + * + *
+       * wall clock time from master
+       * 
+ */ + public Builder clearMasterSystemTime() { + bitField0_ = (bitField0_ & ~0x00000008); + masterSystemTime_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:MergeRegionsRequest) } @@ -23406,53 +23528,53 @@ public final class AdminProtos { "UpdateInfo\022\033\n\006region\030\001 \002(\0132\013.RegionInfo\022" + "\"\n\rfavored_nodes\030\002 \003(\0132\013.ServerName\".\n\032U" + "pdateFavoredNodesResponse\022\020\n\010response\030\001 " + - "\001(\r\"v\n\023MergeRegionsRequest\022\"\n\010region_a\030\001", - " \002(\0132\020.RegionSpecifier\022\"\n\010region_b\030\002 \002(\013" + - "2\020.RegionSpecifier\022\027\n\010forcible\030\003 \001(\010:\005fa" + - "lse\"\026\n\024MergeRegionsResponse\"X\n\010WALEntry\022" + - "\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key_value_bytes" + - "\030\002 \003(\014\022\035\n\025associated_cell_count\030\003 \001(\005\"4\n" + - "\030ReplicateWALEntryRequest\022\030\n\005entry\030\001 \003(\013" + - "2\t.WALEntry\"\033\n\031ReplicateWALEntryResponse" + - "\"\026\n\024RollWALWriterRequest\"0\n\025RollWALWrite" + - "rResponse\022\027\n\017region_to_flush\030\001 \003(\014\"#\n\021St" + - "opServerRequest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopS", - "erverResponse\"\026\n\024GetServerInfoRequest\"B\n" + - "\nServerInfo\022 \n\013server_name\030\001 \002(\0132\013.Serve" + - "rName\022\022\n\nwebui_port\030\002 \001(\r\"9\n\025GetServerIn" + - "foResponse\022 \n\013server_info\030\001 \002(\0132\013.Server" + - "Info\"\034\n\032UpdateConfigurationRequest\"\035\n\033Up" + - "dateConfigurationResponse2\325\010\n\014AdminServi" + - "ce\022>\n\rGetRegionInfo\022\025.GetRegionInfoReque" + - "st\032\026.GetRegionInfoResponse\022;\n\014GetStoreFi" + - "le\022\024.GetStoreFileRequest\032\025.GetStoreFileR" + - "esponse\022D\n\017GetOnlineRegion\022\027.GetOnlineRe", - "gionRequest\032\030.GetOnlineRegionResponse\0225\n" + - "\nOpenRegion\022\022.OpenRegionRequest\032\023.OpenRe" + - "gionResponse\022;\n\014WarmupRegion\022\024.WarmupReg" + - "ionRequest\032\025.WarmupRegionResponse\0228\n\013Clo" + - "seRegion\022\023.CloseRegionRequest\032\024.CloseReg" + - "ionResponse\0228\n\013FlushRegion\022\023.FlushRegion" + - "Request\032\024.FlushRegionResponse\0228\n\013SplitRe" + - "gion\022\023.SplitRegionRequest\032\024.SplitRegionR" + - "esponse\022>\n\rCompactRegion\022\025.CompactRegion" + - "Request\032\026.CompactRegionResponse\022;\n\014Merge", - "Regions\022\024.MergeRegionsRequest\032\025.MergeReg" + - "ionsResponse\022J\n\021ReplicateWALEntry\022\031.Repl" + - "icateWALEntryRequest\032\032.ReplicateWALEntry" + - "Response\022?\n\006Replay\022\031.ReplicateWALEntryRe" + - "quest\032\032.ReplicateWALEntryResponse\022>\n\rRol" + - "lWALWriter\022\025.RollWALWriterRequest\032\026.Roll" + - "WALWriterResponse\022>\n\rGetServerInfo\022\025.Get" + - "ServerInfoRequest\032\026.GetServerInfoRespons" + - "e\0225\n\nStopServer\022\022.StopServerRequest\032\023.St" + - "opServerResponse\022M\n\022UpdateFavoredNodes\022\032", - ".UpdateFavoredNodesRequest\032\033.UpdateFavor" + - "edNodesResponse\022P\n\023UpdateConfiguration\022\033" + - ".UpdateConfigurationRequest\032\034.UpdateConf" + - "igurationResponseBA\n*org.apache.hadoop.h" + - "base.protobuf.generatedB\013AdminProtosH\001\210\001" + - "\001\240\001\001" + "\001(\r\"\222\001\n\023MergeRegionsRequest\022\"\n\010region_a\030", + "\001 \002(\0132\020.RegionSpecifier\022\"\n\010region_b\030\002 \002(" + + "\0132\020.RegionSpecifier\022\027\n\010forcible\030\003 \001(\010:\005f" + + "alse\022\032\n\022master_system_time\030\004 \001(\004\"\026\n\024Merg" + + "eRegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002(" + + "\0132\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025a" + + "ssociated_cell_count\030\003 \001(\005\"4\n\030ReplicateW" + + "ALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry" + + "\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWAL" + + "WriterRequest\"0\n\025RollWALWriterResponse\022\027" + + "\n\017region_to_flush\030\001 \003(\014\"#\n\021StopServerReq", + "uest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespon" + + "se\"\026\n\024GetServerInfoRequest\"B\n\nServerInfo" + + "\022 \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\nwe" + + "bui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse\022" + + " \n\013server_info\030\001 \002(\0132\013.ServerInfo\"\034\n\032Upd" + + "ateConfigurationRequest\"\035\n\033UpdateConfigu" + + "rationResponse2\325\010\n\014AdminService\022>\n\rGetRe" + + "gionInfo\022\025.GetRegionInfoRequest\032\026.GetReg" + + "ionInfoResponse\022;\n\014GetStoreFile\022\024.GetSto" + + "reFileRequest\032\025.GetStoreFileResponse\022D\n\017", + "GetOnlineRegion\022\027.GetOnlineRegionRequest" + + "\032\030.GetOnlineRegionResponse\0225\n\nOpenRegion" + + "\022\022.OpenRegionRequest\032\023.OpenRegionRespons" + + "e\022;\n\014WarmupRegion\022\024.WarmupRegionRequest\032" + + "\025.WarmupRegionResponse\0228\n\013CloseRegion\022\023." + + "CloseRegionRequest\032\024.CloseRegionResponse" + + "\0228\n\013FlushRegion\022\023.FlushRegionRequest\032\024.F" + + "lushRegionResponse\0228\n\013SplitRegion\022\023.Spli" + + "tRegionRequest\032\024.SplitRegionResponse\022>\n\r" + + "CompactRegion\022\025.CompactRegionRequest\032\026.C", + "ompactRegionResponse\022;\n\014MergeRegions\022\024.M" + + "ergeRegionsRequest\032\025.MergeRegionsRespons" + + "e\022J\n\021ReplicateWALEntry\022\031.ReplicateWALEnt" + + "ryRequest\032\032.ReplicateWALEntryResponse\022?\n" + + "\006Replay\022\031.ReplicateWALEntryRequest\032\032.Rep" + + "licateWALEntryResponse\022>\n\rRollWALWriter\022" + + "\025.RollWALWriterRequest\032\026.RollWALWriterRe" + + "sponse\022>\n\rGetServerInfo\022\025.GetServerInfoR" + + "equest\032\026.GetServerInfoResponse\0225\n\nStopSe" + + "rver\022\022.StopServerRequest\032\023.StopServerRes", + "ponse\022M\n\022UpdateFavoredNodes\022\032.UpdateFavo" + + "redNodesRequest\032\033.UpdateFavoredNodesResp" + + "onse\022P\n\023UpdateConfiguration\022\033.UpdateConf" + + "igurationRequest\032\034.UpdateConfigurationRe" + + "sponseBA\n*org.apache.hadoop.hbase.protob" + + "uf.generatedB\013AdminProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -23596,7 +23718,7 @@ public final class AdminProtos { internal_static_MergeRegionsRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MergeRegionsRequest_descriptor, - new java.lang.String[] { "RegionA", "RegionB", "Forcible", }); + new java.lang.String[] { "RegionA", "RegionB", "Forcible", "MasterSystemTime", }); internal_static_MergeRegionsResponse_descriptor = getDescriptor().getMessageTypes().get(21); internal_static_MergeRegionsResponse_fieldAccessorTable = new diff --git hbase-protocol/src/main/protobuf/Admin.proto hbase-protocol/src/main/protobuf/Admin.proto index 550f0b8..b9bdf6e 100644 --- hbase-protocol/src/main/protobuf/Admin.proto +++ hbase-protocol/src/main/protobuf/Admin.proto @@ -185,6 +185,8 @@ message MergeRegionsRequest { required RegionSpecifier region_a = 1; required RegionSpecifier region_b = 2; optional bool forcible = 3 [default = false]; + // wall clock time from master + optional uint64 master_system_time = 4; } message MergeRegionsResponse { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java index fbfb440..91b4659 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConfigUtil; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.MultiHConnection; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.zookeeper.KeeperException; @@ -256,6 +257,7 @@ public class RegionStateStore { void mergeRegions(HRegionInfo p, HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException { - MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication); + MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication, + EnvironmentEdgeManager.currentTime()); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 4e30897..c659557 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -219,9 +219,9 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } public synchronized void requestRegionsMerge(final Region a, - final Region b, final boolean forcible) { + final Region b, final boolean forcible, long masterSystemTime) { try { - mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible)); + mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime)); if (LOG.isDebugEnabled()) { LOG.debug("Region merge requested for " + a + "," + b + ", forcible=" + forcible + ". " + this); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 615efb2..46b167a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1308,6 +1308,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Region regionA = getRegion(request.getRegionA()); Region regionB = getRegion(request.getRegionB()); boolean forcible = request.getForcible(); + long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1; regionA.startRegionOperation(Operation.MERGE_REGION); regionB.startRegionOperation(Operation.MERGE_REGION); if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID || @@ -1328,7 +1329,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, long endTime = EnvironmentEdgeManager.currentTime(); regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); } - regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible); + regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible, masterSystemTime); return MergeRegionsResponse.newBuilder().build(); } catch (DroppedSnapshotException ex) { regionServer.abort("Replay of WAL required. Forcing server shutdown", ex); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java index 62990b0..d449eb1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java @@ -42,13 +42,16 @@ class RegionMergeRequest implements Runnable { private final HRegionServer server; private final boolean forcible; private TableLock tableLock; + private final long masterSystemTime; - RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible) { + RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible, + long masterSystemTime) { Preconditions.checkNotNull(hrs); this.region_a = (HRegion)a; this.region_b = (HRegion)b; this.server = hrs; this.forcible = forcible; + this.masterSystemTime = masterSystemTime; } @Override @@ -67,7 +70,7 @@ class RegionMergeRequest implements Runnable { try { final long startTime = EnvironmentEdgeManager.currentTime(); RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(region_a, - region_b, forcible); + region_b, forcible, masterSystemTime); //acquire a shared read lock on the table, so that table schema modifications //do not happen concurrently diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java index f70027c..b22e12c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java @@ -62,6 +62,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { // We only merge adjacent regions if forcible is false private final boolean forcible; private boolean useCoordinationForAssignment; + private final long masterSystemTime; /* * Transaction state for listener, only valid during execute and @@ -129,6 +130,17 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { */ public RegionMergeTransactionImpl(final Region a, final Region b, final boolean forcible) { + this(a, b, forcible, EnvironmentEdgeManager.currentTime()); + } + /** + * Constructor + * @param a region a to merge + * @param b region b to merge + * @param forcible if false, we will only merge adjacent regions + * @param masterSystemTime the time at the master side + */ + public RegionMergeTransactionImpl(final Region a, final Region b, + final boolean forcible, long masterSystemTime) { if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) { this.region_a = (HRegion)a; this.region_b = (HRegion)b; @@ -137,6 +149,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { this.region_b = (HRegion)a; } this.forcible = forcible; + this.masterSystemTime = masterSystemTime; this.mergesdir = region_a.getRegionFileSystem().getMergesDir(); } @@ -335,7 +348,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { if (metaEntries.isEmpty()) { MetaTableAccessor.mergeRegions(server.getConnection(), mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(), - server.getServerName(), region_a.getTableDesc().getRegionReplication()); + server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime); } else { mergeRegionsAndPutMetaEntries(server.getConnection(), mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(), @@ -368,15 +381,15 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion); // Put for parent - Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged); + Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged, masterSystemTime); putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, regionA.toByteArray()); putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, regionB.toByteArray()); mutations.add(putOfMerged); // Deletes for merging regions - Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA); - Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB); + Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA, masterSystemTime); + Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB, masterSystemTime); mutations.add(deleteA); mutations.add(deleteB); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java index 2043bbb..6569721 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java @@ -464,7 +464,7 @@ public class TestMetaTableAccessor { List regionInfos = Lists.newArrayList(parentA, parentB); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); - MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3); + MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3, HConstants.LATEST_TIMESTAMP); assertEmptyMetaLocation(meta, merged.getRegionName(), 1); assertEmptyMetaLocation(meta, merged.getRegionName(), 2); @@ -473,6 +473,9 @@ public class TestMetaTableAccessor { } } + /** + * Tests whether maximum of masters system time versus RSs local system time is used + */ @Test public void testMastersSystemTimeIsUsedInUpdateLocations() throws IOException { long regionId = System.currentTimeMillis(); @@ -509,5 +512,57 @@ public class TestMetaTableAccessor { meta.close(); } } + + @Test + public void testMastersSystemTimeIsUsedInMergeRegions() throws IOException { + long regionId = System.currentTimeMillis(); + HRegionInfo regionInfoA = new HRegionInfo(TableName.valueOf("table_foo"), + HConstants.EMPTY_START_ROW, new byte[] {'a'}, false, regionId, 0); + HRegionInfo regionInfoB = new HRegionInfo(TableName.valueOf("table_foo"), + new byte[] {'a'}, HConstants.EMPTY_END_ROW, false, regionId, 0); + HRegionInfo mergedRegionInfo = new HRegionInfo(TableName.valueOf("table_foo"), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, regionId, 0); + + ServerName sn = ServerName.valueOf("bar", 0, 0); + Table meta = MetaTableAccessor.getMetaHTable(connection); + try { + List regionInfos = Lists.newArrayList(regionInfoA, regionInfoB); + MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1); + + // write the serverName column with a big current time, but set the masters time as even + // bigger. When region merge deletes the rows for regionA and regionB, the serverName columns + // should not be seen by the following get + long serverNameTime = EnvironmentEdgeManager.currentTime() + 100000000; + long masterSystemTime = EnvironmentEdgeManager.currentTime() + 123456789; + + // write the serverName columns + MetaTableAccessor.updateRegionLocation(connection, regionInfoA, sn, 1, serverNameTime); + + // assert that we have the serverName column with expected ts + Get get = new Get(mergedRegionInfo.getRegionName()); + Result result = meta.get(get); + Cell serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY, + MetaTableAccessor.getServerColumn(0)); + assertNotNull(serverCell); + assertEquals(serverNameTime, serverCell.getTimestamp()); + + // now merge the regions, effectively deleting the rows for region a and b. + MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, + regionInfoA, regionInfoB, sn, 1, masterSystemTime); + + result = meta.get(get); + serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY, + MetaTableAccessor.getServerColumn(0)); + Cell startCodeCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY, + MetaTableAccessor.getStartCodeColumn(0)); + Cell seqNumCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY, + MetaTableAccessor.getSeqNumColumn(0)); + assertNull(serverCell); + assertNull(startCodeCell); + assertNull(seqNumCell); + } finally { + meta.close(); + } + } }