From b9b3d293150f2e3050d0c81c70521d15727a51cf Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Mon, 22 Jun 2015 10:23:13 -0700 Subject: [PATCH] HBASE-13938 Deletes done during the region merge transaction may get eclipsed (Devaraj Das and Enis Soztutar) Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java --- .../hadoop/hbase/protobuf/RequestConverter.java | 3 + .../hbase/protobuf/generated/AdminProtos.java | 209 ++++++++++++++++----- .../protobuf/generated/VisibilityLabelsProtos.java | 6 +- hbase-protocol/src/main/protobuf/Admin.proto | 2 + .../apache/hadoop/hbase/catalog/MetaEditor.java | 22 ++- .../hadoop/hbase/master/RegionStateStore.java | 5 +- .../hbase/regionserver/CompactSplitThread.java | 4 +- .../hadoop/hbase/regionserver/HRegionServer.java | 3 +- .../hbase/regionserver/RegionMergeRequest.java | 7 +- .../hbase/regionserver/RegionMergeTransaction.java | 26 ++- 10 files changed, 224 insertions(+), 63 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index d14c2fd..6681ca5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -102,6 +102,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Triple; @@ -906,6 +907,8 @@ public final class RequestConverter { builder.setRegionA(regionASpecifier); builder.setRegionB(regionBSpecifier); builder.setForcible(forcible); + // send the master's wall clock time as well, so that the RS can refer to it + builder.setMasterSystemTime(EnvironmentEdgeManager.currentTimeMillis()); return builder.build(); } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java index 636e51f..36d6080 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java @@ -13409,6 +13409,24 @@ public final class AdminProtos { * optional bool forcible = 3 [default = false]; */ boolean getForcible(); + + // optional uint64 master_system_time = 4; + /** + * optional uint64 master_system_time = 4; + * + *
+     * wall clock time from master
+     * 
+ */ + boolean hasMasterSystemTime(); + /** + * optional uint64 master_system_time = 4; + * + *
+     * wall clock time from master
+     * 
+ */ + long getMasterSystemTime(); } /** * Protobuf type {@code MergeRegionsRequest} @@ -13499,6 +13517,11 @@ public final class AdminProtos { forcible_ = input.readBool(); break; } + case 32: { + bitField0_ |= 0x00000008; + masterSystemTime_ = input.readUInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -13599,10 +13622,35 @@ public final class AdminProtos { return forcible_; } + // optional uint64 master_system_time = 4; + public static final int MASTER_SYSTEM_TIME_FIELD_NUMBER = 4; + private long masterSystemTime_; + /** + * optional uint64 master_system_time = 4; + * + *
+     * wall clock time from master
+     * 
+ */ + public boolean hasMasterSystemTime() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional uint64 master_system_time = 4; + * + *
+     * wall clock time from master
+     * 
+ */ + public long getMasterSystemTime() { + return masterSystemTime_; + } + private void initFields() { regionA_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); regionB_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); forcible_ = false; + masterSystemTime_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -13641,6 +13689,9 @@ public final class AdminProtos { if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBool(3, forcible_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeUInt64(4, masterSystemTime_); + } getUnknownFields().writeTo(output); } @@ -13662,6 +13713,10 @@ public final class AdminProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(3, forcible_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(4, masterSystemTime_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -13700,6 +13755,11 @@ public final class AdminProtos { result = result && (getForcible() == other.getForcible()); } + result = result && (hasMasterSystemTime() == other.hasMasterSystemTime()); + if (hasMasterSystemTime()) { + result = result && (getMasterSystemTime() + == other.getMasterSystemTime()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -13725,6 +13785,10 @@ public final class AdminProtos { hash = (37 * hash) + FORCIBLE_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getForcible()); } + if (hasMasterSystemTime()) { + hash = (37 * hash) + MASTER_SYSTEM_TIME_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getMasterSystemTime()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -13857,6 +13921,8 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000002); forcible_ = false; bitField0_ = (bitField0_ & ~0x00000004); + masterSystemTime_ = 0L; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -13905,6 +13971,10 @@ public final class AdminProtos { to_bitField0_ |= 0x00000004; } result.forcible_ = forcible_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.masterSystemTime_ = masterSystemTime_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -13930,6 +14000,9 @@ public final class AdminProtos { if (other.hasForcible()) { setForcible(other.getForcible()); } + if (other.hasMasterSystemTime()) { + setMasterSystemTime(other.getMasterSystemTime()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -14240,6 +14313,55 @@ public final class AdminProtos { return this; } + // optional uint64 master_system_time = 4; + private long masterSystemTime_ ; + /** + * optional uint64 master_system_time = 4; + * + *
+       * wall clock time from master
+       * 
+ */ + public boolean hasMasterSystemTime() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional uint64 master_system_time = 4; + * + *
+       * wall clock time from master
+       * 
+ */ + public long getMasterSystemTime() { + return masterSystemTime_; + } + /** + * optional uint64 master_system_time = 4; + * + *
+       * wall clock time from master
+       * 
+ */ + public Builder setMasterSystemTime(long value) { + bitField0_ |= 0x00000008; + masterSystemTime_ = value; + onChanged(); + return this; + } + /** + * optional uint64 master_system_time = 4; + * + *
+       * wall clock time from master
+       * 
+ */ + public Builder clearMasterSystemTime() { + bitField0_ = (bitField0_ & ~0x00000008); + masterSystemTime_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:MergeRegionsRequest) } @@ -21318,48 +21440,49 @@ public final class AdminProtos { "t.RegionUpdateInfo\032S\n\020RegionUpdateInfo\022\033" + "\n\006region\030\001 \002(\0132\013.RegionInfo\022\"\n\rfavored_n" + "odes\030\002 \003(\0132\013.ServerName\".\n\032UpdateFavored" + - "NodesResponse\022\020\n\010response\030\001 \001(\r\"v\n\023Merge" + - "RegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Regio" + - "nSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.RegionSpe" + - "cifier\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024Merge" + - "RegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002(\013" + - "2\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025as", - "sociated_cell_count\030\003 \001(\005\"4\n\030ReplicateWA" + - "LEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"" + - "\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWALW" + - "riterRequest\"0\n\025RollWALWriterResponse\022\027\n" + - "\017region_to_flush\030\001 \003(\014\"#\n\021StopServerRequ" + - "est\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespons" + - "e\"\026\n\024GetServerInfoRequest\"B\n\nServerInfo\022" + - " \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\nweb" + - "ui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse\022 " + - "\n\013server_info\030\001 \002(\0132\013.ServerInfo2\306\007\n\014Adm", - "inService\022>\n\rGetRegionInfo\022\025.GetRegionIn" + - "foRequest\032\026.GetRegionInfoResponse\022;\n\014Get" + - "StoreFile\022\024.GetStoreFileRequest\032\025.GetSto" + - "reFileResponse\022D\n\017GetOnlineRegion\022\027.GetO" + - "nlineRegionRequest\032\030.GetOnlineRegionResp" + - "onse\0225\n\nOpenRegion\022\022.OpenRegionRequest\032\023" + - ".OpenRegionResponse\0228\n\013CloseRegion\022\023.Clo" + - "seRegionRequest\032\024.CloseRegionResponse\0228\n" + - "\013FlushRegion\022\023.FlushRegionRequest\032\024.Flus" + - "hRegionResponse\0228\n\013SplitRegion\022\023.SplitRe", - "gionRequest\032\024.SplitRegionResponse\022>\n\rCom" + - "pactRegion\022\025.CompactRegionRequest\032\026.Comp" + - "actRegionResponse\022;\n\014MergeRegions\022\024.Merg" + - "eRegionsRequest\032\025.MergeRegionsResponse\022J" + - "\n\021ReplicateWALEntry\022\031.ReplicateWALEntryR" + - "equest\032\032.ReplicateWALEntryResponse\022?\n\006Re" + - "play\022\031.ReplicateWALEntryRequest\032\032.Replic" + - "ateWALEntryResponse\022>\n\rRollWALWriter\022\025.R" + - "ollWALWriterRequest\032\026.RollWALWriterRespo" + - "nse\022>\n\rGetServerInfo\022\025.GetServerInfoRequ", - "est\032\026.GetServerInfoResponse\0225\n\nStopServe" + - "r\022\022.StopServerRequest\032\023.StopServerRespon" + - "se\022M\n\022UpdateFavoredNodes\022\032.UpdateFavored" + - "NodesRequest\032\033.UpdateFavoredNodesRespons" + - "eBA\n*org.apache.hadoop.hbase.protobuf.ge" + - "neratedB\013AdminProtosH\001\210\001\001\240\001\001" + "NodesResponse\022\020\n\010response\030\001 \001(\r\"\222\001\n\023Merg" + + "eRegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Regi" + + "onSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.RegionSp" + + "ecifier\022\027\n\010forcible\030\003 \001(\010:\005false\022\032\n\022mast" + + "er_system_time\030\004 \001(\004\"\026\n\024MergeRegionsResp" + + "onse\"X\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.WALKey\022\027", + "\n\017key_value_bytes\030\002 \003(\014\022\035\n\025associated_ce" + + "ll_count\030\003 \001(\005\"4\n\030ReplicateWALEntryReque" + + "st\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031Replicat" + + "eWALEntryResponse\"\026\n\024RollWALWriterReques" + + "t\"0\n\025RollWALWriterResponse\022\027\n\017region_to_" + + "flush\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006reas" + + "on\030\001 \002(\t\"\024\n\022StopServerResponse\"\026\n\024GetSer" + + "verInfoRequest\"B\n\nServerInfo\022 \n\013server_n" + + "ame\030\001 \002(\0132\013.ServerName\022\022\n\nwebui_port\030\002 \001" + + "(\r\"9\n\025GetServerInfoResponse\022 \n\013server_in", + "fo\030\001 \002(\0132\013.ServerInfo2\306\007\n\014AdminService\022>" + + "\n\rGetRegionInfo\022\025.GetRegionInfoRequest\032\026" + + ".GetRegionInfoResponse\022;\n\014GetStoreFile\022\024" + + ".GetStoreFileRequest\032\025.GetStoreFileRespo" + + "nse\022D\n\017GetOnlineRegion\022\027.GetOnlineRegion" + + "Request\032\030.GetOnlineRegionResponse\0225\n\nOpe" + + "nRegion\022\022.OpenRegionRequest\032\023.OpenRegion" + + "Response\0228\n\013CloseRegion\022\023.CloseRegionReq" + + "uest\032\024.CloseRegionResponse\0228\n\013FlushRegio" + + "n\022\023.FlushRegionRequest\032\024.FlushRegionResp", + "onse\0228\n\013SplitRegion\022\023.SplitRegionRequest" + + "\032\024.SplitRegionResponse\022>\n\rCompactRegion\022" + + "\025.CompactRegionRequest\032\026.CompactRegionRe" + + "sponse\022;\n\014MergeRegions\022\024.MergeRegionsReq" + + "uest\032\025.MergeRegionsResponse\022J\n\021Replicate" + + "WALEntry\022\031.ReplicateWALEntryRequest\032\032.Re" + + "plicateWALEntryResponse\022?\n\006Replay\022\031.Repl" + + "icateWALEntryRequest\032\032.ReplicateWALEntry" + + "Response\022>\n\rRollWALWriter\022\025.RollWALWrite" + + "rRequest\032\026.RollWALWriterResponse\022>\n\rGetS", + "erverInfo\022\025.GetServerInfoRequest\032\026.GetSe" + + "rverInfoResponse\0225\n\nStopServer\022\022.StopSer" + + "verRequest\032\023.StopServerResponse\022M\n\022Updat" + + "eFavoredNodes\022\032.UpdateFavoredNodesReques" + + "t\032\033.UpdateFavoredNodesResponseBA\n*org.ap" + + "ache.hadoop.hbase.protobuf.generatedB\013Ad" + + "minProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -21491,7 +21614,7 @@ public final class AdminProtos { internal_static_MergeRegionsRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MergeRegionsRequest_descriptor, - new java.lang.String[] { "RegionA", "RegionB", "Forcible", }); + new java.lang.String[] { "RegionA", "RegionB", "Forcible", "MasterSystemTime", }); internal_static_MergeRegionsResponse_descriptor = getDescriptor().getMessageTypes().get(19); internal_static_MergeRegionsResponse_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/VisibilityLabelsProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/VisibilityLabelsProtos.java index 294772e..70593b0 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/VisibilityLabelsProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/VisibilityLabelsProtos.java @@ -5092,7 +5092,7 @@ public final class VisibilityLabelsProtos { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -5108,7 +5108,7 @@ public final class VisibilityLabelsProtos { getRegexBytes() { java.lang.Object ref = regex_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); regex_ = b; @@ -5414,7 +5414,7 @@ public final class VisibilityLabelsProtos { getRegexBytes() { java.lang.Object ref = regex_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); regex_ = b; diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto index ecf30f4..b1c0d64 100644 --- a/hbase-protocol/src/main/protobuf/Admin.proto +++ b/hbase-protocol/src/main/protobuf/Admin.proto @@ -173,6 +173,8 @@ message MergeRegionsRequest { required RegionSpecifier region_a = 1; required RegionSpecifier region_b = 2; optional bool forcible = 3 [default = false]; + // wall clock time from master + optional uint64 master_system_time = 4; } message MergeRegionsResponse { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java index 786b0f2..9c3e75b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java @@ -80,10 +80,18 @@ public class MetaEditor { * table */ public static Delete makeDeleteFromRegionInfo(HRegionInfo regionInfo) { + return makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP); + } + + /** + * Generates and returns a Delete containing the region info for the catalog + * table + */ + public static Delete makeDeleteFromRegionInfo(HRegionInfo regionInfo, long ts) { if (regionInfo == null) { throw new IllegalArgumentException("Can't make a delete for null region"); } - Delete delete = new Delete(regionInfo.getRegionName()); + Delete delete = new Delete(regionInfo.getRegionName(), ts); return delete; } @@ -327,25 +335,29 @@ public class MetaEditor { * @param regionA * @param regionB * @param sn the location of the region + * @param masterSystemTime * @throws IOException */ public static void mergeRegions(final CatalogTracker catalogTracker, HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB, - ServerName sn) throws IOException { + ServerName sn, long masterSystemTime) throws IOException { HTable meta = MetaReader.getMetaHTable(catalogTracker); try { HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion); + // use the maximum of what master passed us vs local time. + long time = Math.max(EnvironmentEdgeManager.currentTimeMillis(), masterSystemTime); + // Put for parent - Put putOfMerged = makePutFromRegionInfo(copyOfMerged); + Put putOfMerged = makePutFromRegionInfo(copyOfMerged, time); putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, regionA.toByteArray()); putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, regionB.toByteArray()); // Deletes for merging regions - Delete deleteA = makeDeleteFromRegionInfo(regionA); - Delete deleteB = makeDeleteFromRegionInfo(regionB); + Delete deleteA = makeDeleteFromRegionInfo(regionA, time); + Delete deleteB = makeDeleteFromRegionInfo(regionB, time); // The merged is a new region, openSeqNum = 1 is fine. addLocation(putOfMerged, sn, 1); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java index 45c4fd2..95c0d6d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaEditor; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.master.RegionState.State; @@ -40,6 +39,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConfigUtil; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.MultiHConnection; import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker; import org.apache.zookeeper.KeeperException; @@ -232,7 +232,8 @@ public class RegionStateStore { void mergeRegions(HRegionInfo p, HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException { - MetaEditor.mergeRegions(catalogTracker, p, a, b, sn); + MetaEditor.mergeRegions(catalogTracker, p, a, b, sn, + EnvironmentEdgeManager.currentTimeMillis()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 5bdf977..6596aac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -199,9 +199,9 @@ public class CompactSplitThread implements CompactionRequestor { } public synchronized void requestRegionsMerge(final HRegion a, - final HRegion b, final boolean forcible) { + final HRegion b, final boolean forcible, long masterSystemTime) { try { - mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible)); + mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime)); if (LOG.isDebugEnabled()) { LOG.debug("Region merge requested for " + a + "," + b + ", forcible=" + forcible + ". " + this); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index e7e382d..f7dd582 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -4149,6 +4149,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa HRegion regionA = getRegion(request.getRegionA()); HRegion regionB = getRegion(request.getRegionB()); boolean forcible = request.getForcible(); + long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1; regionA.startRegionOperation(Operation.MERGE_REGION); regionB.startRegionOperation(Operation.MERGE_REGION); LOG.info("Receiving merging request for " + regionA + ", " + regionB @@ -4165,7 +4166,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa long endTime = EnvironmentEdgeManager.currentTimeMillis(); metricsRegionServer.updateFlushTime(endTime - startTime); } - compactSplitThread.requestRegionsMerge(regionA, regionB, forcible); + compactSplitThread.requestRegionsMerge(regionA, regionB, forcible, masterSystemTime); return MergeRegionsResponse.newBuilder().build(); } catch (DroppedSnapshotException ex) { abort("Replay of WAL required. Forcing server shutdown", ex); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java index 0ab7a64..4b6e862 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java @@ -42,13 +42,16 @@ class RegionMergeRequest implements Runnable { private final HRegionServer server; private final boolean forcible; private TableLock tableLock; + private final long masterSystemTime; - RegionMergeRequest(HRegion a, HRegion b, HRegionServer hrs, boolean forcible) { + RegionMergeRequest(HRegion a, HRegion b, HRegionServer hrs, boolean forcible, + long masterSystemTime) { Preconditions.checkNotNull(hrs); this.region_a = a; this.region_b = b; this.server = hrs; this.forcible = forcible; + this.masterSystemTime = masterSystemTime; } @Override @@ -67,7 +70,7 @@ class RegionMergeRequest implements Runnable { try { final long startTime = EnvironmentEdgeManager.currentTimeMillis(); RegionMergeTransaction mt = new RegionMergeTransaction(region_a, - region_b, forcible); + region_b, forcible, masterSystemTime); //acquire a shared read lock on the table, so that table schema modifications //do not happen concurrently diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java index 054da41..6af8f36 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java @@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.executor.EventType; @@ -102,6 +101,7 @@ public class RegionMergeTransaction { // We only merge adjacent regions if forcible is false private final boolean forcible; private boolean useZKForAssignment; + private final long masterSystemTime; /** * Types to add to the transaction journal. Each enum is a step in the merge @@ -161,6 +161,18 @@ public class RegionMergeTransaction { */ public RegionMergeTransaction(final HRegion a, final HRegion b, final boolean forcible) { + this(a, b, forcible, EnvironmentEdgeManager.currentTimeMillis()); + } + + /** + * Constructor + * @param a region a to merge + * @param b region b to merge + * @param forcible if false, we will only merge adjacent regions + * @param masterSystemTime the time at the master side + */ + public RegionMergeTransaction(final HRegion a, final HRegion b, + final boolean forcible, long masterSystemTime) { if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) { this.region_a = a; this.region_b = b; @@ -169,6 +181,7 @@ public class RegionMergeTransaction { this.region_b = a; } this.forcible = forcible; + this.masterSystemTime = masterSystemTime; this.mergesdir = region_a.getRegionFileSystem().getMergesDir(); } @@ -323,7 +336,7 @@ public class RegionMergeTransaction { if (!testing && useZKForAssignment) { if (metaEntries.isEmpty()) { MetaEditor.mergeRegions(server.getCatalogTracker(), mergedRegion.getRegionInfo(), region_a - .getRegionInfo(), region_b.getRegionInfo(), server.getServerName()); + .getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), masterSystemTime); } else { mergeRegionsAndPutMetaEntries(server.getCatalogTracker(), mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), metaEntries); @@ -351,14 +364,17 @@ public class RegionMergeTransaction { HRegionInfo regionB, ServerName serverName, List mutations) throws IOException { HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion); + // use the maximum of what master passed us vs local time. + long time = Math.max(EnvironmentEdgeManager.currentTimeMillis(), masterSystemTime); + // Put for parent - Put putOfMerged = MetaEditor.makePutFromRegionInfo(copyOfMerged); + Put putOfMerged = MetaEditor.makePutFromRegionInfo(copyOfMerged, time); putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, regionA.toByteArray()); putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, regionB.toByteArray()); mutations.add(putOfMerged); // Deletes for merging regions - Delete deleteA = MetaEditor.makeDeleteFromRegionInfo(regionA); - Delete deleteB = MetaEditor.makeDeleteFromRegionInfo(regionB); + Delete deleteA = MetaEditor.makeDeleteFromRegionInfo(regionA, time); + Delete deleteB = MetaEditor.makeDeleteFromRegionInfo(regionB, time); mutations.add(deleteA); mutations.add(deleteB); // The merged is a new region, openSeqNum = 1 is fine. -- 1.9.5 (Apple Git-50.3)