From 3e0721c2482bbf93e1fba54cf8f2b7e48debfed3 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Mon, 22 Jun 2015 10:23:13 -0700 Subject: [PATCH] HBASE-13938 Deletes done during the region merge transaction may get eclipsed (Devaraj Das and Enis Soztutar) Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java --- .../org/apache/hadoop/hbase/MetaTableAccessor.java | 23 ++- .../hadoop/hbase/protobuf/RequestConverter.java | 2 + .../hbase/protobuf/generated/AdminProtos.java | 215 ++++++++++++++++----- hbase-protocol/src/main/protobuf/Admin.proto | 2 + .../hadoop/hbase/master/RegionStateStore.java | 4 +- .../hbase/regionserver/CompactSplitThread.java | 4 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 3 +- .../hbase/regionserver/RegionMergeRequest.java | 7 +- .../hbase/regionserver/RegionMergeTransaction.java | 25 ++- .../apache/hadoop/hbase/TestMetaTableAccessor.java | 56 ++++++ 10 files changed, 280 insertions(+), 61 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index 343fd34..0c4f960 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -937,10 +937,18 @@ public class MetaTableAccessor { * table */ public static Delete makeDeleteFromRegionInfo(HRegionInfo regionInfo) { + return 
makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP); + } + + /** + * Generates and returns a Delete containing the region info for the catalog + * table + */ + public static Delete makeDeleteFromRegionInfo(HRegionInfo regionInfo, long ts) { if (regionInfo == null) { throw new IllegalArgumentException("Can't make a delete for null region"); } - Delete delete = new Delete(regionInfo.getRegionName()); + Delete delete = new Delete(regionInfo.getRegionName(), ts); return delete; } @@ -1204,24 +1212,29 @@ public class MetaTableAccessor { * @param regionA * @param regionB * @param sn the location of the region + * @param masterSystemTime * @throws IOException */ public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion, - HRegionInfo regionA, HRegionInfo regionB, ServerName sn) throws IOException { + HRegionInfo regionA, HRegionInfo regionB, ServerName sn, long masterSystemTime) + throws IOException { Table meta = getMetaHTable(connection); try { HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion); + // use the maximum of what master passed us vs local time. + long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime); + // Put for parent - Put putOfMerged = makePutFromRegionInfo(copyOfMerged); + Put putOfMerged = makePutFromRegionInfo(copyOfMerged, time); putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, regionA.toByteArray()); putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, regionB.toByteArray()); // Deletes for merging regions - Delete deleteA = makeDeleteFromRegionInfo(regionA); - Delete deleteB = makeDeleteFromRegionInfo(regionB); + Delete deleteA = makeDeleteFromRegionInfo(regionA, time); + Delete deleteB = makeDeleteFromRegionInfo(regionB, time); // The merged is a new region, openSeqNum = 1 is fine. 
addLocation(putOfMerged, sn, 1, -1, mergedRegion.getReplicaId()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index 79339a0..6ab0ca7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -913,6 +913,8 @@ public final class RequestConverter { builder.setRegionA(regionASpecifier); builder.setRegionB(regionBSpecifier); builder.setForcible(forcible); + // send the master's wall clock time as well, so that the RS can refer to it + builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime()); return builder.build(); } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java index 9c48784..a2b2e83 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java @@ -13531,6 +13531,24 @@ public final class AdminProtos { * optional bool forcible = 3 [default = false]; */ boolean getForcible(); + + // optional uint64 master_system_time = 4; + /** + * optional uint64 master_system_time = 4; + * + *
+     * wall clock time from master
+     * 
+ */ + boolean hasMasterSystemTime(); + /** + * <code>optional uint64 master_system_time = 4;</code> + * + * <pre>
+     * wall clock time from master
+     * </pre>
+ */ + long getMasterSystemTime(); } /** * Protobuf type {@code MergeRegionsRequest} @@ -13621,6 +13639,11 @@ public final class AdminProtos { forcible_ = input.readBool(); break; } + case 32: { + bitField0_ |= 0x00000008; + masterSystemTime_ = input.readUInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -13721,10 +13744,35 @@ public final class AdminProtos { return forcible_; } + // optional uint64 master_system_time = 4; + public static final int MASTER_SYSTEM_TIME_FIELD_NUMBER = 4; + private long masterSystemTime_; + /** + * optional uint64 master_system_time = 4; + * + *
+     * wall clock time from master
+     * 
+ */ + public boolean hasMasterSystemTime() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * <code>optional uint64 master_system_time = 4;</code> + * + * <pre>
+     * wall clock time from master
+     * </pre>
+ */ + public long getMasterSystemTime() { + return masterSystemTime_; + } + private void initFields() { regionA_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); regionB_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); forcible_ = false; + masterSystemTime_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -13763,6 +13811,9 @@ public final class AdminProtos { if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBool(3, forcible_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeUInt64(4, masterSystemTime_); + } getUnknownFields().writeTo(output); } @@ -13784,6 +13835,10 @@ public final class AdminProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(3, forcible_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(4, masterSystemTime_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -13822,6 +13877,11 @@ public final class AdminProtos { result = result && (getForcible() == other.getForcible()); } + result = result && (hasMasterSystemTime() == other.hasMasterSystemTime()); + if (hasMasterSystemTime()) { + result = result && (getMasterSystemTime() + == other.getMasterSystemTime()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -13847,6 +13907,10 @@ public final class AdminProtos { hash = (37 * hash) + FORCIBLE_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getForcible()); } + if (hasMasterSystemTime()) { + hash = (37 * hash) + MASTER_SYSTEM_TIME_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getMasterSystemTime()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -13979,6 +14043,8 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000002); forcible_ = false; 
bitField0_ = (bitField0_ & ~0x00000004); + masterSystemTime_ = 0L; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -14027,6 +14093,10 @@ public final class AdminProtos { to_bitField0_ |= 0x00000004; } result.forcible_ = forcible_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.masterSystemTime_ = masterSystemTime_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -14052,6 +14122,9 @@ public final class AdminProtos { if (other.hasForcible()) { setForcible(other.getForcible()); } + if (other.hasMasterSystemTime()) { + setMasterSystemTime(other.getMasterSystemTime()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -14362,6 +14435,55 @@ public final class AdminProtos { return this; } + // optional uint64 master_system_time = 4; + private long masterSystemTime_ ; + /** + * optional uint64 master_system_time = 4; + * + *
+       * wall clock time from master
+       * 
+ */ + public boolean hasMasterSystemTime() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * <code>optional uint64 master_system_time = 4;</code> + * + * <pre>
+       * wall clock time from master
+       * </pre>
+ */ + public long getMasterSystemTime() { + return masterSystemTime_; + } + /** + * <code>optional uint64 master_system_time = 4;</code> + * + * <pre>
+       * wall clock time from master
+       * </pre>
+ */ + public Builder setMasterSystemTime(long value) { + bitField0_ |= 0x00000008; + masterSystemTime_ = value; + onChanged(); + return this; + } + /** + * <code>optional uint64 master_system_time = 4;</code> + * + * <pre>
+       * wall clock time from master
+       * </pre>
+ */ + public Builder clearMasterSystemTime() { + bitField0_ = (bitField0_ & ~0x00000008); + masterSystemTime_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:MergeRegionsRequest) } @@ -22210,51 +22332,52 @@ public final class AdminProtos { "teInfo\032S\n\020RegionUpdateInfo\022\033\n\006region\030\001 \002" + "(\0132\013.RegionInfo\022\"\n\rfavored_nodes\030\002 \003(\0132\013" + ".ServerName\".\n\032UpdateFavoredNodesRespons" + - "e\022\020\n\010response\030\001 \001(\r\"v\n\023MergeRegionsReque" + - "st\022\"\n\010region_a\030\001 \002(\0132\020.RegionSpecifier\022\"" + - "\n\010region_b\030\002 \002(\0132\020.RegionSpecifier\022\027\n\010fo" + - "rcible\030\003 \001(\010:\005false\"\026\n\024MergeRegionsRespo" + - "nse\"X\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n", - "\017key_value_bytes\030\002 \003(\014\022\035\n\025associated_cel" + - "l_count\030\003 \001(\005\"4\n\030ReplicateWALEntryReques" + - "t\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031Replicate" + - "WALEntryResponse\"\026\n\024RollWALWriterRequest" + - "\"0\n\025RollWALWriterResponse\022\027\n\017region_to_f" + - "lush\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006reaso" + - "n\030\001 \002(\t\"\024\n\022StopServerResponse\"\026\n\024GetServ" + - "erInfoRequest\"B\n\nServerInfo\022 \n\013server_na" + - "me\030\001 \002(\0132\013.ServerName\022\022\n\nwebui_port\030\002 \001(" + - "\r\"9\n\025GetServerInfoResponse\022 \n\013server_inf", - "o\030\001 \002(\0132\013.ServerInfo\"\034\n\032UpdateConfigurat" + - "ionRequest\"\035\n\033UpdateConfigurationRespons" + - "e2\230\010\n\014AdminService\022>\n\rGetRegionInfo\022\025.Ge" + - "tRegionInfoRequest\032\026.GetRegionInfoRespon" + - "se\022;\n\014GetStoreFile\022\024.GetStoreFileRequest" + - "\032\025.GetStoreFileResponse\022D\n\017GetOnlineRegi" + - "on\022\027.GetOnlineRegionRequest\032\030.GetOnlineR" + - "egionResponse\0225\n\nOpenRegion\022\022.OpenRegion" 
+ - "Request\032\023.OpenRegionResponse\0228\n\013CloseReg" + - "ion\022\023.CloseRegionRequest\032\024.CloseRegionRe", - "sponse\0228\n\013FlushRegion\022\023.FlushRegionReque" + - "st\032\024.FlushRegionResponse\0228\n\013SplitRegion\022" + - "\023.SplitRegionRequest\032\024.SplitRegionRespon" + - "se\022>\n\rCompactRegion\022\025.CompactRegionReque" + - "st\032\026.CompactRegionResponse\022;\n\014MergeRegio" + - "ns\022\024.MergeRegionsRequest\032\025.MergeRegionsR" + - "esponse\022J\n\021ReplicateWALEntry\022\031.Replicate" + - "WALEntryRequest\032\032.ReplicateWALEntryRespo" + - "nse\022?\n\006Replay\022\031.ReplicateWALEntryRequest" + - "\032\032.ReplicateWALEntryResponse\022>\n\rRollWALW", - "riter\022\025.RollWALWriterRequest\032\026.RollWALWr" + - "iterResponse\022>\n\rGetServerInfo\022\025.GetServe" + - "rInfoRequest\032\026.GetServerInfoResponse\0225\n\n" + - "StopServer\022\022.StopServerRequest\032\023.StopSer" + - "verResponse\022M\n\022UpdateFavoredNodes\022\032.Upda" + - "teFavoredNodesRequest\032\033.UpdateFavoredNod" + - "esResponse\022P\n\023UpdateConfiguration\022\033.Upda" + - "teConfigurationRequest\032\034.UpdateConfigura" + - "tionResponseBA\n*org.apache.hadoop.hbase." 
+ - "protobuf.generatedB\013AdminProtosH\001\210\001\001\240\001\001" + "e\022\020\n\010response\030\001 \001(\r\"\222\001\n\023MergeRegionsRequ" + + "est\022\"\n\010region_a\030\001 \002(\0132\020.RegionSpecifier\022" + + "\"\n\010region_b\030\002 \002(\0132\020.RegionSpecifier\022\027\n\010f" + + "orcible\030\003 \001(\010:\005false\022\032\n\022master_system_ti" + + "me\030\004 \001(\004\"\026\n\024MergeRegionsResponse\"X\n\010WALE", + "ntry\022\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key_value_" + + "bytes\030\002 \003(\014\022\035\n\025associated_cell_count\030\003 \001" + + "(\005\"4\n\030ReplicateWALEntryRequest\022\030\n\005entry\030" + + "\001 \003(\0132\t.WALEntry\"\033\n\031ReplicateWALEntryRes" + + "ponse\"\026\n\024RollWALWriterRequest\"0\n\025RollWAL" + + "WriterResponse\022\027\n\017region_to_flush\030\001 \003(\014\"" + + "#\n\021StopServerRequest\022\016\n\006reason\030\001 \002(\t\"\024\n\022" + + "StopServerResponse\"\026\n\024GetServerInfoReque" + + "st\"B\n\nServerInfo\022 \n\013server_name\030\001 \002(\0132\013." 
+ + "ServerName\022\022\n\nwebui_port\030\002 \001(\r\"9\n\025GetSer", + "verInfoResponse\022 \n\013server_info\030\001 \002(\0132\013.S" + + "erverInfo\"\034\n\032UpdateConfigurationRequest\"" + + "\035\n\033UpdateConfigurationResponse2\230\010\n\014Admin" + + "Service\022>\n\rGetRegionInfo\022\025.GetRegionInfo" + + "Request\032\026.GetRegionInfoResponse\022;\n\014GetSt" + + "oreFile\022\024.GetStoreFileRequest\032\025.GetStore" + + "FileResponse\022D\n\017GetOnlineRegion\022\027.GetOnl" + + "ineRegionRequest\032\030.GetOnlineRegionRespon" + + "se\0225\n\nOpenRegion\022\022.OpenRegionRequest\032\023.O" + + "penRegionResponse\0228\n\013CloseRegion\022\023.Close", + "RegionRequest\032\024.CloseRegionResponse\0228\n\013F" + + "lushRegion\022\023.FlushRegionRequest\032\024.FlushR" + + "egionResponse\0228\n\013SplitRegion\022\023.SplitRegi" + + "onRequest\032\024.SplitRegionResponse\022>\n\rCompa" + + "ctRegion\022\025.CompactRegionRequest\032\026.Compac" + + "tRegionResponse\022;\n\014MergeRegions\022\024.MergeR" + + "egionsRequest\032\025.MergeRegionsResponse\022J\n\021" + + "ReplicateWALEntry\022\031.ReplicateWALEntryReq" + + "uest\032\032.ReplicateWALEntryResponse\022?\n\006Repl" + + "ay\022\031.ReplicateWALEntryRequest\032\032.Replicat", + "eWALEntryResponse\022>\n\rRollWALWriter\022\025.Rol" + + "lWALWriterRequest\032\026.RollWALWriterRespons" + + "e\022>\n\rGetServerInfo\022\025.GetServerInfoReques" + + "t\032\026.GetServerInfoResponse\0225\n\nStopServer\022" + + "\022.StopServerRequest\032\023.StopServerResponse" + + "\022M\n\022UpdateFavoredNodes\022\032.UpdateFavoredNo" + + "desRequest\032\033.UpdateFavoredNodesResponse\022" + + "P\n\023UpdateConfiguration\022\033.UpdateConfigura" + + "tionRequest\032\034.UpdateConfigurationRespons" + + "eBA\n*org.apache.hadoop.hbase.protobuf.ge", + "neratedB\013AdminProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -22386,7 +22509,7 @@ public final class AdminProtos { internal_static_MergeRegionsRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MergeRegionsRequest_descriptor, - new java.lang.String[] { "RegionA", "RegionB", "Forcible", }); + new java.lang.String[] { "RegionA", "RegionB", "Forcible", "MasterSystemTime", }); internal_static_MergeRegionsResponse_descriptor = getDescriptor().getMessageTypes().get(19); internal_static_MergeRegionsResponse_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto index b2fcb44..ffa0aca 100644 --- a/hbase-protocol/src/main/protobuf/Admin.proto +++ b/hbase-protocol/src/main/protobuf/Admin.proto @@ -175,6 +175,8 @@ message MergeRegionsRequest { required RegionSpecifier region_a = 1; required RegionSpecifier region_b = 2; optional bool forcible = 3 [default = false]; + // wall clock time from master + optional uint64 master_system_time = 4; } message MergeRegionsResponse { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java index d080837..c56c6f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConfigUtil; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.MultiHConnection; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.zookeeper.KeeperException; @@ -256,6 +257,7 @@ public class RegionStateStore 
{ void mergeRegions(HRegionInfo p, HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException { - MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn); + MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, + EnvironmentEdgeManager.currentTime()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 94538fe..970bf55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -210,9 +210,9 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } public synchronized void requestRegionsMerge(final HRegion a, - final HRegion b, final boolean forcible) { + final HRegion b, final boolean forcible, long masterSystemTime) { try { - mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible)); + mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime)); if (LOG.isDebugEnabled()) { LOG.debug("Region merge requested for " + a + "," + b + ", forcible=" + forcible + ". " + this); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 609f4cd..1a6f114 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1231,6 +1231,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, HRegion regionA = getRegion(request.getRegionA()); HRegion regionB = getRegion(request.getRegionB()); boolean forcible = request.getForcible(); + long masterSystemTime = request.hasMasterSystemTime() ? 
request.getMasterSystemTime() : -1; regionA.startRegionOperation(Operation.MERGE_REGION); regionB.startRegionOperation(Operation.MERGE_REGION); LOG.info("Receiving merging request for " + regionA + ", " + regionB @@ -1247,7 +1248,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, long endTime = EnvironmentEdgeManager.currentTime(); regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); } - regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible); + regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible, masterSystemTime); return MergeRegionsResponse.newBuilder().build(); } catch (DroppedSnapshotException ex) { regionServer.abort("Replay of WAL required. Forcing server shutdown", ex); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java index 88551f0..d6e3818 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java @@ -42,13 +42,16 @@ class RegionMergeRequest implements Runnable { private final HRegionServer server; private final boolean forcible; private TableLock tableLock; + private final long masterSystemTime; - RegionMergeRequest(HRegion a, HRegion b, HRegionServer hrs, boolean forcible) { + RegionMergeRequest(HRegion a, HRegion b, HRegionServer hrs, boolean forcible, + long masterSystemTime) { Preconditions.checkNotNull(hrs); this.region_a = a; this.region_b = b; this.server = hrs; this.forcible = forcible; + this.masterSystemTime = masterSystemTime; } @Override @@ -67,7 +70,7 @@ class RegionMergeRequest implements Runnable { try { final long startTime = EnvironmentEdgeManager.currentTime(); RegionMergeTransaction mt = new RegionMergeTransaction(region_a, - region_b, forcible); + region_b, forcible, 
masterSystemTime); //acquire a shared read lock on the table, so that table schema modifications //do not happen concurrently diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java index e49193d..b184b3d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java @@ -90,6 +90,7 @@ public class RegionMergeTransaction { // We only merge adjacent regions if forcible is false private final boolean forcible; private boolean useCoordinationForAssignment; + private final long masterSystemTime; /** * Types to add to the transaction journal. Each enum is a step in the merge @@ -151,6 +152,18 @@ public class RegionMergeTransaction { */ public RegionMergeTransaction(final HRegion a, final HRegion b, final boolean forcible) { + this(a, b, forcible, EnvironmentEdgeManager.currentTime()); + } + + /** + * Constructor + * @param a region a to merge + * @param b region b to merge + * @param forcible if false, we will only merge adjacent regions + * @param masterSystemTime the time at the master side + */ + public RegionMergeTransaction(final HRegion a, final HRegion b, + final boolean forcible, long masterSystemTime) { if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) { this.region_a = a; this.region_b = b; @@ -159,6 +172,7 @@ public class RegionMergeTransaction { this.region_b = a; } this.forcible = forcible; + this.masterSystemTime = masterSystemTime; this.mergesdir = region_a.getRegionFileSystem().getMergesDir(); } @@ -326,7 +340,7 @@ public class RegionMergeTransaction { if (metaEntries.isEmpty()) { MetaTableAccessor.mergeRegions(server.getConnection(), mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(), - server.getServerName()); + server.getServerName(), 
masterSystemTime); } else { mergeRegionsAndPutMetaEntries(server.getConnection(), mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(), @@ -355,14 +369,17 @@ public class RegionMergeTransaction { HRegionInfo regionB, ServerName serverName, List mutations) throws IOException { HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion); + // use the maximum of what master passed us vs local time. + long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime); + // Put for parent - Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged); + Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged, time); putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, regionA.toByteArray()); putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, regionB.toByteArray()); mutations.add(putOfMerged); // Deletes for merging regions - Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA); - Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB); + Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA, time); + Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB, time); mutations.add(deleteA); mutations.add(deleteB); // The merged is a new region, openSeqNum = 1 is fine. 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java index ad8dd5e..380d8e7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java @@ -58,6 +58,7 @@ public class TestMetaTableAccessor { private static final Log LOG = LogFactory.getLog(TestMetaTableAccessor.class); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static Connection connection; + private Random random = new Random(); @BeforeClass public static void beforeClass() throws Exception { UTIL.startMiniCluster(3); @@ -383,6 +384,9 @@ public class TestMetaTableAccessor { } } + /** + * Tests whether maximum of masters system time versus RSs local system time is used + */ @Test public void testMastersSystemTimeIsUsedInUpdateLocations() throws IOException { long regionId = System.currentTimeMillis(); @@ -419,5 +423,57 @@ public class TestMetaTableAccessor { meta.close(); } } + + @Test + public void testMastersSystemTimeIsUsedInMergeRegions() throws IOException { + long regionId = System.currentTimeMillis(); + HRegionInfo regionInfoA = new HRegionInfo(TableName.valueOf("table_foo"), + HConstants.EMPTY_START_ROW, new byte[] {'a'}, false, regionId, 0); + HRegionInfo regionInfoB = new HRegionInfo(TableName.valueOf("table_foo"), + new byte[] {'a'}, HConstants.EMPTY_END_ROW, false, regionId, 0); + HRegionInfo mergedRegionInfo = new HRegionInfo(TableName.valueOf("table_foo"), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, regionId, 0); + + ServerName sn = ServerName.valueOf("bar", 0, 0); + Table meta = MetaTableAccessor.getMetaHTable(connection); + try { + List regionInfos = Lists.newArrayList(regionInfoA, regionInfoB); + MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1); + + // write the serverName column with a big current 
time, but set the masters time as even + // bigger. When region merge deletes the rows for regionA and regionB, the serverName columns + // should not be seen by the following get + long serverNameTime = EnvironmentEdgeManager.currentTime() + 100000000; + long masterSystemTime = EnvironmentEdgeManager.currentTime() + 123456789; + + // write the serverName columns + MetaTableAccessor.updateRegionLocation(connection, regionInfoA, sn, 1, serverNameTime); + + // assert that we have the serverName column with expected ts + Get get = new Get(mergedRegionInfo.getRegionName()); + Result result = meta.get(get); + Cell serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY, + MetaTableAccessor.getServerColumn(0)); + assertNotNull(serverCell); + assertEquals(serverNameTime, serverCell.getTimestamp()); + + // now merge the regions, effectively deleting the rows for region a and b. + MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, + regionInfoA, regionInfoB, sn, masterSystemTime); + + result = meta.get(get); + serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY, + MetaTableAccessor.getServerColumn(0)); + Cell startCodeCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY, + MetaTableAccessor.getStartCodeColumn(0)); + Cell seqNumCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY, + MetaTableAccessor.getSeqNumColumn(0)); + assertNull(serverCell); + assertNull(startCodeCell); + assertNull(seqNumCell); + } finally { + meta.close(); + } + } } -- 1.9.5 (Apple Git-50.3)