diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java index 6f5051b..a9c5254 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java @@ -178,6 +178,18 @@ public class ServerLoad { return serverLoad.getInfoServerPort(); } + public String getReplicationLoadSourceString() { + if (serverLoad.hasReplicationLoadSourceString()) + return serverLoad.getReplicationLoadSourceString(); + else return "Replication disabled"; + } + + public String getReplicationLoadSinkString() { + if (serverLoad.hasReplicationLoadSinkString()) + return serverLoad.getReplicationLoadSinkString(); + else return "Replication disabled"; + } + /** * Originally, this method factored in the effect of requests going to the * server as well. However, this does not interact very well with the current @@ -301,4 +313,5 @@ public class ServerLoad { public static final ServerLoad EMPTY_SERVERLOAD = new ServerLoad(ClusterStatusProtos.ServerLoad.newBuilder().build()); + } diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java index c558485..c26c7fd 100644 --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java @@ -4473,6 +4473,66 @@ public final class ClusterStatusProtos { * */ int getInfoServerPort(); + + // optional string replicationLoadSourceString = 10; + /** + * optional string replicationLoadSourceString = 10; + * + *
+     **
+     * The replicationLoadSourceString for the replication Source status of this region server.
+     * 
+ */ + boolean hasReplicationLoadSourceString(); + /** + * optional string replicationLoadSourceString = 10; + * + *
+     **
+     * The replicationLoadSourceString for the replication Source status of this region server.
+     * 
+ */ + java.lang.String getReplicationLoadSourceString(); + /** + * optional string replicationLoadSourceString = 10; + * + *
+     **
+     * The replicationLoadSourceString for the replication Source status of this region server.
+     * 
+ */ + com.google.protobuf.ByteString + getReplicationLoadSourceStringBytes(); + + // optional string replicationLoadSinkString = 11; + /** + * optional string replicationLoadSinkString = 11; + * + *
+     **
+     * The replicationLoadSinkString for the replication Sink status of this region server.
+     * 
+ */ + boolean hasReplicationLoadSinkString(); + /** + * optional string replicationLoadSinkString = 11; + * + *
+     **
+     * The replicationLoadSinkString for the replication Sink status of this region server.
+     * 
+ */ + java.lang.String getReplicationLoadSinkString(); + /** + * optional string replicationLoadSinkString = 11; + * + *
+     **
+     * The replicationLoadSinkString for the replication Sink status of this region server.
+     * 
+ */ + com.google.protobuf.ByteString + getReplicationLoadSinkStringBytes(); } /** * Protobuf type {@code ServerLoad} @@ -4576,6 +4636,16 @@ public final class ClusterStatusProtos { infoServerPort_ = input.readUInt32(); break; } + case 82: { + bitField0_ |= 0x00000080; + replicationLoadSourceString_ = input.readBytes(); + break; + } + case 90: { + bitField0_ |= 0x00000100; + replicationLoadSinkString_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -4931,6 +5001,122 @@ public final class ClusterStatusProtos { return infoServerPort_; } + // optional string replicationLoadSourceString = 10; + public static final int REPLICATIONLOADSOURCESTRING_FIELD_NUMBER = 10; + private java.lang.Object replicationLoadSourceString_; + /** + * optional string replicationLoadSourceString = 10; + * + *
+     **
+     * The replicationLoadSourceString for the replication Source status of this region server.
+     * 
+ */ + public boolean hasReplicationLoadSourceString() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional string replicationLoadSourceString = 10; + * + *
+     **
+     * The replicationLoadSourceString for the replication Source status of this region server.
+     * 
+ */ + public java.lang.String getReplicationLoadSourceString() { + java.lang.Object ref = replicationLoadSourceString_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + replicationLoadSourceString_ = s; + } + return s; + } + } + /** + * optional string replicationLoadSourceString = 10; + * + *
+     **
+     * The replicationLoadSourceString for the replication Source status of this region server.
+     * 
+ */ + public com.google.protobuf.ByteString + getReplicationLoadSourceStringBytes() { + java.lang.Object ref = replicationLoadSourceString_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + replicationLoadSourceString_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional string replicationLoadSinkString = 11; + public static final int REPLICATIONLOADSINKSTRING_FIELD_NUMBER = 11; + private java.lang.Object replicationLoadSinkString_; + /** + * optional string replicationLoadSinkString = 11; + * + *
+     **
+     * The replicationLoadSinkString for the replication Sink status of this region server.
+     * 
+ */ + public boolean hasReplicationLoadSinkString() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + /** + * optional string replicationLoadSinkString = 11; + * + *
+     **
+     * The replicationLoadSinkString for the replication Sink status of this region server.
+     * 
+ */ + public java.lang.String getReplicationLoadSinkString() { + java.lang.Object ref = replicationLoadSinkString_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + replicationLoadSinkString_ = s; + } + return s; + } + } + /** + * optional string replicationLoadSinkString = 11; + * + *
+     **
+     * The replicationLoadSinkString for the replication Sink status of this region server.
+     * 
+ */ + public com.google.protobuf.ByteString + getReplicationLoadSinkStringBytes() { + java.lang.Object ref = replicationLoadSinkString_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + replicationLoadSinkString_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { numberOfRequests_ = 0; totalNumberOfRequests_ = 0; @@ -4941,6 +5127,8 @@ public final class ClusterStatusProtos { reportStartTime_ = 0L; reportEndTime_ = 0L; infoServerPort_ = 0; + replicationLoadSourceString_ = ""; + replicationLoadSinkString_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -4993,6 +5181,12 @@ public final class ClusterStatusProtos { if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeUInt32(9, infoServerPort_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeBytes(10, getReplicationLoadSourceStringBytes()); + } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + output.writeBytes(11, getReplicationLoadSinkStringBytes()); + } getUnknownFields().writeTo(output); } @@ -5038,6 +5232,14 @@ public final class ClusterStatusProtos { size += com.google.protobuf.CodedOutputStream .computeUInt32Size(9, infoServerPort_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(10, getReplicationLoadSourceStringBytes()); + } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(11, getReplicationLoadSinkStringBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -5100,6 +5302,16 @@ public final class ClusterStatusProtos { result = result && (getInfoServerPort() == other.getInfoServerPort()); } + result = result && (hasReplicationLoadSourceString() == 
other.hasReplicationLoadSourceString()); + if (hasReplicationLoadSourceString()) { + result = result && getReplicationLoadSourceString() + .equals(other.getReplicationLoadSourceString()); + } + result = result && (hasReplicationLoadSinkString() == other.hasReplicationLoadSinkString()); + if (hasReplicationLoadSinkString()) { + result = result && getReplicationLoadSinkString() + .equals(other.getReplicationLoadSinkString()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -5149,6 +5361,14 @@ public final class ClusterStatusProtos { hash = (37 * hash) + INFO_SERVER_PORT_FIELD_NUMBER; hash = (53 * hash) + getInfoServerPort(); } + if (hasReplicationLoadSourceString()) { + hash = (37 * hash) + REPLICATIONLOADSOURCESTRING_FIELD_NUMBER; + hash = (53 * hash) + getReplicationLoadSourceString().hashCode(); + } + if (hasReplicationLoadSinkString()) { + hash = (37 * hash) + REPLICATIONLOADSINKSTRING_FIELD_NUMBER; + hash = (53 * hash) + getReplicationLoadSinkString().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -5286,6 +5506,10 @@ public final class ClusterStatusProtos { bitField0_ = (bitField0_ & ~0x00000080); infoServerPort_ = 0; bitField0_ = (bitField0_ & ~0x00000100); + replicationLoadSourceString_ = ""; + bitField0_ = (bitField0_ & ~0x00000200); + replicationLoadSinkString_ = ""; + bitField0_ = (bitField0_ & ~0x00000400); return this; } @@ -5360,6 +5584,14 @@ public final class ClusterStatusProtos { to_bitField0_ |= 0x00000040; } result.infoServerPort_ = infoServerPort_; + if (((from_bitField0_ & 0x00000200) == 0x00000200)) { + to_bitField0_ |= 0x00000080; + } + result.replicationLoadSourceString_ = replicationLoadSourceString_; + if (((from_bitField0_ & 0x00000400) == 0x00000400)) { + to_bitField0_ |= 0x00000100; + } + result.replicationLoadSinkString_ = replicationLoadSinkString_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -5449,6 +5681,16 @@ 
public final class ClusterStatusProtos { if (other.hasInfoServerPort()) { setInfoServerPort(other.getInfoServerPort()); } + if (other.hasReplicationLoadSourceString()) { + bitField0_ |= 0x00000200; + replicationLoadSourceString_ = other.replicationLoadSourceString_; + onChanged(); + } + if (other.hasReplicationLoadSinkString()) { + bitField0_ |= 0x00000400; + replicationLoadSinkString_ = other.replicationLoadSinkString_; + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -6537,6 +6779,214 @@ public final class ClusterStatusProtos { return this; } + // optional string replicationLoadSourceString = 10; + private java.lang.Object replicationLoadSourceString_ = ""; + /** + * optional string replicationLoadSourceString = 10; + * + *
+       **
+       * The replicationLoadSourceString for the replication Source status of this region server.
+       * 
+ */ + public boolean hasReplicationLoadSourceString() { + return ((bitField0_ & 0x00000200) == 0x00000200); + } + /** + * optional string replicationLoadSourceString = 10; + * + *
+       **
+       * The replicationLoadSourceString for the replication Source status of this region server.
+       * 
+ */ + public java.lang.String getReplicationLoadSourceString() { + java.lang.Object ref = replicationLoadSourceString_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + replicationLoadSourceString_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string replicationLoadSourceString = 10; + * + *
+       **
+       * The replicationLoadSourceString for the replication Source status of this region server.
+       * 
+ */ + public com.google.protobuf.ByteString + getReplicationLoadSourceStringBytes() { + java.lang.Object ref = replicationLoadSourceString_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + replicationLoadSourceString_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string replicationLoadSourceString = 10; + * + *
+       **
+       * The replicationLoadSourceString for the replication Source status of this region server.
+       * 
+ */ + public Builder setReplicationLoadSourceString( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000200; + replicationLoadSourceString_ = value; + onChanged(); + return this; + } + /** + * optional string replicationLoadSourceString = 10; + * + *
+       **
+       * The replicationLoadSourceString for the replication Source status of this region server.
+       * 
+ */ + public Builder clearReplicationLoadSourceString() { + bitField0_ = (bitField0_ & ~0x00000200); + replicationLoadSourceString_ = getDefaultInstance().getReplicationLoadSourceString(); + onChanged(); + return this; + } + /** + * optional string replicationLoadSourceString = 10; + * + *
+       **
+       * The replicationLoadSourceString for the replication Source status of this region server.
+       * 
+ */ + public Builder setReplicationLoadSourceStringBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000200; + replicationLoadSourceString_ = value; + onChanged(); + return this; + } + + // optional string replicationLoadSinkString = 11; + private java.lang.Object replicationLoadSinkString_ = ""; + /** + * optional string replicationLoadSinkString = 11; + * + *
+       **
+       * The replicationLoadSinkString for the replication Sink status of this region server.
+       * 
+ */ + public boolean hasReplicationLoadSinkString() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + * optional string replicationLoadSinkString = 11; + * + *
+       **
+       * The replicationLoadSinkString for the replication Sink status of this region server.
+       * 
+ */ + public java.lang.String getReplicationLoadSinkString() { + java.lang.Object ref = replicationLoadSinkString_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + replicationLoadSinkString_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string replicationLoadSinkString = 11; + * + *
+       **
+       * The replicationLoadSinkString for the replication Sink status of this region server.
+       * 
+ */ + public com.google.protobuf.ByteString + getReplicationLoadSinkStringBytes() { + java.lang.Object ref = replicationLoadSinkString_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + replicationLoadSinkString_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string replicationLoadSinkString = 11; + * + *
+       **
+       * The replicationLoadSinkString for the replication Sink status of this region server.
+       * 
+ */ + public Builder setReplicationLoadSinkString( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000400; + replicationLoadSinkString_ = value; + onChanged(); + return this; + } + /** + * optional string replicationLoadSinkString = 11; + * + *
+       **
+       * The replicationLoadSinkString for the replication Sink status of this region server.
+       * 
+ */ + public Builder clearReplicationLoadSinkString() { + bitField0_ = (bitField0_ & ~0x00000400); + replicationLoadSinkString_ = getDefaultInstance().getReplicationLoadSinkString(); + onChanged(); + return this; + } + /** + * optional string replicationLoadSinkString = 11; + * + *
+       **
+       * The replicationLoadSinkString for the replication Sink status of this region server.
+       * 
+ */ + public Builder setReplicationLoadSinkStringBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000400; + replicationLoadSinkString_ = value; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ServerLoad) } @@ -10361,27 +10811,28 @@ public final class ClusterStatusProtos { "ompacted_KVs\030\013 \001(\004\022\032\n\022root_index_size_KB" + "\030\014 \001(\r\022\"\n\032total_static_index_size_KB\030\r \001" + "(\r\022\"\n\032total_static_bloom_size_KB\030\016 \001(\r\022\034" + - "\n\024complete_sequence_id\030\017 \001(\004\"\212\002\n\nServerL" + + "\n\024complete_sequence_id\030\017 \001(\004\"\322\002\n\nServerL" + "oad\022\032\n\022number_of_requests\030\001 \001(\r\022 \n\030total" + "_number_of_requests\030\002 \001(\r\022\024\n\014used_heap_M" + "B\030\003 \001(\r\022\023\n\013max_heap_MB\030\004 \001(\r\022!\n\014region_l" + "oads\030\005 \003(\0132\013.RegionLoad\022\"\n\014coprocessors\030" + "\006 \003(\0132\014.Coprocessor\022\031\n\021report_start_time" + "\030\007 \001(\004\022\027\n\017report_end_time\030\010 \001(\004\022\030\n\020info_", - "server_port\030\t \001(\r\"O\n\016LiveServerInfo\022\033\n\006s" + - "erver\030\001 \002(\0132\013.ServerName\022 \n\013server_load\030" + - "\002 \002(\0132\013.ServerLoad\"\340\002\n\rClusterStatus\022/\n\r" + - "hbase_version\030\001 \001(\0132\030.HBaseVersionFileCo" + - "ntent\022%\n\014live_servers\030\002 \003(\0132\017.LiveServer" + - "Info\022!\n\014dead_servers\030\003 \003(\0132\013.ServerName\022" + - "2\n\025regions_in_transition\030\004 \003(\0132\023.RegionI" + - "nTransition\022\036\n\ncluster_id\030\005 \001(\0132\n.Cluste" + - "rId\022)\n\023master_coprocessors\030\006 \003(\0132\014.Copro" + - "cessor\022\033\n\006master\030\007 \001(\0132\013.ServerName\022#\n\016b", - "ackup_masters\030\010 \003(\0132\013.ServerName\022\023\n\013bala" + - "ncer_on\030\t 
\001(\010BF\n*org.apache.hadoop.hbase" + - ".protobuf.generatedB\023ClusterStatusProtos" + - "H\001\240\001\001" + "server_port\030\t \001(\r\022#\n\033replicationLoadSour" + + "ceString\030\n \001(\t\022!\n\031replicationLoadSinkStr" + + "ing\030\013 \001(\t\"O\n\016LiveServerInfo\022\033\n\006server\030\001 " + + "\002(\0132\013.ServerName\022 \n\013server_load\030\002 \002(\0132\013." + + "ServerLoad\"\340\002\n\rClusterStatus\022/\n\rhbase_ve" + + "rsion\030\001 \001(\0132\030.HBaseVersionFileContent\022%\n" + + "\014live_servers\030\002 \003(\0132\017.LiveServerInfo\022!\n\014" + + "dead_servers\030\003 \003(\0132\013.ServerName\0222\n\025regio" + + "ns_in_transition\030\004 \003(\0132\023.RegionInTransit" + + "ion\022\036\n\ncluster_id\030\005 \001(\0132\n.ClusterId\022)\n\023m", + "aster_coprocessors\030\006 \003(\0132\014.Coprocessor\022\033" + + "\n\006master\030\007 \001(\0132\013.ServerName\022#\n\016backup_ma" + + "sters\030\010 \003(\0132\013.ServerName\022\023\n\013balancer_on\030" + + "\t \001(\010BF\n*org.apache.hadoop.hbase.protobu" + + "f.generatedB\023ClusterStatusProtosH\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -10411,7 +10862,7 @@ public final class ClusterStatusProtos { internal_static_ServerLoad_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ServerLoad_descriptor, - new java.lang.String[] { "NumberOfRequests", "TotalNumberOfRequests", "UsedHeapMB", "MaxHeapMB", "RegionLoads", "Coprocessors", "ReportStartTime", "ReportEndTime", "InfoServerPort", }); + new java.lang.String[] { "NumberOfRequests", "TotalNumberOfRequests", "UsedHeapMB", "MaxHeapMB", "RegionLoads", "Coprocessors", "ReportStartTime", "ReportEndTime", "InfoServerPort", "ReplicationLoadSourceString", "ReplicationLoadSinkString", }); internal_static_LiveServerInfo_descriptor = 
getDescriptor().getMessageTypes().get(4); internal_static_LiveServerInfo_fieldAccessorTable = new diff --git hbase-protocol/src/main/protobuf/ClusterStatus.proto hbase-protocol/src/main/protobuf/ClusterStatus.proto index dbf00dc..dcde761 100644 --- hbase-protocol/src/main/protobuf/ClusterStatus.proto +++ hbase-protocol/src/main/protobuf/ClusterStatus.proto @@ -155,6 +155,16 @@ message ServerLoad { * The port number that this region server is hosing an info server on. */ optional uint32 info_server_port = 9; + + /** + * The replicationLoadSourceString for the replication Source status of this region server . + */ + optional string replicationLoadSourceString = 10; + + /** + * The replicationLoadSinkString for the replication Sink status of this region server . + */ + optional string replicationLoadSinkString = 11; } message LiveServerInfo { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 84f986b..cfa1df3 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -153,6 +153,9 @@ import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.HBaseZeroCopyByteString; import com.google.protobuf.ServiceException; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad.REPLICATIONLOADSOURCE; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad.REPLICATIONLOADSINK; + /** * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. 
@@ -963,6 +966,7 @@ public class HRegionServer extends HasThread implements } } + ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) { // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests // per second, and other metrics As long as metrics are part of ServerLoad it's best to use @@ -976,16 +980,14 @@ public class HRegionServer extends HasThread implements MemoryUsage memory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); - ClusterStatusProtos.ServerLoad.Builder serverLoad = - ClusterStatusProtos.ServerLoad.newBuilder(); + ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder(); serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond()); serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount()); - serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024)); + serverLoad.setUsedHeapMB((int) (memory.getUsed() / 1024 / 1024)); serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024)); Set coprocessors = this.hlog.getCoprocessorHost().getCoprocessors(); for (String coprocessor : coprocessors) { - serverLoad.addCoprocessors( - Coprocessor.newBuilder().setName(coprocessor).build()); + serverLoad.addCoprocessors(Coprocessor.newBuilder().setName(coprocessor).build()); } RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder(); RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); @@ -999,6 +1001,17 @@ public class HRegionServer extends HasThread implements } else { serverLoad.setInfoServerPort(-1); } + + // for the replicationLoad purpose. 
Only need to get from one service + // either source or sink will get the same info + ReplicationSourceService rsources = getReplicationSourceService(); + + if (rsources != null) { + HashMap rLoadMap = rsources.setAndGetReplicationLoadMap(); + serverLoad.setReplicationLoadSourceString(rLoadMap.get(REPLICATIONLOADSOURCE)); + serverLoad.setReplicationLoadSinkString(rLoadMap.get(REPLICATIONLOADSINK)); + } + return serverLoad.build(); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index cef7b46..fef9324 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.HashMap; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.classification.InterfaceAudience; @@ -52,4 +53,9 @@ public interface ReplicationService { * Stops replication service. 
*/ void stopReplicationService(); + + /** + * Set and Get ReplicationLoad Map + */ + public HashMap setAndGetReplicationLoadMap(); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java index 2a715c0..449b388 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java @@ -36,6 +36,8 @@ public class MetricsSink { private MetricsReplicationSource rms; private long lastTimestampForAge = System.currentTimeMillis(); + private long age; + public MetricsSink() { rms = CompatibilitySingletonFactory.getInstance(MetricsReplicationSource.class); } @@ -48,7 +50,7 @@ public class MetricsSink { */ public long setAgeOfLastAppliedOp(long timestamp) { lastTimestampForAge = timestamp; - long age = System.currentTimeMillis() - lastTimestampForAge; + this.age = System.currentTimeMillis() - lastTimestampForAge; rms.setGauge(SINK_AGE_OF_LAST_APPLIED_OP, age); return age; } @@ -72,4 +74,20 @@ public class MetricsSink { rms.incCounters(SINK_APPLIED_OPS, batchSize); } + /** + * Get the Age of Last Applied Op + * @return ageOfLastAppliedOp + */ + public long getAgeOfLastAppliedOp() { + return this.age; + } + + /** + * Get the TimeStampOfLastAppliedOp. 
If no replication Op applied yet, the value is the timestamp + * at which hbase instance starts + * @return timeStampsOfLastAppliedOp; + */ + public long getTimeStampOfLastAppliedOp() { + return this.lastTimestampForAge; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index b38a0c8..8687a08 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -45,6 +45,7 @@ public class MetricsSource { private long lastTimestamp = 0; private int lastQueueSize = 0; + private long age = 0; private String sizeOfLogQueKey; private String ageOfLastShippedOpKey; @@ -82,7 +83,7 @@ public class MetricsSource { * @param timestamp write time of the edit */ public void setAgeOfLastShippedOp(long timestamp) { - long age = EnvironmentEdgeManager.currentTimeMillis() - timestamp; + this.age = EnvironmentEdgeManager.currentTimeMillis() - timestamp; rms.setGauge(ageOfLastShippedOpKey, age); rms.setGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, age); this.lastTimestamp = timestamp; @@ -140,8 +141,39 @@ public class MetricsSource { } /** + * Get AgeOfLastShippedOp + * @return AgeOfLastShippedOp + */ + public Long getAgeOfLastShippedOp() { + return this.age; + } + + /** + * Get the sizeOfLogQueue + * @return sizeOfLogQueue + */ + public int getSizeOfLogQueue() { + return this.lastQueueSize; + } + + /** + * Get the timeStampsOfLastShippedOp + * @return lastTimestampForAge + */ + public long getTimeStampOfLastShippedOp() { + return lastTimestamp; + } + + /** + * Get the slave peer ID + * @return peerID + */ + public String getPeerID() { + return this.id; + } + + /** * Convience method to apply changes to metrics do to shipping a batch of logs. 
- * * @param batchSize the size of the batch that was shipped to sinks. */ public void shipBatch(long batchSize, int sizeInKB) { @@ -170,4 +202,5 @@ public class MetricsSource { rms.removeMetric(logEditsReadKey); } + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index f06ddef..e1e5dd7 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -23,6 +23,8 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.NavigableMap; import java.util.TreeMap; @@ -79,6 +81,8 @@ public class Replication implements WALActionsListener, private ReplicationSink replicationSink; // Hosting server private Server server; + // ReplicationLoad to access replication metrics + private ReplicationLoad replicationLoad; /** Statistics thread schedule pool */ private ScheduledExecutorService scheduleThreadPool; private int statsThreadPeriod; @@ -135,6 +139,7 @@ public class Replication implements WALActionsListener, this.replicationManager = new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId); + this.replicationLoad = new ReplicationLoad(); this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); @@ -143,6 +148,7 @@ public class Replication implements WALActionsListener, this.replicationQueues = null; this.replicationPeers = null; this.replicationTracker = null; + this.replicationLoad = null; } } 
@@ -309,6 +315,34 @@ public class Replication implements WALActionsListener, // not interested } + private void buildReplicationLoad() { + + // get source + List sources = this.replicationManager.getSources(); + List sourceMetricsList = new ArrayList(); + + for (ReplicationSourceInterface source : sources) { + if (source instanceof ReplicationSource) { + sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics()); + } + } + + // get sink + MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics(); + + this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics); + + } + + @Override + public HashMap setAndGetReplicationLoadMap() { + if (this.replicationLoad == null) return null; + + // always built for latest data + buildReplicationLoad(); + + return this.replicationLoad.getReplicationLoadMap(); + } /* * Statistics thread. Periodically prints the cache statistics to the log. */ diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java new file mode 100644 index 0000000..8762b6b --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java @@ -0,0 +1,135 @@ +/** + * Copyright 2013 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.Date; +import java.util.List; +import java.util.HashMap; + +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Strings; + +/** + * This class is used for exporting some of the info from replication metrics + */ +public class ReplicationLoad { + + public static final String REPLICATIONLOADSOURCE = "SOURCE"; + public static final String REPLICATIONLOADSINK = "SINK"; + + // Empty load instance. + public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad(); + + private List sourceMetricsList; + private MetricsSink sinkMetrics; + + /** default constructor */ + public ReplicationLoad() { + super(); + } + + /** + * buildReplicationLoad + * @param srMetricsList the list of replication source metrics + * @param skMetrics the replication sink metrics + */ + + public void buildReplicationLoad(final List srMetricsList, + final MetricsSink skMetrics) { + this.sourceMetricsList = srMetricsList; + this.sinkMetrics = skMetrics; + } + + /** + * sourceToString + * @return a string containing the sourceReplicationLoad information + */ + public String sourceToString() { + if (this.sourceMetricsList == null) return null; + + StringBuilder sb = new StringBuilder(); + for (MetricsSource sm : this.sourceMetricsList) { + long ageOfLastShippedOp = sm.getAgeOfLastShippedOp(); + int sizeOfLogQueue = sm.getSizeOfLogQueue(); + long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp(); + long replicationLag; + long timePassedAfterLastShippedOp + = EnvironmentEdgeManager.currentTimeMillis() - 
timeStampOfLastShippedOp; + if (sizeOfLogQueue != 0) { + // err on the large side + replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp); + } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) { + replicationLag = ageOfLastShippedOp; // last shipped happen recently + } else { + // last shipped may happen last night, + // so NO real lag although ageOfLastShippedOp is non-zero + replicationLag = 0; + } + + sb = Strings.appendKeyValue(sb, "PeerID", sm.getPeerID()); + sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", ageOfLastShippedOp); + sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", sizeOfLogQueue); + sb = + Strings.appendKeyValue(sb, "TimeStampsOfLastShippedOp", (new Date( + timeStampOfLastShippedOp).toString())); + sb = Strings.appendKeyValue(sb, "Replication Lag", replicationLag); + } + + return sb.toString(); + } + + /** + * sinkToString + * @return a string contains sinkReplicationLoad information + */ + public String sinkToString() { + if (this.sinkMetrics == null) return null; + + StringBuilder sb = new StringBuilder(); + sb = + Strings.appendKeyValue(sb, "AgeOfLastAppliedOp", this.sinkMetrics.getAgeOfLastAppliedOp()); + sb = + Strings.appendKeyValue(sb, "TimeStampsOfLastAppliedOp", + (new Date(this.sinkMetrics.getTimeStampOfLastAppliedOp()).toString())); + sb.append(System.getProperty("line.separator")); + + return sb.toString(); + } + + /** + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return this.sourceToString() + System.getProperty("line.separator") + this.sinkToString(); + } + + public HashMap getReplicationLoadMap() { + HashMap bothLoadString = new HashMap(); + bothLoadString.put(REPLICATIONLOADSOURCE, this.sourceToString()); + bothLoadString.put(REPLICATIONLOADSINK, this.sinkToString()); + return bothLoadString; + } + + public String getReplicationLoadString() { + return this.sourceToString() + "\n" + this.sinkToString(); + } + +} diff --git 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 4a4de86..81f773d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -255,4 +255,12 @@ public class ReplicationSink { "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() + ", total replicated edits: " + this.totalReplicatedEdits; } + + /** + * Get replication Sink Metrics + * @return MetricsSink + */ + public MetricsSink getSinkMetrics() { + return this.metrics; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 4e2106d..9ee9153 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -905,6 +905,14 @@ public class ReplicationSource extends Thread } } + /** + * Get Replication Source Metrics + * @return sourceMetrics + */ + public MetricsSource getSourceMetrics() { + return this.metrics; + } + @Override public String getStats() { long position = this.repLogReader.getPosition(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 5c0f710..89e0958 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.client.ResultScanner; 
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -549,4 +551,40 @@ public class TestReplicationSmallTests extends TestReplicationBase { hadmin.close(); } + /** + * Test for HBASE-9531 + * put a few rows into htable1, which should be replicated to htable2 + * create a ClusterStatus instance 'status' from HBaseAdmin + * test : status.getLoad(server).getReplicationLoadSourceString() + * test : status.getLoad(server).getReplicationLoadSinkString() + * @throws Exception + */ + @Test(timeout = 300000) + public void testReplicationStatus() throws Exception { + LOG.info("testReplicationStatus"); + + HBaseAdmin hadmin = new HBaseAdmin(conf1); + + final byte[] qualName = Bytes.toBytes("q"); + Put p; + + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + p = new Put(Bytes.toBytes("row" + i)); + p.add(famName, qualName, Bytes.toBytes("val" + i)); + htable1.put(p); + } + + ClusterStatus status = hadmin.getClusterStatus(); + + for (ServerName server : status.getServers()) { + String sourceString = status.getLoad(server).getReplicationLoadSourceString(); + String sinkString = status.getLoad(server).getReplicationLoadSinkString(); + assertTrue("failed to get ReplicationLoadSourceString: " + sourceString, + (sourceString.contains("AgeOfLastShippedOp"))); + assertTrue("failed to get ReplicationLoadSinkString: " + sinkString, + (sinkString.contains("AgeOfLastAppliedOp"))); + } + hadmin.close(); + } + } diff --git hbase-shell/src/main/ruby/hbase/admin.rb hbase-shell/src/main/ruby/hbase/admin.rb index 43ccad3..402dd29 100644 --- hbase-shell/src/main/ruby/hbase/admin.rb +++ 
hbase-shell/src/main/ruby/hbase/admin.rb @@ -554,7 +554,7 @@ module Hbase end end - def status(format) + def status(format,type) status = @admin.getClusterStatus() if format == "detailed" puts("version %s" % [ status.getHBaseVersion() ]) @@ -581,6 +581,23 @@ module Hbase for server in status.getDeadServerNames() puts(" %s" % [ server ]) end + elsif format == "replication" + puts("version %s" % [ status.getHBaseVersion() ]) + puts("%d live servers" % [ status.getServersSize() ]) + rsource = org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad::REPLICATIONLOADSOURCE + rsink = org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad::REPLICATIONLOADSINK + for server in status.getServers() + puts(" %s:" % + [ server.getHostname() ]) + if type.casecmp(rsource) == 0 + puts(" SOURCE:%s" % [ status.getLoad(server).getReplicationLoadSourceString() ]) + elsif type.casecmp(rsink) == 0 + puts(" SINK :%s" % [ status.getLoad(server).getReplicationLoadSinkString() ]) + else + puts(" SOURCE:%s" % [ status.getLoad(server).getReplicationLoadSourceString() ]) + puts(" SINK :%s" % [ status.getLoad(server).getReplicationLoadSinkString() ]) + end + end elsif format == "simple" load = 0 regions = 0 diff --git hbase-shell/src/main/ruby/shell/commands/status.rb hbase-shell/src/main/ruby/shell/commands/status.rb index f72c13c..4654b4a 100644 --- hbase-shell/src/main/ruby/shell/commands/status.rb +++ hbase-shell/src/main/ruby/shell/commands/status.rb @@ -22,18 +22,22 @@ module Shell class Status < Command def help return <<-EOF -Show cluster status. Can be 'summary', 'simple', or 'detailed'. The +Show cluster status. Can be 'summary', 'simple', 'detailed', or 'replication'. The default is 'summary'. 
Examples: hbase> status hbase> status 'simple' hbase> status 'summary' hbase> status 'detailed' + hbase> status 'replication' + hbase> status 'replication', 'source' + hbase> status 'replication', 'sink' + EOF end - def command(format = 'summary') - admin.status(format) + def command(format = 'summary', type = 'both') + admin.status(format, type) end end end