diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index cf34de5..bbb831d 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -945,10 +945,18 @@ public class MetaTableAccessor {
* table
*/
public static Delete makeDeleteFromRegionInfo(HRegionInfo regionInfo) {
+ return makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP);
+ }
+
+ /**
+ * Generates and returns a Delete containing the region info for the catalog
+ * table
+ */
+ public static Delete makeDeleteFromRegionInfo(HRegionInfo regionInfo, long ts) {
if (regionInfo == null) {
throw new IllegalArgumentException("Can't make a delete for null region");
}
- Delete delete = new Delete(regionInfo.getRegionName());
+ Delete delete = new Delete(regionInfo.getRegionName(), ts);
return delete;
}
@@ -1217,25 +1225,30 @@ public class MetaTableAccessor {
* @param regionA
* @param regionB
* @param sn the location of the region
+ * @param masterSystemTime
* @throws IOException
*/
public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion,
- HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication)
+ HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication,
+ long masterSystemTime)
throws IOException {
Table meta = getMetaHTable(connection);
try {
HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion);
+ // use the maximum of what master passed us vs local time.
+ long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime);
+
// Put for parent
- Put putOfMerged = makePutFromRegionInfo(copyOfMerged);
+ Put putOfMerged = makePutFromRegionInfo(copyOfMerged, time);
putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER,
regionA.toByteArray());
putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER,
regionB.toByteArray());
// Deletes for merging regions
- Delete deleteA = makeDeleteFromRegionInfo(regionA);
- Delete deleteB = makeDeleteFromRegionInfo(regionB);
+ Delete deleteA = makeDeleteFromRegionInfo(regionA, time);
+ Delete deleteB = makeDeleteFromRegionInfo(regionB, time);
// The merged is a new region, openSeqNum = 1 is fine.
addLocation(putOfMerged, sn, 1, -1, mergedRegion.getReplicaId());
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index a77c56a..6f726fe 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -943,6 +943,8 @@ public final class RequestConverter {
builder.setRegionA(regionASpecifier);
builder.setRegionB(regionBSpecifier);
builder.setForcible(forcible);
+ // send the master's wall clock time as well, so that the RS can refer to it
+ builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
return builder.build();
}
diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
index b6c511c..256f865 100644
--- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
+++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
@@ -14642,6 +14642,24 @@ public final class AdminProtos {
* optional bool forcible = 3 [default = false];
*/
boolean getForcible();
+
+ // optional uint64 master_system_time = 4;
+ /**
+ * optional uint64 master_system_time = 4;
+ *
+ *
+ * wall clock time from master + *+ */ + boolean hasMasterSystemTime(); + /** + *
optional uint64 master_system_time = 4;
+ *
+ * + * wall clock time from master + *+ */ + long getMasterSystemTime(); } /** * Protobuf type {@code MergeRegionsRequest} @@ -14732,6 +14750,11 @@ public final class AdminProtos { forcible_ = input.readBool(); break; } + case 32: { + bitField0_ |= 0x00000008; + masterSystemTime_ = input.readUInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -14832,10 +14855,35 @@ public final class AdminProtos { return forcible_; } + // optional uint64 master_system_time = 4; + public static final int MASTER_SYSTEM_TIME_FIELD_NUMBER = 4; + private long masterSystemTime_; + /** + *
optional uint64 master_system_time = 4;
+ *
+ * + * wall clock time from master + *+ */ + public boolean hasMasterSystemTime() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + *
optional uint64 master_system_time = 4;
+ *
+ * + * wall clock time from master + *+ */ + public long getMasterSystemTime() { + return masterSystemTime_; + } + private void initFields() { regionA_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); regionB_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); forcible_ = false; + masterSystemTime_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -14874,6 +14922,9 @@ public final class AdminProtos { if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBool(3, forcible_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeUInt64(4, masterSystemTime_); + } getUnknownFields().writeTo(output); } @@ -14895,6 +14946,10 @@ public final class AdminProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(3, forcible_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(4, masterSystemTime_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -14933,6 +14988,11 @@ public final class AdminProtos { result = result && (getForcible() == other.getForcible()); } + result = result && (hasMasterSystemTime() == other.hasMasterSystemTime()); + if (hasMasterSystemTime()) { + result = result && (getMasterSystemTime() + == other.getMasterSystemTime()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -14958,6 +15018,10 @@ public final class AdminProtos { hash = (37 * hash) + FORCIBLE_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getForcible()); } + if (hasMasterSystemTime()) { + hash = (37 * hash) + MASTER_SYSTEM_TIME_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getMasterSystemTime()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -15090,6 +15154,8 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000002); forcible_ = false; bitField0_ = (bitField0_ & ~0x00000004); + masterSystemTime_ = 0L; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -15138,6 +15204,10 @@ public final class AdminProtos { to_bitField0_ |= 0x00000004; } result.forcible_ = forcible_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.masterSystemTime_ = masterSystemTime_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -15163,6 +15233,9 @@ public final class AdminProtos { if (other.hasForcible()) { setForcible(other.getForcible()); } + if (other.hasMasterSystemTime()) { + setMasterSystemTime(other.getMasterSystemTime()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -15473,6 +15546,55 @@ public final class AdminProtos { return this; } + // optional uint64 master_system_time = 4; + private long masterSystemTime_ ; + /** + *
optional uint64 master_system_time = 4;
+ *
+ * + * wall clock time from master + *+ */ + public boolean hasMasterSystemTime() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + *
optional uint64 master_system_time = 4;
+ *
+ * + * wall clock time from master + *+ */ + public long getMasterSystemTime() { + return masterSystemTime_; + } + /** + *
optional uint64 master_system_time = 4;
+ *
+ * + * wall clock time from master + *+ */ + public Builder setMasterSystemTime(long value) { + bitField0_ |= 0x00000008; + masterSystemTime_ = value; + onChanged(); + return this; + } + /** + *
optional uint64 master_system_time = 4;
+ *
+ * + * wall clock time from master + *+ */ + public Builder clearMasterSystemTime() { + bitField0_ = (bitField0_ & ~0x00000008); + masterSystemTime_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:MergeRegionsRequest) } @@ -23406,53 +23528,53 @@ public final class AdminProtos { "UpdateInfo\022\033\n\006region\030\001 \002(\0132\013.RegionInfo\022" + "\"\n\rfavored_nodes\030\002 \003(\0132\013.ServerName\".\n\032U" + "pdateFavoredNodesResponse\022\020\n\010response\030\001 " + - "\001(\r\"v\n\023MergeRegionsRequest\022\"\n\010region_a\030\001", - " \002(\0132\020.RegionSpecifier\022\"\n\010region_b\030\002 \002(\013" + - "2\020.RegionSpecifier\022\027\n\010forcible\030\003 \001(\010:\005fa" + - "lse\"\026\n\024MergeRegionsResponse\"X\n\010WALEntry\022" + - "\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key_value_bytes" + - "\030\002 \003(\014\022\035\n\025associated_cell_count\030\003 \001(\005\"4\n" + - "\030ReplicateWALEntryRequest\022\030\n\005entry\030\001 \003(\013" + - "2\t.WALEntry\"\033\n\031ReplicateWALEntryResponse" + - "\"\026\n\024RollWALWriterRequest\"0\n\025RollWALWrite" + - "rResponse\022\027\n\017region_to_flush\030\001 \003(\014\"#\n\021St" + - "opServerRequest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopS", - "erverResponse\"\026\n\024GetServerInfoRequest\"B\n" + - "\nServerInfo\022 \n\013server_name\030\001 \002(\0132\013.Serve" + - "rName\022\022\n\nwebui_port\030\002 \001(\r\"9\n\025GetServerIn" + - "foResponse\022 \n\013server_info\030\001 \002(\0132\013.Server" + - "Info\"\034\n\032UpdateConfigurationRequest\"\035\n\033Up" + - "dateConfigurationResponse2\325\010\n\014AdminServi" + - "ce\022>\n\rGetRegionInfo\022\025.GetRegionInfoReque" + - "st\032\026.GetRegionInfoResponse\022;\n\014GetStoreFi" + - "le\022\024.GetStoreFileRequest\032\025.GetStoreFileR" + - "esponse\022D\n\017GetOnlineRegion\022\027.GetOnlineRe", - "gionRequest\032\030.GetOnlineRegionResponse\0225\n" + - "\nOpenRegion\022\022.OpenRegionRequest\032\023.OpenRe" + - "gionResponse\022;\n\014WarmupRegion\022\024.WarmupReg" + - "ionRequest\032\025.WarmupRegionResponse\0228\n\013Clo" + - "seRegion\022\023.CloseRegionRequest\032\024.CloseReg" + - "ionResponse\0228\n\013FlushRegion\022\023.FlushRegion" + - "Request\032\024.FlushRegionResponse\0228\n\013SplitRe" + - "gion\022\023.SplitRegionRequest\032\024.SplitRegionR" + - "esponse\022>\n\rCompactRegion\022\025.CompactRegion" + - "Request\032\026.CompactRegionResponse\022;\n\014Merge", - "Regions\022\024.MergeRegionsRequest\032\025.MergeReg" + - "ionsResponse\022J\n\021ReplicateWALEntry\022\031.Repl" + - "icateWALEntryRequest\032\032.ReplicateWALEntry" + - "Response\022?\n\006Replay\022\031.ReplicateWALEntryRe" + - "quest\032\032.ReplicateWALEntryResponse\022>\n\rRol" + - "lWALWriter\022\025.RollWALWriterRequest\032\026.Roll" + - "WALWriterResponse\022>\n\rGetServerInfo\022\025.Get" + - "ServerInfoRequest\032\026.GetServerInfoRespons" + - "e\0225\n\nStopServer\022\022.StopServerRequest\032\023.St" + - "opServerResponse\022M\n\022UpdateFavoredNodes\022\032", - ".UpdateFavoredNodesRequest\032\033.UpdateFavor" + - "edNodesResponse\022P\n\023UpdateConfiguration\022\033" + - ".UpdateConfigurationRequest\032\034.UpdateConf" + - "igurationResponseBA\n*org.apache.hadoop.h" + - "base.protobuf.generatedB\013AdminProtosH\001\210\001" + - "\001\240\001\001" + "\001(\r\"\222\001\n\023MergeRegionsRequest\022\"\n\010region_a\030", + "\001 \002(\0132\020.RegionSpecifier\022\"\n\010region_b\030\002 \002(" + + "\0132\020.RegionSpecifier\022\027\n\010forcible\030\003 \001(\010:\005f" + + "alse\022\032\n\022master_system_time\030\004 \001(\004\"\026\n\024Merg" + + "eRegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002(" + + "\0132\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025a" + + "ssociated_cell_count\030\003 \001(\005\"4\n\030ReplicateW" + + "ALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry" + + "\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWAL" + + "WriterRequest\"0\n\025RollWALWriterResponse\022\027" + + "\n\017region_to_flush\030\001 \003(\014\"#\n\021StopServerReq", + "uest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespon" + + "se\"\026\n\024GetServerInfoRequest\"B\n\nServerInfo" + + "\022 \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\nwe" + + "bui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse\022" + + " \n\013server_info\030\001 \002(\0132\013.ServerInfo\"\034\n\032Upd" + + "ateConfigurationRequest\"\035\n\033UpdateConfigu" + + "rationResponse2\325\010\n\014AdminService\022>\n\rGetRe" + + "gionInfo\022\025.GetRegionInfoRequest\032\026.GetReg" + + "ionInfoResponse\022;\n\014GetStoreFile\022\024.GetSto" + + "reFileRequest\032\025.GetStoreFileResponse\022D\n\017", + "GetOnlineRegion\022\027.GetOnlineRegionRequest" + + "\032\030.GetOnlineRegionResponse\0225\n\nOpenRegion" + + "\022\022.OpenRegionRequest\032\023.OpenRegionRespons" + + "e\022;\n\014WarmupRegion\022\024.WarmupRegionRequest\032" + + "\025.WarmupRegionResponse\0228\n\013CloseRegion\022\023." + + "CloseRegionRequest\032\024.CloseRegionResponse" + + "\0228\n\013FlushRegion\022\023.FlushRegionRequest\032\024.F" + + "lushRegionResponse\0228\n\013SplitRegion\022\023.Spli" + + "tRegionRequest\032\024.SplitRegionResponse\022>\n\r" + + "CompactRegion\022\025.CompactRegionRequest\032\026.C", + "ompactRegionResponse\022;\n\014MergeRegions\022\024.M" + + "ergeRegionsRequest\032\025.MergeRegionsRespons" + + "e\022J\n\021ReplicateWALEntry\022\031.ReplicateWALEnt" + + "ryRequest\032\032.ReplicateWALEntryResponse\022?\n" + + "\006Replay\022\031.ReplicateWALEntryRequest\032\032.Rep" + + "licateWALEntryResponse\022>\n\rRollWALWriter\022" + + "\025.RollWALWriterRequest\032\026.RollWALWriterRe" + + "sponse\022>\n\rGetServerInfo\022\025.GetServerInfoR" + + "equest\032\026.GetServerInfoResponse\0225\n\nStopSe" + + "rver\022\022.StopServerRequest\032\023.StopServerRes", + "ponse\022M\n\022UpdateFavoredNodes\022\032.UpdateFavo" + + "redNodesRequest\032\033.UpdateFavoredNodesResp" + + "onse\022P\n\023UpdateConfiguration\022\033.UpdateConf" + + "igurationRequest\032\034.UpdateConfigurationRe" + + "sponseBA\n*org.apache.hadoop.hbase.protob" + + "uf.generatedB\013AdminProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -23596,7 +23718,7 @@ public final class AdminProtos { internal_static_MergeRegionsRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MergeRegionsRequest_descriptor, - new java.lang.String[] { "RegionA", "RegionB", "Forcible", }); + new java.lang.String[] { "RegionA", "RegionB", "Forcible", "MasterSystemTime", }); internal_static_MergeRegionsResponse_descriptor = getDescriptor().getMessageTypes().get(21); internal_static_MergeRegionsResponse_fieldAccessorTable = new diff --git hbase-protocol/src/main/protobuf/Admin.proto hbase-protocol/src/main/protobuf/Admin.proto index 550f0b8..b9bdf6e 100644 --- hbase-protocol/src/main/protobuf/Admin.proto +++ hbase-protocol/src/main/protobuf/Admin.proto @@ -185,6 +185,8 @@ message MergeRegionsRequest { required RegionSpecifier region_a = 1; required RegionSpecifier region_b = 2; optional bool forcible = 3 [default = false]; + // wall clock time from master + optional uint64 master_system_time = 4; } message MergeRegionsResponse { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java index fbfb440..91b4659 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConfigUtil; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.MultiHConnection; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.zookeeper.KeeperException; @@ -256,6 +257,7 @@ public class RegionStateStore { void mergeRegions(HRegionInfo p, HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException { - MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication); + MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication, + EnvironmentEdgeManager.currentTime()); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 4e30897..c659557 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -219,9 +219,9 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } public synchronized void requestRegionsMerge(final Region a, - final Region b, final boolean forcible) { + final Region b, final boolean forcible, long masterSystemTime) { try { - mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible)); + mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime)); if (LOG.isDebugEnabled()) { LOG.debug("Region merge requested for " + a + "," + b + ", forcible=" + forcible + ". " + this); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 615efb2..46b167a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1308,6 +1308,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Region regionA = getRegion(request.getRegionA()); Region regionB = getRegion(request.getRegionB()); boolean forcible = request.getForcible(); + long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1; regionA.startRegionOperation(Operation.MERGE_REGION); regionB.startRegionOperation(Operation.MERGE_REGION); if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID || @@ -1328,7 +1329,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, long endTime = EnvironmentEdgeManager.currentTime(); regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); } - regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible); + regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible, masterSystemTime); return MergeRegionsResponse.newBuilder().build(); } catch (DroppedSnapshotException ex) { regionServer.abort("Replay of WAL required. Forcing server shutdown", ex); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java index 62990b0..d449eb1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java @@ -42,13 +42,16 @@ class RegionMergeRequest implements Runnable { private final HRegionServer server; private final boolean forcible; private TableLock tableLock; + private final long masterSystemTime; - RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible) { + RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible, + long masterSystemTime) { Preconditions.checkNotNull(hrs); this.region_a = (HRegion)a; this.region_b = (HRegion)b; this.server = hrs; this.forcible = forcible; + this.masterSystemTime = masterSystemTime; } @Override @@ -67,7 +70,7 @@ class RegionMergeRequest implements Runnable { try { final long startTime = EnvironmentEdgeManager.currentTime(); RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(region_a, - region_b, forcible); + region_b, forcible, masterSystemTime); //acquire a shared read lock on the table, so that table schema modifications //do not happen concurrently diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java index f70027c..774c167 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java @@ -62,6 +62,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { // We only merge adjacent regions if forcible is false private final boolean forcible; private boolean useCoordinationForAssignment; + private final long masterSystemTime; /* * Transaction state for listener, only valid during execute and @@ -129,6 +130,17 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { */ public RegionMergeTransactionImpl(final Region a, final Region b, final boolean forcible) { + this(a, b, forcible, EnvironmentEdgeManager.currentTime()); + } + /** + * Constructor + * @param a region a to merge + * @param b region b to merge + * @param forcible if false, we will only merge adjacent regions + * @param masterSystemTime the time at the master side + */ + public RegionMergeTransactionImpl(final Region a, final Region b, + final boolean forcible, long masterSystemTime) { if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) { this.region_a = (HRegion)a; this.region_b = (HRegion)b; @@ -137,6 +149,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { this.region_b = (HRegion)a; } this.forcible = forcible; + this.masterSystemTime = masterSystemTime; this.mergesdir = region_a.getRegionFileSystem().getMergesDir(); } @@ -168,6 +181,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { * @return
true if the regions are mergeable else
* false if they are not (e.g. its already closed, etc.).
*/
+ @Override
public boolean prepare(final RegionServerServices services) throws IOException {
if (!region_a.getTableDesc().getTableName()
.equals(region_b.getTableDesc().getTableName())) {
@@ -232,6 +246,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
* @throws IOException
* @see #rollback(Server, RegionServerServices)
*/
+ @Override
public HRegion execute(final Server server,
final RegionServerServices services) throws IOException {
this.server = server;
@@ -335,7 +350,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
if (metaEntries.isEmpty()) {
MetaTableAccessor.mergeRegions(server.getConnection(),
mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),
- server.getServerName(), region_a.getTableDesc().getRegionReplication());
+ server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime);
} else {
mergeRegionsAndPutMetaEntries(server.getConnection(),
mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),
@@ -355,7 +370,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
private void mergeRegionsAndPutMetaEntries(HConnection hConnection,
HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB,
- ServerName serverName, List