From addd4210f2bc30346def93d1235821814d890ff5 Mon Sep 17 00:00:00 2001
From: chenheng
Date: Mon, 26 Oct 2015 12:05:19 +0800
Subject: [PATCH] HBASE-12986 Compaction pressure based client pushback

---
 .../backoff/ExponentialClientBackoffPolicy.java    |   6 +
 .../hbase/client/backoff/ServerStatistics.java     |   6 +
 .../hbase/client/TestClientExponentialBackoff.java |  34 +++-
 .../hbase/protobuf/generated/ClientProtos.java     | 187 +++++++++++++++++----
 hbase-protocol/src/main/protobuf/Client.proto      |   2 +
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   2 +
 6 files changed, 201 insertions(+), 36 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
index 5b1d3d2..a930837 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
@@ -70,6 +70,10 @@ public class ExponentialClientBackoffPolicy implements ClientBackoffPolicy {
 
     // Factor in heap occupancy
     float heapOccupancy = regionStats.getHeapOccupancyPercent() / 100.0f;
+
+    // Factor in compaction pressure, 1.0 means heavy compaction pressure
+    float compactionPressure = regionStats.getCompactionPressure() / 100.0f;
+
     if (heapOccupancy >= heapOccupancyLowWatermark) {
       // If we are higher than the high watermark, we are already applying max
       // backoff and cannot scale more (see scale() below)
@@ -81,6 +85,8 @@
           0.1, 1.0));
     }
 
+    percent = Math.max(percent, compactionPressure);
+
     // square the percent as a value less than 1. Closer we move to 100 percent,
     // the percent moves to 1, but squaring causes the exponential curve
     double multiplier = Math.pow(percent, 4.0);
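The policy change above folds compaction pressure into the existing pushback computation: the worst of the three reported percentages (memstore load, scaled heap occupancy, compaction pressure) wins, and raising it to the fourth power keeps backoff near zero for lightly loaded regions while climbing steeply as any one signal approaches 100. A standalone sketch of that arithmetic; the max-backoff constant is an illustrative assumption, and the watermark-based heap scaling is deliberately left out:

    public class BackoffMath {
      // Illustrative stand-in for ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF.
      private static final long MAX_BACKOFF_MS = 5000;

      // All inputs are 0-100 percentages, as reported in RegionLoadStats.
      static long backoffTimeMs(int memstoreLoad, int heapOccupancy, int compactionPressure) {
        double percent = memstoreLoad / 100.0;
        percent = Math.max(percent, heapOccupancy / 100.0);
        // The new signal: full compaction pressure alone forces maximum backoff.
        percent = Math.max(percent, compactionPressure / 100.0);
        double multiplier = Math.pow(percent, 4.0); // steep, exponential-style curve
        return (long) (multiplier * MAX_BACKOFF_MS);
      }

      public static void main(String[] args) {
        System.out.println(backoffTimeMs(0, 0, 0));   // 0: no pushback
        System.out.println(backoffTimeMs(0, 0, 50));  // 312: 0.5^4 * 5000
        System.out.println(backoffTimeMs(0, 0, 100)); // 5000: full backoff
      }
    }

Because the signals are combined with max() rather than summed, a region under heavy compaction backs clients off even when its memstore and heap look healthy.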
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
index c7519be..1b70426 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
@@ -57,10 +57,12 @@ public class ServerStatistics {
   public static class RegionStatistics {
     private int memstoreLoad = 0;
     private int heapOccupancy = 0;
+    private int compactionPressure = 0;
 
     public void update(ClientProtos.RegionLoadStats currentStats) {
       this.memstoreLoad = currentStats.getMemstoreLoad();
       this.heapOccupancy = currentStats.getHeapOccupancy();
+      this.compactionPressure = currentStats.getCompactionPressure();
     }
 
     public int getMemstoreLoadPercent(){
@@ -70,5 +72,9 @@ public class ServerStatistics {
     public int getHeapOccupancyPercent(){
       return this.heapOccupancy;
     }
+
+    public int getCompactionPressure() {
+      return compactionPressure;
+    }
   }
 }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
index e1a5ce6..6cace5d 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
@@ -113,22 +113,47 @@
     ServerStatistics stats = new ServerStatistics();
     long backoffTime;
 
-    update(stats, 0, 95);
+    update(stats, 0, 95, 0);
     backoffTime = backoff.getBackoffTime(server, regionname, stats);
     assertTrue("Heap occupancy at low watermark had no effect", backoffTime > 0);
 
     long previous = backoffTime;
-    update(stats, 0, 96);
+    update(stats, 0, 96, 0);
     backoffTime = backoff.getBackoffTime(server, regionname, stats);
     assertTrue("Increase above low watermark should have increased backoff",
         backoffTime > previous);
 
-    update(stats, 0, 98);
+    update(stats, 0, 98, 0);
     backoffTime = backoff.getBackoffTime(server, regionname, stats);
     assertEquals("We should be using max backoff when at high watermark", backoffTime,
         ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF);
   }
 
+  @Test
+  public void testCompactionPressurePolicy() {
+    Configuration conf = new Configuration(false);
+    ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf);
+
+    ServerStatistics stats = new ServerStatistics();
+    long backoffTime;
+
+    update(stats, 0, 0, 0);
+    backoffTime = backoff.getBackoffTime(server, regionname, stats);
+    assertTrue("Backoff should be zero when there is no compaction pressure", backoffTime == 0);
+
+    long previous = backoffTime;
+    update(stats, 0, 0, 50);
+    backoffTime = backoff.getBackoffTime(server, regionname, stats);
+    assertTrue("Increasing compaction pressure should increase backoff",
+        backoffTime > previous);
+
+    update(stats, 0, 0, 100);
+    backoffTime = backoff.getBackoffTime(server, regionname, stats);
+    assertEquals("Heavy compaction pressure should yield max backoff",
+        ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoffTime);
+
+  }
+
   private void update(ServerStatistics stats, int load) {
     ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder()
         .setMemstoreLoad
@@ -136,10 +161,11 @@
             (load).build();
     stats.update(regionname, stat);
   }
-  private void update(ServerStatistics stats, int memstoreLoad, int heapOccupancy) {
+  private void update(ServerStatistics stats, int memstoreLoad, int heapOccupancy, int compactionPressure) {
     ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder()
         .setMemstoreLoad(memstoreLoad)
         .setHeapOccupancy(heapOccupancy)
+        .setCompactionPressure(compactionPressure)
         .build();
     stats.update(regionname, stat);
   }
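The new test exercises the same path a live client takes: per-region RegionLoadStats flow into ServerStatistics, and the backoff policy converts them into a pause before the next batch. A sketch of that flow outside the test harness; the server name and region key are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
    import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
    import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BackoffFlowExample {
      public static void main(String[] args) {
        ServerName server = ServerName.valueOf("example-host,16020,0"); // placeholder
        byte[] region = Bytes.toBytes("exampleRegion");                 // placeholder

        // A server under heavy compaction reports the new field...
        ServerStatistics stats = new ServerStatistics();
        stats.update(region, ClientProtos.RegionLoadStats.newBuilder()
            .setCompactionPressure(80)
            .build());

        // ...and the policy turns it into a client-side pause.
        ClientBackoffPolicy policy = new ExponentialClientBackoffPolicy(new Configuration(false));
        System.out.println(policy.getBackoffTime(server, region, stats) + " ms");
      }
    }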
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index c4b1eec..0e1ac28 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -27399,6 +27399,24 @@ public final class ClientProtos {
      * </pre>
      */
     int getHeapOccupancy();
+
+    // optional int32 compactionPressure = 3 [default = 0];
+    /**
+     * <code>optional int32 compactionPressure = 3 [default = 0];</code>
+     *
+     * <pre>
+     * Compaction pressure. Guaranteed to be positive, between 0 and 100
+     * </pre>
+     */
+    boolean hasCompactionPressure();
+    /**
+     * <code>optional int32 compactionPressure = 3 [default = 0];</code>
+     *
+     * <pre>
+     * Compaction pressure. Guaranteed to be positive, between 0 and 100
+     * </pre>
+     */
+    int getCompactionPressure();
   }
   /**
    * Protobuf type {@code hbase.pb.RegionLoadStats}
@@ -27466,6 +27484,11 @@ public final class ClientProtos {
               heapOccupancy_ = input.readInt32();
               break;
             }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              compactionPressure_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -27556,9 +27579,34 @@ public final class ClientProtos {
       return heapOccupancy_;
     }
 
+    // optional int32 compactionPressure = 3 [default = 0];
+    public static final int COMPACTIONPRESSURE_FIELD_NUMBER = 3;
+    private int compactionPressure_;
+    /**
+     * <code>optional int32 compactionPressure = 3 [default = 0];</code>
+     *
+     * <pre>
+     * Compaction pressure. Guaranteed to be positive, between 0 and 100
+     * </pre>
+     */
+    public boolean hasCompactionPressure() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional int32 compactionPressure = 3 [default = 0];</code>
+     *
+     * <pre>
+     * Compaction pressure. Guaranteed to be positive, between 0 and 100
+     * </pre>
+     */
+    public int getCompactionPressure() {
+      return compactionPressure_;
+    }
+
     private void initFields() {
       memstoreLoad_ = 0;
       heapOccupancy_ = 0;
+      compactionPressure_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -27578,6 +27626,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeInt32(2, heapOccupancy_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt32(3, compactionPressure_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -27595,6 +27646,10 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(2, heapOccupancy_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(3, compactionPressure_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -27628,6 +27683,11 @@ public final class ClientProtos {
         result = result && (getHeapOccupancy()
             == other.getHeapOccupancy());
       }
+      result = result && (hasCompactionPressure() == other.hasCompactionPressure());
+      if (hasCompactionPressure()) {
+        result = result && (getCompactionPressure()
+            == other.getCompactionPressure());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -27649,6 +27709,10 @@ public final class ClientProtos {
         hash = (37 * hash) + HEAPOCCUPANCY_FIELD_NUMBER;
         hash = (53 * hash) + getHeapOccupancy();
       }
+      if (hasCompactionPressure()) {
+        hash = (37 * hash) + COMPACTIONPRESSURE_FIELD_NUMBER;
+        hash = (53 * hash) + getCompactionPressure();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -27767,6 +27831,8 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00000001);
         heapOccupancy_ = 0;
         bitField0_ = (bitField0_ & ~0x00000002);
+        compactionPressure_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
 
@@ -27803,6 +27869,10 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000002;
         }
         result.heapOccupancy_ = heapOccupancy_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.compactionPressure_ = compactionPressure_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -27825,6 +27895,9 @@ public final class ClientProtos {
         if (other.hasHeapOccupancy()) {
           setHeapOccupancy(other.getHeapOccupancy());
         }
+        if (other.hasCompactionPressure()) {
+          setCompactionPressure(other.getCompactionPressure());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -27954,6 +28027,55 @@ public final class ClientProtos {
         return this;
       }
 
+      // optional int32 compactionPressure = 3 [default = 0];
+      private int compactionPressure_ ;
+      /**
+       * <code>optional int32 compactionPressure = 3 [default = 0];</code>
+       *
+       * <pre>
+       * Compaction pressure. Guaranteed to be positive, between 0 and 100
+       * </pre>
+       */
+      public boolean hasCompactionPressure() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional int32 compactionPressure = 3 [default = 0];</code>
+       *
+       * <pre>
+       * Compaction pressure. Guaranteed to be positive, between 0 and 100
+       * </pre>
+       */
+      public int getCompactionPressure() {
+        return compactionPressure_;
+      }
+      /**
+       * <code>optional int32 compactionPressure = 3 [default = 0];</code>
+       *
+       * <pre>
+       * Compaction pressure. Guaranteed to be positive, between 0 and 100
+       * </pre>
+       */
+      public Builder setCompactionPressure(int value) {
+        bitField0_ |= 0x00000004;
+        compactionPressure_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 compactionPressure = 3 [default = 0];</code>
+       *
+       * <pre>
+       * Compaction pressure. Guaranteed to be positive, between 0 and 100
+       * </pre>
+       */
+      public Builder clearCompactionPressure() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        compactionPressure_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.RegionLoadStats)
     }
 
@@ -33217,38 +33339,39 @@ public final class ClientProtos {
       "base.pb.CoprocessorServiceCall\"k\n\014Region" +
       "Action\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Region",
       "Specifier\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(" +
-      "\0132\020.hbase.pb.Action\"D\n\017RegionLoadStats\022\027" +
+      "\0132\020.hbase.pb.Action\"c\n\017RegionLoadStats\022\027" +
       "\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy" +
-      "\030\002 \001(\005:\0010\"\332\001\n\021ResultOrException\022\r\n\005index" +
-      "\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result" +
-      "\022*\n\texception\030\003 \001(\0132\027.hbase.pb.NameBytes" +
-      "Pair\022:\n\016service_result\030\004 \001(\0132\".hbase.pb." +
-      "CoprocessorServiceResult\022,\n\tloadStats\030\005 " +
-      "\001(\0132\031.hbase.pb.RegionLoadStats\"x\n\022Region" +
-      "ActionResult\0226\n\021resultOrException\030\001 \003(\0132",
-      "\033.hbase.pb.ResultOrException\022*\n\texceptio" +
-      "n\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mult" +
-      "iRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase.p" +
-      "b.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tco" +
-      "ndition\030\003 \001(\0132\023.hbase.pb.Condition\"\\\n\rMu" +
-      "ltiResponse\0228\n\022regionActionResult\030\001 \003(\0132" +
-      "\034.hbase.pb.RegionActionResult\022\021\n\tprocess" +
-      "ed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010T" +
-      "IMELINE\020\0012\203\004\n\rClientService\0222\n\003Get\022\024.hba" +
-      "se.pb.GetRequest\032\025.hbase.pb.GetResponse\022",
-      ";\n\006Mutate\022\027.hbase.pb.MutateRequest\032\030.hba" +
-      "se.pb.MutateResponse\0225\n\004Scan\022\025.hbase.pb." +
-      "ScanRequest\032\026.hbase.pb.ScanResponse\022P\n\rB" +
-      "ulkLoadHFile\022\036.hbase.pb.BulkLoadHFileReq" +
-      "uest\032\037.hbase.pb.BulkLoadHFileResponse\022X\n" +
-      "\013ExecService\022#.hbase.pb.CoprocessorServi" +
-      "ceRequest\032$.hbase.pb.CoprocessorServiceR" +
-      "esponse\022d\n\027ExecRegionServerService\022#.hba" +
-      "se.pb.CoprocessorServiceRequest\032$.hbase." +
-      "pb.CoprocessorServiceResponse\0228\n\005Multi\022\026",
-      ".hbase.pb.MultiRequest\032\027.hbase.pb.MultiR" +
-      "esponseBB\n*org.apache.hadoop.hbase.proto" +
-      "buf.generatedB\014ClientProtosH\001\210\001\001\240\001\001"
+      "\030\002 \001(\005:\0010\022\035\n\022compactionPressure\030\003 \001(\005:\0010" +
+      "\"\332\001\n\021ResultOrException\022\r\n\005index\030\001 \001(\r\022 \n" +
+      "\006result\030\002 \001(\0132\020.hbase.pb.Result\022*\n\texcep" +
+      "tion\030\003 \001(\0132\027.hbase.pb.NameBytesPair\022:\n\016s" +
+      "ervice_result\030\004 \001(\0132\".hbase.pb.Coprocess" +
+      "orServiceResult\022,\n\tloadStats\030\005 \001(\0132\031.hba" +
+      "se.pb.RegionLoadStats\"x\n\022RegionActionRes",
+      "ult\0226\n\021resultOrException\030\001 \003(\0132\033.hbase.p" +
+      "b.ResultOrException\022*\n\texception\030\002 \001(\0132\027" +
+      ".hbase.pb.NameBytesPair\"x\n\014MultiRequest\022" +
+      ",\n\014regionAction\030\001 \003(\0132\026.hbase.pb.RegionA" +
+      "ction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tcondition\030\003" +
+      " \001(\0132\023.hbase.pb.Condition\"\\\n\rMultiRespon" +
+      "se\0228\n\022regionActionResult\030\001 \003(\0132\034.hbase.p" +
+      "b.RegionActionResult\022\021\n\tprocessed\030\002 \001(\010*" +
+      "\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\001" +
+      "2\203\004\n\rClientService\0222\n\003Get\022\024.hbase.pb.Get",
+      "Request\032\025.hbase.pb.GetResponse\022;\n\006Mutate" +
+      "\022\027.hbase.pb.MutateRequest\032\030.hbase.pb.Mut" +
+      "ateResponse\0225\n\004Scan\022\025.hbase.pb.ScanReque" +
+      "st\032\026.hbase.pb.ScanResponse\022P\n\rBulkLoadHF" +
+      "ile\022\036.hbase.pb.BulkLoadHFileRequest\032\037.hb" +
+      "ase.pb.BulkLoadHFileResponse\022X\n\013ExecServ" +
+      "ice\022#.hbase.pb.CoprocessorServiceRequest" +
+      "\032$.hbase.pb.CoprocessorServiceResponse\022d" +
+      "\n\027ExecRegionServerService\022#.hbase.pb.Cop" +
+      "rocessorServiceRequest\032$.hbase.pb.Coproc",
+      "essorServiceResponse\0228\n\005Multi\022\026.hbase.pb" +
+      ".MultiRequest\032\027.hbase.pb.MultiResponseBB" +
+      "\n*org.apache.hadoop.hbase.protobuf.gener" +
+      "atedB\014ClientProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
         new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -33410,7 +33533,7 @@ public final class ClientProtos {
       internal_static_hbase_pb_RegionLoadStats_fieldAccessorTable = new
         com.google.protobuf.GeneratedMessage.FieldAccessorTable(
           internal_static_hbase_pb_RegionLoadStats_descriptor,
-          new java.lang.String[] { "MemstoreLoad", "HeapOccupancy", });
+          new java.lang.String[] { "MemstoreLoad", "HeapOccupancy", "CompactionPressure", });
       internal_static_hbase_pb_ResultOrException_descriptor =
         getDescriptor().getMessageTypes().get(23);
       internal_static_hbase_pb_ResultOrException_fieldAccessorTable = new
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index 101854d..c4e23f4 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -394,6 +394,8 @@ message RegionLoadStats {
   // Percent JVM heap occupancy. Guaranteed to be positive, between 0 and 100.
   // We can move this to "ServerLoadStats" should we develop them.
   optional int32 heapOccupancy = 2 [default = 0];
+  // Compaction pressure. Guaranteed to be positive, between 0 and 100.
+  optional int32 compactionPressure = 3 [default = 0];
 }
 
 /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 34738de..54c4264 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6708,6 +6708,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this
         .memstoreFlushSize)));
     stats.setHeapOccupancy((int)rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100);
+    stats.setCompactionPressure((int) (rsServices.getCompactionPressure() * 100 > 100 ? 100
+        : rsServices.getCompactionPressure() * 100));
     return stats.build();
   }
 
-- 
1.9.3 (Apple Git-50)
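For reference on the HRegion change: rsServices.getCompactionPressure() returns a double that is near 0.0 when stores are far from their compaction limits, reaches 1.0 at the limit, and can exceed 1.0 once store files back up past the blocking threshold, which is why the scaled value is clamped before being reported. A minimal sketch of that scaling, under those assumptions:

    public class CompactionPressureScaling {
      // Mirrors the HRegion line: scale a raw pressure double to a 0-100 int.
      static int toPercent(double compactionPressure) {
        double scaled = compactionPressure * 100;
        return (int) (scaled > 100 ? 100 : scaled); // clamp into [0, 100]
      }

      public static void main(String[] args) {
        System.out.println(toPercent(0.85)); // 85
        System.out.println(toPercent(1.20)); // 100: heavy backlog, clamped
      }
    }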