done) {
+ channel.callMethod(
+ getDescriptor().getMethods().get(14),
+ controller,
+ request,
+ org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse.getDefaultInstance(),
+ com.google.protobuf.RpcUtil.generalizeCallback(
+ done,
+ org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse.class,
+ org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse.getDefaultInstance()));
+ }
}
public static BlockingInterface newBlockingStub(
@@ -18388,6 +19597,11 @@ public final class AdminProtos {
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest request)
throws com.google.protobuf.ServiceException;
+
+      // Blocking-interface declaration for the new UpdateFavoredNodes RPC.
+      // NOTE(review): the request type is OpenRegionRequest (matching the
+      // Admin.proto rpc definition), not the newly added
+      // UpdateFavoredNodesRequest — confirm this is intentional.
+      public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse updateFavoredNodes(
+          com.google.protobuf.RpcController controller,
+          org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest request)
+          throws com.google.protobuf.ServiceException;
}
private static final class BlockingStub implements BlockingInterface {
@@ -18564,6 +19778,18 @@ public final class AdminProtos {
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse.getDefaultInstance());
}
+
+      // Blocking client stub for the UpdateFavoredNodes RPC; dispatches via
+      // service method index 14. NOTE(review): protoc-generated code — do not
+      // hand-edit; regenerate from Admin.proto instead.
+      public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse updateFavoredNodes(
+          com.google.protobuf.RpcController controller,
+          org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest request)
+          throws com.google.protobuf.ServiceException {
+        return (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse) channel.callBlockingMethod(
+          getDescriptor().getMethods().get(14),
+          controller,
+          request,
+          org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse.getDefaultInstance());
+      }
+
}
// @@protoc_insertion_point(class_scope:AdminService)
@@ -18655,6 +19881,16 @@ public final class AdminProtos {
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_CompactRegionResponse_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_UpdateFavoredNodesRequest_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_UpdateFavoredNodesRequest_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_UpdateFavoredNodesResponse_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_UpdateFavoredNodesResponse_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
internal_static_MergeRegionsRequest_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
@@ -18759,46 +19995,51 @@ public final class AdminProtos {
"split_point\030\002 \001(\014\"\025\n\023SplitRegionResponse" +
"\"W\n\024CompactRegionRequest\022 \n\006region\030\001 \002(\013" +
"2\020.RegionSpecifier\022\r\n\005major\030\002 \001(\010\022\016\n\006fam" +
- "ily\030\003 \001(\014\"\027\n\025CompactRegionResponse\"v\n\023Me" +
- "rgeRegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Re" +
- "gionSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.Region" +
- "Specifier\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024Me",
- "rgeRegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 " +
- "\002(\0132\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n" +
- "\025associated_cell_count\030\003 \001(\005\"4\n\030Replicat" +
- "eWALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEnt" +
- "ry\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollW" +
- "ALWriterRequest\"0\n\025RollWALWriterResponse" +
- "\022\027\n\017region_to_flush\030\001 \003(\014\"#\n\021StopServerR" +
- "equest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerResp" +
- "onse\"\026\n\024GetServerInfoRequest\"B\n\nServerIn" +
- "fo\022 \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\n",
- "webui_port\030\002 \001(\r\"9\n\025GetServerInfoRespons" +
- "e\022 \n\013server_info\030\001 \002(\0132\013.ServerInfo2\337\006\n\014" +
- "AdminService\022>\n\rGetRegionInfo\022\025.GetRegio" +
- "nInfoRequest\032\026.GetRegionInfoResponse\022;\n\014" +
- "GetStoreFile\022\024.GetStoreFileRequest\032\025.Get" +
- "StoreFileResponse\022D\n\017GetOnlineRegion\022\027.G" +
- "etOnlineRegionRequest\032\030.GetOnlineRegionR" +
- "esponse\0225\n\nOpenRegion\022\022.OpenRegionReques" +
- "t\032\023.OpenRegionResponse\0228\n\013CloseRegion\022\023." +
- "CloseRegionRequest\032\024.CloseRegionResponse",
- "\0228\n\013FlushRegion\022\023.FlushRegionRequest\032\024.F" +
- "lushRegionResponse\0228\n\013SplitRegion\022\023.Spli" +
- "tRegionRequest\032\024.SplitRegionResponse\022>\n\r" +
- "CompactRegion\022\025.CompactRegionRequest\032\026.C" +
- "ompactRegionResponse\022;\n\014MergeRegions\022\024.M" +
- "ergeRegionsRequest\032\025.MergeRegionsRespons" +
- "e\022J\n\021ReplicateWALEntry\022\031.ReplicateWALEnt" +
- "ryRequest\032\032.ReplicateWALEntryResponse\022\'\n" +
- "\006Replay\022\r.MultiRequest\032\016.MultiResponse\022>" +
- "\n\rRollWALWriter\022\025.RollWALWriterRequest\032\026",
- ".RollWALWriterResponse\022>\n\rGetServerInfo\022" +
- "\025.GetServerInfoRequest\032\026.GetServerInfoRe" +
- "sponse\0225\n\nStopServer\022\022.StopServerRequest" +
- "\032\023.StopServerResponseBA\n*org.apache.hado" +
- "op.hbase.protobuf.generatedB\013AdminProtos" +
- "H\001\210\001\001\240\001\001"
+ "ily\030\003 \001(\014\"\027\n\025CompactRegionResponse\"X\n\031Up" +
+ "dateFavoredNodesRequest\022;\n\020updateRegionI" +
+ "nfo\030\001 \003(\0132!.OpenRegionRequest.RegionOpen" +
+ "Info\".\n\032UpdateFavoredNodesResponse\022\020\n\010re",
+ "sponse\030\001 \001(\r\"v\n\023MergeRegionsRequest\022\"\n\010r" +
+ "egion_a\030\001 \002(\0132\020.RegionSpecifier\022\"\n\010regio" +
+ "n_b\030\002 \002(\0132\020.RegionSpecifier\022\027\n\010forcible\030" +
+ "\003 \001(\010:\005false\"\026\n\024MergeRegionsResponse\"X\n\010" +
+ "WALEntry\022\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key_va" +
+ "lue_bytes\030\002 \003(\014\022\035\n\025associated_cell_count" +
+ "\030\003 \001(\005\"4\n\030ReplicateWALEntryRequest\022\030\n\005en" +
+ "try\030\001 \003(\0132\t.WALEntry\"\033\n\031ReplicateWALEntr" +
+ "yResponse\"\026\n\024RollWALWriterRequest\"0\n\025Rol" +
+ "lWALWriterResponse\022\027\n\017region_to_flush\030\001 ",
+ "\003(\014\"#\n\021StopServerRequest\022\016\n\006reason\030\001 \002(\t" +
+ "\"\024\n\022StopServerResponse\"\026\n\024GetServerInfoR" +
+ "equest\"B\n\nServerInfo\022 \n\013server_name\030\001 \002(" +
+ "\0132\013.ServerName\022\022\n\nwebui_port\030\002 \001(\r\"9\n\025Ge" +
+ "tServerInfoResponse\022 \n\013server_info\030\001 \002(\013" +
+ "2\013.ServerInfo2\246\007\n\014AdminService\022>\n\rGetReg" +
+ "ionInfo\022\025.GetRegionInfoRequest\032\026.GetRegi" +
+ "onInfoResponse\022;\n\014GetStoreFile\022\024.GetStor" +
+ "eFileRequest\032\025.GetStoreFileResponse\022D\n\017G" +
+ "etOnlineRegion\022\027.GetOnlineRegionRequest\032",
+ "\030.GetOnlineRegionResponse\0225\n\nOpenRegion\022" +
+ "\022.OpenRegionRequest\032\023.OpenRegionResponse" +
+ "\0228\n\013CloseRegion\022\023.CloseRegionRequest\032\024.C" +
+ "loseRegionResponse\0228\n\013FlushRegion\022\023.Flus" +
+ "hRegionRequest\032\024.FlushRegionResponse\0228\n\013" +
+ "SplitRegion\022\023.SplitRegionRequest\032\024.Split" +
+ "RegionResponse\022>\n\rCompactRegion\022\025.Compac" +
+ "tRegionRequest\032\026.CompactRegionResponse\022;" +
+ "\n\014MergeRegions\022\024.MergeRegionsRequest\032\025.M" +
+ "ergeRegionsResponse\022J\n\021ReplicateWALEntry",
+ "\022\031.ReplicateWALEntryRequest\032\032.ReplicateW" +
+ "ALEntryResponse\022\'\n\006Replay\022\r.MultiRequest" +
+ "\032\016.MultiResponse\022>\n\rRollWALWriter\022\025.Roll" +
+ "WALWriterRequest\032\026.RollWALWriterResponse" +
+ "\022>\n\rGetServerInfo\022\025.GetServerInfoRequest" +
+ "\032\026.GetServerInfoResponse\0225\n\nStopServer\022\022" +
+ ".StopServerRequest\032\023.StopServerResponse\022" +
+ "E\n\022UpdateFavoredNodes\022\022.OpenRegionReques" +
+ "t\032\033.UpdateFavoredNodesResponseBA\n*org.ap" +
+ "ache.hadoop.hbase.protobuf.generatedB\013Ad",
+ "minProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -18907,74 +20148,86 @@ public final class AdminProtos {
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_CompactRegionResponse_descriptor,
new java.lang.String[] { });
- internal_static_MergeRegionsRequest_descriptor =
+ internal_static_UpdateFavoredNodesRequest_descriptor =
getDescriptor().getMessageTypes().get(16);
+ internal_static_UpdateFavoredNodesRequest_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_UpdateFavoredNodesRequest_descriptor,
+ new java.lang.String[] { "UpdateRegionInfo", });
+ internal_static_UpdateFavoredNodesResponse_descriptor =
+ getDescriptor().getMessageTypes().get(17);
+ internal_static_UpdateFavoredNodesResponse_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_UpdateFavoredNodesResponse_descriptor,
+ new java.lang.String[] { "Response", });
+ internal_static_MergeRegionsRequest_descriptor =
+ getDescriptor().getMessageTypes().get(18);
internal_static_MergeRegionsRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_MergeRegionsRequest_descriptor,
new java.lang.String[] { "RegionA", "RegionB", "Forcible", });
internal_static_MergeRegionsResponse_descriptor =
- getDescriptor().getMessageTypes().get(17);
+ getDescriptor().getMessageTypes().get(19);
internal_static_MergeRegionsResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_MergeRegionsResponse_descriptor,
new java.lang.String[] { });
internal_static_WALEntry_descriptor =
- getDescriptor().getMessageTypes().get(18);
+ getDescriptor().getMessageTypes().get(20);
internal_static_WALEntry_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_WALEntry_descriptor,
new java.lang.String[] { "Key", "KeyValueBytes", "AssociatedCellCount", });
internal_static_ReplicateWALEntryRequest_descriptor =
- getDescriptor().getMessageTypes().get(19);
+ getDescriptor().getMessageTypes().get(21);
internal_static_ReplicateWALEntryRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ReplicateWALEntryRequest_descriptor,
new java.lang.String[] { "Entry", });
internal_static_ReplicateWALEntryResponse_descriptor =
- getDescriptor().getMessageTypes().get(20);
+ getDescriptor().getMessageTypes().get(22);
internal_static_ReplicateWALEntryResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ReplicateWALEntryResponse_descriptor,
new java.lang.String[] { });
internal_static_RollWALWriterRequest_descriptor =
- getDescriptor().getMessageTypes().get(21);
+ getDescriptor().getMessageTypes().get(23);
internal_static_RollWALWriterRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RollWALWriterRequest_descriptor,
new java.lang.String[] { });
internal_static_RollWALWriterResponse_descriptor =
- getDescriptor().getMessageTypes().get(22);
+ getDescriptor().getMessageTypes().get(24);
internal_static_RollWALWriterResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RollWALWriterResponse_descriptor,
new java.lang.String[] { "RegionToFlush", });
internal_static_StopServerRequest_descriptor =
- getDescriptor().getMessageTypes().get(23);
+ getDescriptor().getMessageTypes().get(25);
internal_static_StopServerRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_StopServerRequest_descriptor,
new java.lang.String[] { "Reason", });
internal_static_StopServerResponse_descriptor =
- getDescriptor().getMessageTypes().get(24);
+ getDescriptor().getMessageTypes().get(26);
internal_static_StopServerResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_StopServerResponse_descriptor,
new java.lang.String[] { });
internal_static_GetServerInfoRequest_descriptor =
- getDescriptor().getMessageTypes().get(25);
+ getDescriptor().getMessageTypes().get(27);
internal_static_GetServerInfoRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_GetServerInfoRequest_descriptor,
new java.lang.String[] { });
internal_static_ServerInfo_descriptor =
- getDescriptor().getMessageTypes().get(26);
+ getDescriptor().getMessageTypes().get(28);
internal_static_ServerInfo_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ServerInfo_descriptor,
new java.lang.String[] { "ServerName", "WebuiPort", });
internal_static_GetServerInfoResponse_descriptor =
- getDescriptor().getMessageTypes().get(27);
+ getDescriptor().getMessageTypes().get(29);
internal_static_GetServerInfoResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_GetServerInfoResponse_descriptor,
diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto
index 7311314..517de77 100644
--- a/hbase-protocol/src/main/protobuf/Admin.proto
+++ b/hbase-protocol/src/main/protobuf/Admin.proto
@@ -145,6 +145,14 @@ message CompactRegionRequest {
message CompactRegionResponse {
}
+// Carries the regions whose favored-node lists should be updated, reusing
+// OpenRegionRequest.RegionOpenInfo to pair each region with its favored nodes.
+message UpdateFavoredNodesRequest {
+  repeated OpenRegionRequest.RegionOpenInfo updateRegionInfo = 1;
+}
+
+message UpdateFavoredNodesResponse {
+  // presumably the number of region entries processed — TODO confirm
+  optional uint32 response = 1;
+}
+
/**
* Merges the specified regions.
*
@@ -251,4 +259,7 @@ service AdminService {
rpc StopServer(StopServerRequest)
returns(StopServerResponse);
+
+  // NOTE(review): this rpc takes OpenRegionRequest, not the newly added
+  // UpdateFavoredNodesRequest defined above — as written, the new request
+  // message is unused by this service. Confirm this is intentional.
+  rpc UpdateFavoredNodes(OpenRegionRequest)
+    returns(UpdateFavoredNodesResponse);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 2df6185..6b3f054 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
+import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan;
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
@@ -126,6 +127,8 @@ public class AssignmentManager extends ZooKeeperListener {
private final TableLockManager tableLockManager;
+ private volatile int numRegionsOpened = 0;
+
final private KeyLocker locker = new KeyLocker();
/**
@@ -1366,7 +1369,7 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.warn("A region was opened on a dead server, ServerName=" +
sn + ", region=" + regionInfo.getEncodedName());
}
-
+ numRegionsOpened++;
regionStates.regionOnline(regionInfo, sn);
// Remove plan if one.
@@ -2412,6 +2415,15 @@ public class AssignmentManager extends ZooKeeperListener {
}
  /**
+   * Used by unit tests. Return the number of regions opened so far in the life
+   * of the master. Increases by one every time the master opens a region.
+   * NOTE(review): the counter is bumped with {@code numRegionsOpened++} on a
+   * volatile int, which is not an atomic read-modify-write; concurrent region
+   * opens can lose increments. Consider AtomicInteger if exactness matters.
+   * @return the counter value of the number of regions opened so far
+   */
+  public int getNumRegionsOpened() {
+    return numRegionsOpened;
+  }
+
+ /**
* Waits until the specified region has completed assignment.
*
* If the region is already assigned, returns immediately. Otherwise, method
@@ -2558,14 +2570,10 @@ public class AssignmentManager extends ZooKeeperListener {
disabledOrDisablingOrEnabling.addAll(ZKTable.getEnablingTables(watcher));
// Scan META for all user regions, skipping any disabled tables
Map allRegions;
- if (this.shouldAssignRegionsWithFavoredNodes) {
- allRegions = FavoredNodeAssignmentHelper.fullScan(
- catalogTracker, disabledOrDisablingOrEnabling, true, (FavoredNodeLoadBalancer)balancer);
- } else {
- allRegions = MetaReader.fullScan(
- catalogTracker, disabledOrDisablingOrEnabling, true);
- }
-
+ SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
+ new SnapshotOfRegionAssignmentFromMeta(catalogTracker, disabledOrDisablingOrEnabling, true);
+ snapshotOfRegionAssignment.initialize();
+ allRegions = snapshotOfRegionAssignment.getRegionToRegionServerMap();
if (allRegions == null) return;
//remove system tables because they would have been assigned earlier
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentVerificationReport.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentVerificationReport.java
new file mode 100644
index 0000000..b84014a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentVerificationReport.java
@@ -0,0 +1,598 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
+import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan;
+/**
+ * Helper class that is used by {@link RegionPlacementMaintainer} to print
+ * information for favored nodes
+ *
+ */
+@InterfaceAudience.Private
+public class AssignmentVerificationReport {
+  protected static final Log LOG = LogFactory.getLog(
+      AssignmentVerificationReport.class.getName());
+
+  private TableName tableName = null;
+  private boolean enforceLocality = false;
+  // Set to true once fillUp() has populated the report; print() checks it.
+  private boolean isFilledUp = false;
+
+  private int totalRegions = 0;
+  private int totalRegionServers = 0;
+  // Regions that currently have no hosting region server.
+  private List<HRegionInfo> unAssignedRegionsList =
+    new ArrayList<HRegionInfo>();
+
+  // Regions whose favored-node list is missing or of the wrong length.
+  private List<HRegionInfo> regionsWithoutValidFavoredNodes =
+    new ArrayList<HRegionInfo>();
+
+  // Regions hosted on a server that is not one of their favored nodes.
+  private List<HRegionInfo> nonFavoredAssignedRegionList =
+    new ArrayList<HRegionInfo>();
+
+  // Regions hosted on one of their favored nodes, bucketed by position
+  // (primary/secondary/tertiary) in the favoredNodes array.
+  private int totalFavoredAssignments = 0;
+  private int[] favoredNodes = new int[FavoredNodeAssignmentHelper.FAVORED_NODES_NUM];
+  private float[] favoredNodesLocalitySummary =
+    new float[FavoredNodeAssignmentHelper.FAVORED_NODES_NUM];
+  private float actualLocalitySummary = 0;
+
+  // Region-balance information across region servers.
+  private float avgRegionsOnRS = 0;
+  private int maxRegionsOnRS = 0;
+  private int minRegionsOnRS = Integer.MAX_VALUE;
+  private Set<ServerName> mostLoadedRSSet =
+    new HashSet<ServerName>();
+  private Set<ServerName> leastLoadedRSSet =
+    new HashSet<ServerName>();
+
+  private float avgDispersionScore = 0;
+  private float maxDispersionScore = 0;
+  private Set<ServerName> maxDispersionScoreServerSet =
+    new HashSet<ServerName>();
+  private float minDispersionScore = Float.MAX_VALUE;
+  private Set<ServerName> minDispersionScoreServerSet =
+    new HashSet<ServerName>();
+
+  private float avgDispersionNum = 0;
+  private float maxDispersionNum = 0;
+  private Set<ServerName> maxDispersionNumServerSet =
+    new HashSet<ServerName>();
+  private float minDispersionNum = Float.MAX_VALUE;
+  private Set<ServerName> minDispersionNumServerSet =
+    new HashSet<ServerName>();
+
+ public void fillUp(TableName tableName, SnapshotOfRegionAssignmentFromMeta snapshot,
+ Map> regionLocalityMap) {
+ // Set the table name
+ this.tableName = tableName;
+
+ // Get all the regions for this table
+ List regionInfoList =
+ snapshot.getTableToRegionMap().get(tableName);
+ // Get the total region num for the current table
+ this.totalRegions = regionInfoList.size();
+
+ // Get the existing assignment plan
+ FavoredNodesPlan favoredNodesAssignment = snapshot.getExistingAssignmentPlan();
+ // Get the region to region server mapping
+ Map currentAssignment =
+ snapshot.getRegionToRegionServerMap();
+ // Initialize the server to its hosing region counter map
+ Map serverToHostingRegionCounterMap =
+ new HashMap();
+
+ Map primaryRSToRegionCounterMap =
+ new HashMap();
+ Map> primaryToSecTerRSMap =
+ new HashMap>();
+
+ // Check the favored nodes and its locality information
+ // Also keep tracker of the most loaded and least loaded region servers
+ for (HRegionInfo region : regionInfoList) {
+ try {
+ ServerName currentRS = currentAssignment.get(region);
+ // Handle unassigned regions
+ if (currentRS == null) {
+ unAssignedRegionsList.add(region);
+ continue;
+ }
+
+ // Keep updating the server to is hosting region counter map
+ Integer hostRegionCounter = serverToHostingRegionCounterMap.get(currentRS);
+ if (hostRegionCounter == null) {
+ hostRegionCounter = Integer.valueOf(0);
+ }
+ hostRegionCounter = hostRegionCounter.intValue() + 1;
+ serverToHostingRegionCounterMap.put(currentRS, hostRegionCounter);
+
+ // Get the favored nodes from the assignment plan and verify it.
+ List favoredNodes = favoredNodesAssignment.getFavoredNodes(region);
+ if (favoredNodes == null ||
+ favoredNodes.size() != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
+ regionsWithoutValidFavoredNodes.add(region);
+ continue;
+ }
+ // Get the primary, secondary and tertiary region server
+ ServerName primaryRS =
+ favoredNodes.get(FavoredNodesPlan.Position.PRIMARY.ordinal());
+ ServerName secondaryRS =
+ favoredNodes.get(FavoredNodesPlan.Position.SECONDARY.ordinal());
+ ServerName tertiaryRS =
+ favoredNodes.get(FavoredNodesPlan.Position.TERTIARY.ordinal());
+
+ // Update the primary rs to its region set map
+ Integer regionCounter = primaryRSToRegionCounterMap.get(primaryRS);
+ if (regionCounter == null) {
+ regionCounter = Integer.valueOf(0);
+ }
+ regionCounter = regionCounter.intValue() + 1;
+ primaryRSToRegionCounterMap.put(primaryRS, regionCounter);
+
+ // Update the primary rs to secondary and tertiary rs map
+ Set secAndTerSet = primaryToSecTerRSMap.get(primaryRS);
+ if (secAndTerSet == null) {
+ secAndTerSet = new HashSet();
+ }
+ secAndTerSet.add(secondaryRS);
+ secAndTerSet.add(tertiaryRS);
+ primaryToSecTerRSMap.put(primaryRS, secAndTerSet);
+
+ // Get the position of the current region server in the favored nodes list
+ FavoredNodesPlan.Position favoredNodePosition =
+ FavoredNodesPlan.getFavoredServerPosition(favoredNodes, currentRS);
+
+ // Handle the non favored assignment.
+ if (favoredNodePosition == null) {
+ nonFavoredAssignedRegionList.add(region);
+ continue;
+ }
+ // Increase the favored nodes assignment.
+ this.favoredNodes[favoredNodePosition.ordinal()]++;
+ totalFavoredAssignments++;
+
+ // Summary the locality information for each favored nodes
+ if (regionLocalityMap != null) {
+ // Set the enforce locality as true;
+ this.enforceLocality = true;
+
+ // Get the region degree locality map
+ Map regionDegreeLocalityMap =
+ regionLocalityMap.get(region.getEncodedName());
+ if (regionDegreeLocalityMap == null) {
+ continue; // ignore the region which doesn't have any store files.
+ }
+
+ // Get the locality summary for each favored nodes
+ for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
+ ServerName favoredNode = favoredNodes.get(p.ordinal());
+ // Get the locality for the current favored nodes
+ Float locality =
+ regionDegreeLocalityMap.get(favoredNode.getHostname());
+ if (locality != null) {
+ this.favoredNodesLocalitySummary[p.ordinal()] += locality;
+ }
+ }
+
+ // Get the locality summary for the current region server
+ Float actualLocality =
+ regionDegreeLocalityMap.get(currentRS.getHostname());
+ if (actualLocality != null) {
+ this.actualLocalitySummary += actualLocality;
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Cannot verify the region assignment for region " +
+ ((region == null) ? " null " : region.getRegionNameAsString()) +
+ "because of " + e);
+ }
+ }
+
+ float dispersionScoreSummary = 0;
+ float dispersionNumSummary = 0;
+ // Calculate the secondary score for each primary region server
+ for (Map.Entry entry :
+ primaryRSToRegionCounterMap.entrySet()) {
+ ServerName primaryRS = entry.getKey();
+ Integer regionsOnPrimary = entry.getValue();
+
+ // Process the dispersion number and score
+ float dispersionScore = 0;
+ int dispersionNum = 0;
+ if (primaryToSecTerRSMap.get(primaryRS) != null
+ && regionsOnPrimary.intValue() != 0) {
+ dispersionNum = primaryToSecTerRSMap.get(primaryRS).size();
+ dispersionScore = dispersionNum /
+ ((float) regionsOnPrimary.intValue() * 2);
+ }
+ // Update the max dispersion score
+ if (dispersionScore > this.maxDispersionScore) {
+ this.maxDispersionScoreServerSet.clear();
+ this.maxDispersionScoreServerSet.add(primaryRS);
+ this.maxDispersionScore = dispersionScore;
+ } else if (dispersionScore == this.maxDispersionScore) {
+ this.maxDispersionScoreServerSet.add(primaryRS);
+ }
+
+ // Update the max dispersion num
+ if (dispersionNum > this.maxDispersionNum) {
+ this.maxDispersionNumServerSet.clear();
+ this.maxDispersionNumServerSet.add(primaryRS);
+ this.maxDispersionNum = dispersionNum;
+ } else if (dispersionNum == this.maxDispersionNum) {
+ this.maxDispersionNumServerSet.add(primaryRS);
+ }
+
+ // Update the min dispersion score
+ if (dispersionScore < this.minDispersionScore) {
+ this.minDispersionScoreServerSet.clear();
+ this.minDispersionScoreServerSet.add(primaryRS);
+ this.minDispersionScore = dispersionScore;
+ } else if (dispersionScore == this.minDispersionScore) {
+ this.minDispersionScoreServerSet.add(primaryRS);
+ }
+
+ // Update the min dispersion num
+ if (dispersionNum < this.minDispersionNum) {
+ this.minDispersionNumServerSet.clear();
+ this.minDispersionNumServerSet.add(primaryRS);
+ this.minDispersionNum = dispersionNum;
+ } else if (dispersionNum == this.minDispersionNum) {
+ this.minDispersionNumServerSet.add(primaryRS);
+ }
+
+ dispersionScoreSummary += dispersionScore;
+ dispersionNumSummary += dispersionNum;
+ }
+
+ // Update the avg dispersion score
+ if (primaryRSToRegionCounterMap.keySet().size() != 0) {
+ this.avgDispersionScore = dispersionScoreSummary /
+ (float) primaryRSToRegionCounterMap.keySet().size();
+ this.avgDispersionNum = dispersionNumSummary /
+ (float) primaryRSToRegionCounterMap.keySet().size();
+ }
+
+ // Fill up the most loaded and least loaded region server information
+ for (Map.Entry entry :
+ serverToHostingRegionCounterMap.entrySet()) {
+ ServerName currentRS = entry.getKey();
+ int hostRegionCounter = entry.getValue().intValue();
+
+ // Update the most loaded region server list and maxRegionsOnRS
+ if (hostRegionCounter > this.maxRegionsOnRS) {
+ maxRegionsOnRS = hostRegionCounter;
+ this.mostLoadedRSSet.clear();
+ this.mostLoadedRSSet.add(currentRS);
+ } else if (hostRegionCounter == this.maxRegionsOnRS) {
+ this.mostLoadedRSSet.add(currentRS);
+ }
+
+ // Update the least loaded region server list and minRegionsOnRS
+ if (hostRegionCounter < this.minRegionsOnRS) {
+ this.minRegionsOnRS = hostRegionCounter;
+ this.leastLoadedRSSet.clear();
+ this.leastLoadedRSSet.add(currentRS);
+ } else if (hostRegionCounter == this.minRegionsOnRS) {
+ this.leastLoadedRSSet.add(currentRS);
+ }
+ }
+
+ // and total region servers
+ this.totalRegionServers = serverToHostingRegionCounterMap.keySet().size();
+ this.avgRegionsOnRS = (totalRegionServers == 0) ? 0 :
+ (totalRegions / (float) totalRegionServers);
+ // Set the isFilledUp as true
+ isFilledUp = true;
+ }
+
+  /**
+   * Use this to project the dispersion scores for a (possibly new) favored
+   * nodes plan without computing locality or load statistics.
+   * @param tableName table to report on
+   * @param snapshot snapshot of the current region assignment from META
+   * @param newPlan plan to evaluate; if null, the existing assignment plan
+   *          from the snapshot is used
+   */
+  public void fillUpDispersion(TableName tableName,
+      SnapshotOfRegionAssignmentFromMeta snapshot, FavoredNodesPlan newPlan) {
+    // Set the table name
+    this.tableName = tableName;
+    // Get all the regions for this table
+    List<HRegionInfo> regionInfoList = snapshot.getTableToRegionMap().get(
+        tableName);
+    // Get the total region num for the current table
+    this.totalRegions = regionInfoList.size();
+    FavoredNodesPlan plan = null;
+    if (newPlan == null) {
+      plan = snapshot.getExistingAssignmentPlan();
+    } else {
+      plan = newPlan;
+    }
+    // Get the region to region server mapping
+    Map<ServerName, Integer> primaryRSToRegionCounterMap =
+      new HashMap<ServerName, Integer>();
+    Map<ServerName, Set<ServerName>> primaryToSecTerRSMap =
+      new HashMap<ServerName, Set<ServerName>>();
+
+    // Check the favored nodes of every region in the table
+    for (HRegionInfo region : regionInfoList) {
+      try {
+        // Get the favored nodes from the assignment plan and verify it.
+        List<ServerName> favoredNodes = plan.getFavoredNodes(region);
+        if (favoredNodes == null
+            || favoredNodes.size() != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
+          regionsWithoutValidFavoredNodes.add(region);
+          continue;
+        }
+        // Get the primary, secondary and tertiary region server
+        ServerName primaryRS = favoredNodes
+            .get(FavoredNodesPlan.Position.PRIMARY.ordinal());
+        ServerName secondaryRS = favoredNodes
+            .get(FavoredNodesPlan.Position.SECONDARY.ordinal());
+        ServerName tertiaryRS = favoredNodes
+            .get(FavoredNodesPlan.Position.TERTIARY.ordinal());
+
+        // Update the primary rs to its region counter map
+        Integer regionCounter = primaryRSToRegionCounterMap.get(primaryRS);
+        if (regionCounter == null) {
+          regionCounter = Integer.valueOf(0);
+        }
+        regionCounter = regionCounter.intValue() + 1;
+        primaryRSToRegionCounterMap.put(primaryRS, regionCounter);
+
+        // Update the primary rs to secondary and tertiary rs map
+        Set<ServerName> secAndTerSet = primaryToSecTerRSMap.get(primaryRS);
+        if (secAndTerSet == null) {
+          secAndTerSet = new HashSet<ServerName>();
+        }
+        secAndTerSet.add(secondaryRS);
+        secAndTerSet.add(tertiaryRS);
+        primaryToSecTerRSMap.put(primaryRS, secAndTerSet);
+      } catch (Exception e) {
+        // Best-effort: a malformed entry must not abort the whole report.
+        LOG.error("Cannot verify the region assignment for region "
+            + ((region == null) ? " null " : region.getRegionNameAsString())
+            + " because of " + e);
+      }
+    }
+    float dispersionScoreSummary = 0;
+    float dispersionNumSummary = 0;
+    // Calculate the secondary score for each primary region server
+    for (Map.Entry<ServerName, Integer> entry :
+        primaryRSToRegionCounterMap.entrySet()) {
+      ServerName primaryRS = entry.getKey();
+      Integer regionsOnPrimary = entry.getValue();
+
+      // Process the dispersion number and score
+      float dispersionScore = 0;
+      int dispersionNum = 0;
+      if (primaryToSecTerRSMap.get(primaryRS) != null
+          && regionsOnPrimary.intValue() != 0) {
+        dispersionNum = primaryToSecTerRSMap.get(primaryRS).size();
+        dispersionScore = dispersionNum /
+          ((float) regionsOnPrimary.intValue() * 2);
+      }
+
+      // Update the max dispersion score. This branch was missing in the
+      // original patch, leaving maxDispersionScore stuck at 0 even though
+      // getDispersionInformation() reports it; mirrors fillUp().
+      if (dispersionScore > this.maxDispersionScore) {
+        this.maxDispersionScoreServerSet.clear();
+        this.maxDispersionScoreServerSet.add(primaryRS);
+        this.maxDispersionScore = dispersionScore;
+      } else if (dispersionScore == this.maxDispersionScore) {
+        this.maxDispersionScoreServerSet.add(primaryRS);
+      }
+
+      // Update the max dispersion num
+      if (dispersionNum > this.maxDispersionNum) {
+        this.maxDispersionNumServerSet.clear();
+        this.maxDispersionNumServerSet.add(primaryRS);
+        this.maxDispersionNum = dispersionNum;
+      } else if (dispersionNum == this.maxDispersionNum) {
+        this.maxDispersionNumServerSet.add(primaryRS);
+      }
+
+      // Update the min dispersion score
+      if (dispersionScore < this.minDispersionScore) {
+        this.minDispersionScoreServerSet.clear();
+        this.minDispersionScoreServerSet.add(primaryRS);
+        this.minDispersionScore = dispersionScore;
+      } else if (dispersionScore == this.minDispersionScore) {
+        this.minDispersionScoreServerSet.add(primaryRS);
+      }
+
+      // Update the min dispersion num
+      if (dispersionNum < this.minDispersionNum) {
+        this.minDispersionNumServerSet.clear();
+        this.minDispersionNumServerSet.add(primaryRS);
+        this.minDispersionNum = dispersionNum;
+      } else if (dispersionNum == this.minDispersionNum) {
+        this.minDispersionNumServerSet.add(primaryRS);
+      }
+
+      dispersionScoreSummary += dispersionScore;
+      dispersionNumSummary += dispersionNum;
+    }
+
+    // Update the avg dispersion score
+    if (primaryRSToRegionCounterMap.keySet().size() != 0) {
+      this.avgDispersionScore = dispersionScoreSummary /
+        (float) primaryRSToRegionCounterMap.keySet().size();
+      this.avgDispersionNum = dispersionNumSummary /
+        (float) primaryRSToRegionCounterMap.keySet().size();
+    }
+  }
+
+ /**
+ * @return list which contains just 3 elements: average dispersion score, max
+ * dispersion score and min dispersion score as first, second and third element
+ * respectively.
+ *
+ */
+ public List<Float> getDispersionInformation() {
+ List<Float> dispersion = new ArrayList<Float>();
+ dispersion.add(avgDispersionScore);
+ dispersion.add(maxDispersionScore);
+ dispersion.add(minDispersionScore);
+ return dispersion;
+ }
+
+ public void print(boolean isDetailMode) {
+ if (!isFilledUp) {
+ System.err.println("[Error] Region assignment verfication report" +
+ "hasn't been filled up");
+ }
+ DecimalFormat df = new java.text.DecimalFormat( "#.##");
+
+ // Print some basic information
+ System.out.println("Region Assignment Verification for Table: " + tableName +
+ "\n\tTotal regions : " + totalRegions);
+
+ // Print the number of regions on each kinds of the favored nodes
+ System.out.println("\tTotal regions on favored nodes " +
+ totalFavoredAssignments);
+ for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
+ System.out.println("\t\tTotal regions on "+ p.toString() +
+ " region servers: " + favoredNodes[p.ordinal()]);
+ }
+ // Print the number of regions in each kinds of invalid assignment
+ System.out.println("\tTotal unassigned regions: " +
+ unAssignedRegionsList.size());
+ if (isDetailMode) {
+ for (HRegionInfo region : unAssignedRegionsList) {
+ System.out.println("\t\t" + region.getRegionNameAsString());
+ }
+ }
+
+ System.out.println("\tTotal regions NOT on favored nodes: " +
+ nonFavoredAssignedRegionList.size());
+ if (isDetailMode) {
+ for (HRegionInfo region : nonFavoredAssignedRegionList) {
+ System.out.println("\t\t" + region.getRegionNameAsString());
+ }
+ }
+
+ System.out.println("\tTotal regions without favored nodes: " +
+ regionsWithoutValidFavoredNodes.size());
+ if (isDetailMode) {
+ for (HRegionInfo region : regionsWithoutValidFavoredNodes) {
+ System.out.println("\t\t" + region.getRegionNameAsString());
+ }
+ }
+
+ // Print the locality information if enabled
+ if (this.enforceLocality && totalRegions != 0) {
+ // Print the actual locality for this table
+ float actualLocality = 100 *
+ this.actualLocalitySummary / (float) totalRegions;
+ System.out.println("\n\tThe actual avg locality is " +
+ df.format(actualLocality) + " %");
+
+ // Print the expected locality if regions are placed on the each kinds of
+ // favored nodes
+ for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
+ float avgLocality = 100 *
+ (favoredNodesLocalitySummary[p.ordinal()] / (float) totalRegions);
+ System.out.println("\t\tThe expected avg locality if all regions" +
+ " on the " + p.toString() + " region servers: "
+ + df.format(avgLocality) + " %");
+ }
+ }
+
+ // Print the region balancing information
+ System.out.println("\n\tTotal hosting region servers: " +
+ totalRegionServers);
+ // Print the region balance information
+ if (totalRegionServers != 0) {
+ System.out.println(
+ "\tAvg dispersion num: " +df.format(avgDispersionNum) +
+ " hosts;\tMax dispersion num: " + df.format(maxDispersionNum) +
+ " hosts;\tMin dispersion num: " + df.format(minDispersionNum) +
+ " hosts;");
+
+ System.out.println("\t\tThe number of the region servers with the max" +
+ " dispersion num: " + this.maxDispersionNumServerSet.size());
+ if (isDetailMode) {
+ printHServerAddressSet(maxDispersionNumServerSet);
+ }
+
+ System.out.println("\t\tThe number of the region servers with the min" +
+ " dispersion num: " + this.minDispersionNumServerSet.size());
+ if (isDetailMode) {
+ printHServerAddressSet(maxDispersionNumServerSet);
+ }
+
+ System.out.println(
+ "\tAvg dispersion score: " + df.format(avgDispersionScore) +
+ ";\tMax dispersion score: " + df.format(maxDispersionScore) +
+ ";\tMin dispersion score: " + df.format(minDispersionScore) + ";");
+
+ System.out.println("\t\tThe number of the region servers with the max" +
+ " dispersion score: " + this.maxDispersionScoreServerSet.size());
+ if (isDetailMode) {
+ printHServerAddressSet(maxDispersionScoreServerSet);
+ }
+
+ System.out.println("\t\tThe number of the region servers with the min" +
+ " dispersion score: " + this.minDispersionScoreServerSet.size());
+ if (isDetailMode) {
+ printHServerAddressSet(minDispersionScoreServerSet);
+ }
+
+ System.out.println(
+ "\tAvg regions/region server: " + df.format(avgRegionsOnRS) +
+ ";\tMax regions/region server: " + maxRegionsOnRS +
+ ";\tMin regions/region server: " + minRegionsOnRS + ";");
+
+ // Print the details about the most loaded region servers
+ System.out.println("\t\tThe number of the most loaded region servers: "
+ + mostLoadedRSSet.size());
+ if (isDetailMode) {
+ printHServerAddressSet(mostLoadedRSSet);
+ }
+
+ // Print the details about the least loaded region servers
+ System.out.println("\t\tThe number of the least loaded region servers: "
+ + leastLoadedRSSet.size());
+ if (isDetailMode) {
+ printHServerAddressSet(leastLoadedRSSet);
+ }
+ }
+ System.out.println("==============================");
+ }
+
+ private void printHServerAddressSet(Set<ServerName> serverSet) { // restored Set<ServerName>: raw Set cannot iterate as ServerName
+ if (serverSet == null) {
+ return;
+ }
+ int i = 0;
+ for (ServerName addr : serverSet) {
+ if ((i++) % 3 == 0) { // start a new indented line every 3 servers
+ System.out.print("\n\t\t\t");
+ }
+ System.out.print(addr.getHostAndPort() + " ; ");
+ }
+ System.out.println("\n");
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java
new file mode 100644
index 0000000..9a714ab
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java
@@ -0,0 +1,1113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
+import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.MunkresAssignment;
+import org.apache.hadoop.hbase.util.Triple;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * A tool that is used for manipulating and viewing favored nodes information
+ * for regions. Run with -h to get a list of the options
+ *
+ */
+@InterfaceAudience.Private
+public class RegionPlacementMaintainer {
+ private static final Log LOG = LogFactory.getLog(RegionPlacementMaintainer.class
+ .getName());
+ //The cost of a placement that should never be assigned.
+ private static final float MAX_COST = Float.POSITIVE_INFINITY;
+
+ // The cost of a placement that is undesirable but acceptable.
+ private static final float AVOID_COST = 100000f;
+
+ // The amount by which the cost of a placement is increased if it is the
+ // last slot of the server. This is done to more evenly distribute the slop
+ // amongst servers.
+ private static final float LAST_SLOT_COST_PENALTY = 0.5f;
+
+ // The amount by which the cost of a primary placement is penalized if it is
+ // not the host currently serving the region. This is done to minimize moves.
+ private static final float NOT_CURRENT_HOST_PENALTY = 0.1f;
+
+ private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false;
+
+ private Configuration conf;
+ private final boolean enforceLocality;
+ private final boolean enforceMinAssignmentMove;
+ private HBaseAdmin admin;
+ private RackManager rackManager;
+ private Set<TableName> targetTableSet; // tables to restrict operations to; empty means all tables (see verifyRegionPlacement)
+
+ public RegionPlacementMaintainer(Configuration conf) {
+ this(conf, true, true); // default: enforce locality and minimal assignment moves
+ }
+
+ public RegionPlacementMaintainer(Configuration conf, boolean enforceLocality,
+ boolean enforceMinAssignmentMove) {
+ this.conf = conf;
+ this.enforceLocality = enforceLocality;
+ this.enforceMinAssignmentMove = enforceMinAssignmentMove;
+ this.targetTableSet = new HashSet<TableName>();
+ this.rackManager = new RackManager(conf);
+ }
+ private static void printHelp(Options opt) { // prints the CLI usage banner for every supported flag
+ new HelpFormatter().printHelp(
+ "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " +
+ "-diff>" +
+ " [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]" +
+ " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt);
+ }
+
+ public void setTargetTableName(String[] tableNames) { // null leaves the set empty; callers treat empty as "all tables"
+ if (tableNames != null) {
+ for (String table : tableNames)
+ this.targetTableSet.add(TableName.valueOf(table));
+ }
+ }
+
+ /**
+ * @return the cached HBaseAdmin
+ * @throws IOException
+ */
+ private HBaseAdmin getHBaseAdmin() throws IOException {
+ if (this.admin == null) { // lazily created on first use, then cached for the tool's lifetime
+ this.admin = new HBaseAdmin(this.conf);
+ }
+ return this.admin;
+ }
+
+ /**
+ * @return the new RegionAssignmentSnapshot
+ * @throws IOException
+ */
+ public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot()
+ throws IOException {
+ SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot =
+ new SnapshotOfRegionAssignmentFromMeta(new CatalogTracker(this.conf)); // fresh snapshot built per call
+ currentAssignmentShapshot.initialize();
+ return currentAssignmentShapshot;
+ }
+
+ /**
+ * Verify the region placement is consistent with the assignment plan;
+ * @throws IOException
+ */
+ public void verifyRegionPlacement(boolean isDetailMode) throws IOException {
+ System.out.println("Start to verify the region assignment and " +
+ "generate the verification report");
+ // Get the region assignment snapshot
+ SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
+
+ // Get all the tables
+ Set<TableName> tables = snapshot.getTableSet();
+
+ // Get the region locality map
+ Map<String, Map<String, Float>> regionLocalityMap = null;
+ if (this.enforceLocality) {
+ regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
+ }
+ // Iterate all the tables to fill up the verification report
+ for (TableName table : tables) {
+ if (!this.targetTableSet.isEmpty() &&
+ !this.targetTableSet.contains(table)) {
+ continue;
+ }
+ AssignmentVerificationReport report = new AssignmentVerificationReport();
+ report.fillUp(table, snapshot, regionLocalityMap);
+ report.print(isDetailMode);
+ }
+ }
+
+ /**
+ * Generate the assignment plan for the existing table
+ *
+ * @param tableName the table to generate the plan for
+ * @param assignmentSnapshot the current region assignment snapshot from META
+ * @param regionLocalityMap per-region HDFS block locality per host (may be null)
+ * @param plan the favored nodes plan to be populated (output parameter)
+ * @param munkresForSecondaryAndTertiary if set on true the assignment plan
+ * for the tertiary and secondary will be generated with Munkres algorithm,
+ * otherwise will be generated using placeSecondaryAndTertiaryRS
+ * @throws IOException
+ */
+ private void genAssignmentPlan(TableName tableName,
+ SnapshotOfRegionAssignmentFromMeta assignmentSnapshot,
+ Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan plan,
+ boolean munkresForSecondaryAndTertiary) throws IOException {
+ // Get the all the regions for the current table
+ List<HRegionInfo> regions =
+ assignmentSnapshot.getTableToRegionMap().get(tableName);
+ int numRegions = regions.size();
+
+ // Get the current assignment map
+ Map<HRegionInfo, ServerName> currentAssignmentMap =
+ assignmentSnapshot.getRegionToRegionServerMap();
+
+ // Get the all the region servers
+ List<ServerName> servers = new ArrayList<ServerName>();
+ servers.addAll(getHBaseAdmin().getClusterStatus().getServers());
+
+ LOG.info("Start to generate assignment plan for " + numRegions +
+ " regions from table " + tableName + " with " +
+ servers.size() + " region servers");
+
+ int slotsPerServer = (int) Math.ceil((float) numRegions /
+ servers.size());
+ int regionSlots = slotsPerServer * servers.size();
+
+ // Compute the primary, secondary and tertiary costs for each region/server
+ // pair. These costs are based only on node locality and rack locality, and
+ // will be modified later.
+ float[][] primaryCost = new float[numRegions][regionSlots];
+ float[][] secondaryCost = new float[numRegions][regionSlots];
+ float[][] tertiaryCost = new float[numRegions][regionSlots];
+
+ if (this.enforceLocality && regionLocalityMap != null) {
+ // Transform the locality mapping into a 2D array, assuming that any
+ // unspecified locality value is 0.
+ float[][] localityPerServer = new float[numRegions][regionSlots];
+ for (int i = 0; i < numRegions; i++) {
+ Map<String, Float> serverLocalityMap =
+ regionLocalityMap.get(regions.get(i).getEncodedName());
+ if (serverLocalityMap == null) {
+ continue;
+ }
+ for (int j = 0; j < servers.size(); j++) {
+ String serverName = servers.get(j).getHostname();
+ if (serverName == null) {
+ continue;
+ }
+ Float locality = serverLocalityMap.get(serverName);
+ if (locality == null) {
+ continue;
+ }
+ for (int k = 0; k < slotsPerServer; k++) {
+ // If we can't find the locality of a region to a server, which occurs
+ // because locality is only reported for servers which have some
+ // blocks of a region local, then the locality for that pair is 0.
+ localityPerServer[i][j * slotsPerServer + k] = locality.floatValue();
+ }
+ }
+ }
+
+ // Compute the total rack locality for each region in each rack. The total
+ // rack locality is the sum of the localities of a region on all servers in
+ // a rack.
+ Map<String, Map<HRegionInfo, Float>> rackRegionLocality =
+ new HashMap<String, Map<HRegionInfo, Float>>();
+ for (int i = 0; i < numRegions; i++) {
+ HRegionInfo region = regions.get(i);
+ for (int j = 0; j < regionSlots; j += slotsPerServer) {
+ String rack = rackManager.getRack(servers.get(j / slotsPerServer));
+ Map<HRegionInfo, Float> rackLocality = rackRegionLocality.get(rack);
+ if (rackLocality == null) {
+ rackLocality = new HashMap<HRegionInfo, Float>();
+ rackRegionLocality.put(rack, rackLocality);
+ }
+ Float localityObj = rackLocality.get(region);
+ float locality = localityObj == null ? 0 : localityObj.floatValue();
+ locality += localityPerServer[i][j];
+ rackLocality.put(region, locality);
+ }
+ }
+ for (int i = 0; i < numRegions; i++) {
+ for (int j = 0; j < regionSlots; j++) {
+ String rack = rackManager.getRack(servers.get(j / slotsPerServer));
+ Float totalRackLocalityObj =
+ rackRegionLocality.get(rack).get(regions.get(i));
+ float totalRackLocality = totalRackLocalityObj == null ?
+ 0 : totalRackLocalityObj.floatValue();
+
+ // Primary cost aims to favor servers with high node locality and low
+ // rack locality, so that secondaries and tertiaries can be chosen for
+ // nodes with high rack locality. This might give primaries with
+ // slightly less locality at first compared to a cost which only
+ // considers the node locality, but should be better in the long run.
+ primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] -
+ totalRackLocality);
+
+ // Secondary cost aims to favor servers with high node locality and high
+ // rack locality since the tertiary will be chosen from the same rack as
+ // the secondary. This could be negative, but that is okay.
+ secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality);
+
+ // Tertiary cost is only concerned with the node locality. It will later
+ // be restricted to only hosts on the same rack as the secondary.
+ tertiaryCost[i][j] = 1 - localityPerServer[i][j];
+ }
+ }
+ }
+
+ if (this.enforceMinAssignmentMove && currentAssignmentMap != null) {
+ // We want to minimize the number of regions which move as the result of a
+ // new assignment. Therefore, slightly penalize any placement which is for
+ // a host that is not currently serving the region.
+ for (int i = 0; i < numRegions; i++) {
+ for (int j = 0; j < servers.size(); j++) {
+ ServerName currentAddress = currentAssignmentMap.get(regions.get(i));
+ if (currentAddress != null &&
+ !currentAddress.equals(servers.get(j))) {
+ for (int k = 0; k < slotsPerServer; k++) {
+ primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY;
+ }
+ }
+ }
+ }
+ }
+
+ // Artificially increase cost of last slot of each server to evenly
+ // distribute the slop, otherwise there will be a few servers with too few
+ // regions and many servers with the max number of regions.
+ for (int i = 0; i < numRegions; i++) {
+ for (int j = 0; j < regionSlots; j += slotsPerServer) {
+ primaryCost[i][j] += LAST_SLOT_COST_PENALTY;
+ secondaryCost[i][j] += LAST_SLOT_COST_PENALTY;
+ tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY;
+ }
+ }
+
+ RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions,
+ regionSlots);
+ primaryCost = randomizedMatrix.transform(primaryCost);
+ int[] primaryAssignment = new MunkresAssignment(primaryCost).solve();
+ primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment);
+
+ // Modify the secondary and tertiary costs for each region/server pair to
+ // prevent a region from being assigned to the same rack for both primary
+ // and either one of secondary or tertiary.
+ for (int i = 0; i < numRegions; i++) {
+ int slot = primaryAssignment[i];
+ String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
+ for (int k = 0; k < servers.size(); k++) {
+ if (!rackManager.getRack(servers.get(k)).equals(rack)) {
+ continue;
+ }
+ if (k == slot / slotsPerServer) {
+ // Same node, do not place secondary or tertiary here ever.
+ for (int m = 0; m < slotsPerServer; m++) {
+ secondaryCost[i][k * slotsPerServer + m] = MAX_COST;
+ tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
+ }
+ } else {
+ // Same rack, do not place secondary or tertiary here if possible.
+ for (int m = 0; m < slotsPerServer; m++) {
+ secondaryCost[i][k * slotsPerServer + m] = AVOID_COST;
+ tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
+ }
+ }
+ }
+ }
+ if (munkresForSecondaryAndTertiary) {
+ randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
+ secondaryCost = randomizedMatrix.transform(secondaryCost);
+ int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve();
+ secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment);
+
+ // Modify the tertiary costs for each region/server pair to ensure that a
+ // region is assigned to a tertiary server on the same rack as its secondary
+ // server, but not the same server in that rack.
+ for (int i = 0; i < numRegions; i++) {
+ int slot = secondaryAssignment[i];
+ String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
+ for (int k = 0; k < servers.size(); k++) {
+ if (k == slot / slotsPerServer) {
+ // Same node, do not place tertiary here ever.
+ for (int m = 0; m < slotsPerServer; m++) {
+ tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
+ }
+ } else {
+ if (rackManager.getRack(servers.get(k)).equals(rack)) {
+ continue;
+ }
+ // Different rack, do not place tertiary here if possible.
+ for (int m = 0; m < slotsPerServer; m++) {
+ tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
+ }
+ }
+ }
+ }
+
+ randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
+ tertiaryCost = randomizedMatrix.transform(tertiaryCost);
+ int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve();
+ tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment);
+
+ for (int i = 0; i < numRegions; i++) {
+ List<ServerName> favoredServers =
+ new ArrayList<ServerName>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
+ ServerName s = servers.get(primaryAssignment[i] / slotsPerServer);
+ favoredServers.add(new ServerName(s.getHostname(), s.getPort(),
+ ServerName.NON_STARTCODE));
+
+ s = servers.get(secondaryAssignment[i] / slotsPerServer);
+ favoredServers.add(new ServerName(s.getHostname(), s.getPort(),
+ ServerName.NON_STARTCODE));
+
+ s = servers.get(tertiaryAssignment[i] / slotsPerServer);
+ favoredServers.add(new ServerName(s.getHostname(), s.getPort(),
+ ServerName.NON_STARTCODE));
+ // Update the assignment plan
+ plan.updateAssignmentPlan(regions.get(i), favoredServers);
+ }
+ LOG.info("Generated the assignment plan for " + numRegions +
+ " regions from table " + tableName + " with " +
+ servers.size() + " region servers");
+ LOG.info("Assignment plan for secondary and tertiary generated " +
+ "using MunkresAssignment");
+ } else {
+ Map<HRegionInfo, ServerName> primaryRSMap = new HashMap<HRegionInfo, ServerName>();
+ for (int i = 0; i < numRegions; i++) {
+ primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer));
+ }
+ FavoredNodeAssignmentHelper favoredNodeHelper =
+ new FavoredNodeAssignmentHelper(servers, conf);
+ favoredNodeHelper.initialize();
+ Map<HRegionInfo, ServerName[]> secondaryAndTertiaryMap =
+ favoredNodeHelper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap);
+ for (int i = 0; i < numRegions; i++) {
+ List<ServerName> favoredServers =
+ new ArrayList<ServerName>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
+ HRegionInfo currentRegion = regions.get(i);
+ ServerName s = primaryRSMap.get(currentRegion);
+ favoredServers.add(new ServerName(s.getHostname(), s.getPort(),
+ ServerName.NON_STARTCODE));
+
+ ServerName[] secondaryAndTertiary =
+ secondaryAndTertiaryMap.get(currentRegion);
+ s = secondaryAndTertiary[0];
+ favoredServers.add(new ServerName(s.getHostname(), s.getPort(),
+ ServerName.NON_STARTCODE));
+
+ s = secondaryAndTertiary[1];
+ favoredServers.add(new ServerName(s.getHostname(), s.getPort(),
+ ServerName.NON_STARTCODE));
+ // Update the assignment plan
+ plan.updateAssignmentPlan(regions.get(i), favoredServers);
+ }
+ LOG.info("Generated the assignment plan for " + numRegions +
+ " regions from table " + tableName + " with " +
+ servers.size() + " region servers");
+ LOG.info("Assignment plan for secondary and tertiary generated " +
+ "using placeSecondaryAndTertiaryWithRestrictions method");
+ }
+ }
+
+ public FavoredNodesPlan getNewAssignmentPlan() throws IOException {
+ // Get the current region assignment snapshot by scanning from the META
+ SnapshotOfRegionAssignmentFromMeta assignmentSnapshot =
+ this.getRegionAssignmentSnapshot();
+
+ // Get the region locality map
+ Map<String, Map<String, Float>> regionLocalityMap = null;
+ if (this.enforceLocality) {
+ regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
+ }
+ // Initialize the assignment plan
+ FavoredNodesPlan plan = new FavoredNodesPlan();
+
+ // Get the table to region mapping
+ Map<TableName, List<HRegionInfo>> tableToRegionMap =
+ assignmentSnapshot.getTableToRegionMap();
+ LOG.info("Start to generate the new assignment plan for the " +
+ tableToRegionMap.keySet().size() + " tables");
+ for (TableName table : tableToRegionMap.keySet()) {
+ try {
+ if (!this.targetTableSet.isEmpty() &&
+ !this.targetTableSet.contains(table)) {
+ continue;
+ }
+ // TODO: maybe run the placement in parallel for each table
+ genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan,
+ USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY);
+ } catch (Exception e) {
+ LOG.error("Get some exceptions for placing primary region server" +
+ " for table " + table + " because " + e);
+ }
+ }
+ LOG.info("Finish to generate the new assignment plan for the " +
+ tableToRegionMap.keySet().size() + " tables");
+ return plan;
+ }
+
+ /**
+ * Some algorithms for solving the assignment problem may traverse workers or
+ * jobs in linear order which may result in skewing the assignments of the
+ * first jobs in the matrix toward the last workers in the matrix if the
+ * costs are uniform. To avoid this kind of clumping, we can randomize the
+ * rows and columns of the cost matrix in a reversible way, such that the
+ * solution to the assignment problem can be interpreted in terms of the
+ * original untransformed cost matrix. Rows and columns are transformed
+ * independently such that the elements contained in any row of the input
+ * matrix are the same as the elements in the corresponding output matrix,
+ * and each row has its elements transformed in the same way. Similarly for
+ * columns.
+ */
+ protected static class RandomizedMatrix {
+ private final int rows;
+ private final int cols;
+ private final int[] rowTransform;
+ private final int[] rowInverse;
+ private final int[] colTransform;
+ private final int[] colInverse;
+
+ /**
+ * Create a randomization scheme for a matrix of a given size.
+ * @param rows the number of rows in the matrix
+ * @param cols the number of columns in the matrix
+ */
+ public RandomizedMatrix(int rows, int cols) {
+ this.rows = rows;
+ this.cols = cols;
+ Random random = new Random(); // unseeded: each instance gets an independent permutation
+ rowTransform = new int[rows];
+ rowInverse = new int[rows];
+ for (int i = 0; i < rows; i++) {
+ rowTransform[i] = i;
+ }
+ // Shuffle the row indices.
+ for (int i = rows - 1; i >= 0; i--) { // Fisher-Yates shuffle
+ int r = random.nextInt(i + 1);
+ int temp = rowTransform[r];
+ rowTransform[r] = rowTransform[i];
+ rowTransform[i] = temp;
+ }
+ // Generate the inverse row indices.
+ for (int i = 0; i < rows; i++) {
+ rowInverse[rowTransform[i]] = i;
+ }
+
+ colTransform = new int[cols];
+ colInverse = new int[cols];
+ for (int i = 0; i < cols; i++) {
+ colTransform[i] = i;
+ }
+ // Shuffle the column indices.
+ for (int i = cols - 1; i >= 0; i--) { // Fisher-Yates shuffle
+ int r = random.nextInt(i + 1);
+ int temp = colTransform[r];
+ colTransform[r] = colTransform[i];
+ colTransform[i] = temp;
+ }
+ // Generate the inverse column indices.
+ for (int i = 0; i < cols; i++) {
+ colInverse[colTransform[i]] = i;
+ }
+ }
+
+ /**
+ * Copy a given matrix into a new matrix, transforming each row index and
+ * each column index according to the randomization scheme that was created
+ * at construction time.
+ * @param matrix the cost matrix to transform
+ * @return a new matrix with row and column indices transformed
+ */
+ public float[][] transform(float[][] matrix) {
+ float[][] result = new float[rows][cols];
+ for (int i = 0; i < rows; i++) {
+ for (int j = 0; j < cols; j++) {
+ result[rowTransform[i]][colTransform[j]] = matrix[i][j];
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Copy a given matrix into a new matrix, transforming each row index and
+ * each column index according to the inverse of the randomization scheme
+ * that was created at construction time.
+ * @param matrix the cost matrix to be inverted
+ * @return a new matrix with row and column indices inverted
+ */
+ public float[][] invert(float[][] matrix) {
+ float[][] result = new float[rows][cols];
+ for (int i = 0; i < rows; i++) {
+ for (int j = 0; j < cols; j++) {
+ result[rowInverse[i]][colInverse[j]] = matrix[i][j];
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Given an array where each element {@code indices[i]} represents the
+ * randomized column index corresponding to randomized row index {@code i},
+ * create a new array with the corresponding inverted indices.
+ * @param indices an array of transformed indices to be inverted
+ * @return an array of inverted indices
+ */
+ public int[] invertIndices(int[] indices) {
+ int[] result = new int[indices.length];
+ for (int i = 0; i < indices.length; i++) {
+ result[rowInverse[i]] = colInverse[indices[i]];
+ }
+ return result;
+ }
+ }
+
+ /**
+ * Print the assignment plan to the system output stream
+ * @param plan the favored nodes plan to print; no-op when null
+ */
+ public static void printAssignmentPlan(FavoredNodesPlan plan) {
+ if (plan == null) return;
+ LOG.info("========== Start to print the assignment plan ================");
+ // sort the map based on region info
+ Map<HRegionInfo, List<ServerName>> assignmentMap =
+ new TreeMap<HRegionInfo, List<ServerName>>(plan.getAssignmentMap());
+
+ for (Map.Entry<HRegionInfo, List<ServerName>> entry : assignmentMap.entrySet()) {
+
+ String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue());
+ String regionName = entry.getKey().getRegionNameAsString();
+ LOG.info("Region: " + regionName);
+ LOG.info("Its favored nodes: " + serverList);
+ }
+ LOG.info("========== Finish to print the assignment plan ================");
+ }
+
+ /**
+ * Update the assignment plan into .META.
+ * @param plan the assignments plan to be updated into .META.
+ * @throws IOException if cannot update assignment plan in .META.
+ */
+ public void updateAssignmentPlanToMeta(FavoredNodesPlan plan)
+ throws IOException {
+ try {
+ LOG.info("Start to update the META with the new assignment plan");
+ Map<HRegionInfo, List<ServerName>> assignmentMap =
+ plan.getAssignmentMap();
+ FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(assignmentMap, conf);
+ LOG.info("Updated the META with the new assignment plan");
+ } catch (Exception e) { // NOTE(review): failure is logged, not rethrown — confirm best-effort is intended
+ LOG.error("Failed to update META with the new assignment" +
+ " plan because " + e.getMessage());
+ }
+ }
+
+ /**
+ * Update the assignment plan to all the region servers
+ * @param plan
+ * @throws IOException
+ */
+  private void updateAssignmentPlanToRegionServers(FavoredNodesPlan plan)
+  throws IOException{
+    LOG.info("Start to update the region servers with the new assignment plan");
+    // Get the region server to region map
+    Map<ServerName, List<HRegionInfo>> currentAssignment =
+        this.getRegionAssignmentSnapshot().getRegionServerToRegionMap();
+    HConnection connection = this.getHBaseAdmin().getConnection();
+
+    // track of the failed and succeeded updates
+    int succeededNum = 0;
+    Map<ServerName, Exception> failedUpdateMap =
+        new HashMap<ServerName, Exception>();
+
+    for (Map.Entry<ServerName, List<HRegionInfo>> entry :
+        currentAssignment.entrySet()) {
+      List<Triple<HRegionInfo, Integer, List<ServerName>>> regionUpdateInfos =
+          new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>();
+      try {
+        // Keep track of the favored updates for the current region server
+        FavoredNodesPlan singleServerPlan = null;
+        // Find out all the updates for the current region server
+        for (HRegionInfo region : entry.getValue()) {
+          List<ServerName> favoredServerList = plan.getFavoredNodes(region);
+          if (favoredServerList != null &&
+              favoredServerList.size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
+            // Create the single server plan if necessary
+            if (singleServerPlan == null) {
+              singleServerPlan = new FavoredNodesPlan();
+            }
+            // Update the single server update
+            singleServerPlan.updateAssignmentPlan(region, favoredServerList);
+          }
+          regionUpdateInfos.add(
+              new Triple<HRegionInfo, Integer, List<ServerName>>(region, null, favoredServerList));
+        }
+        if (singleServerPlan != null) {
+          // Update the current region server with its updated favored nodes
+          BlockingInterface currentRegionServer = connection.getAdmin(entry.getKey());
+          OpenRegionRequest request =
+              RequestConverter.buildOpenRegionRequest(regionUpdateInfos);
+
+          UpdateFavoredNodesResponse updateFavoredNodesResponse =
+              currentRegionServer.updateFavoredNodes(null, request);
+          LOG.info("Region server " +
+              ProtobufUtil.getServerInfo(currentRegionServer).getServerName() +
+              " has updated " + updateFavoredNodesResponse.getResponse() + " / " +
+              singleServerPlan.getAssignmentMap().size() +
+              " regions with the assignment plan");
+          succeededNum ++;
+        }
+      } catch (Exception e) {
+        failedUpdateMap.put(entry.getKey(), e);
+      }
+    }
+    // log the succeeded updates
+    LOG.info("Updated " + succeededNum + " region servers with " +
+        "the new assignment plan");
+
+    // log the failed updates
+    int failedNum = failedUpdateMap.size();
+    if (failedNum != 0) {
+      LOG.error("Failed to update the following " + failedNum +
+          " region servers with its corresponding favored nodes");
+      for (Map.Entry<ServerName, Exception> entry :
+          failedUpdateMap.entrySet() ) {
+        LOG.error("Failed to update " + entry.getKey().getHostAndPort() +
+            " because of " + entry.getValue().getMessage());
+      }
+    }
+  }
+
+  public void updateAssignmentPlan(FavoredNodesPlan plan)
+  throws IOException {
+    LOG.info("Start to update the new assignment plan for the META table and" +
+        " the region servers");
+    // Persist the new plan into the META table first
+    updateAssignmentPlanToMeta(plan);
+    // Then push the new plan to each region server so its in-memory favored nodes stay in sync
+    updateAssignmentPlanToRegionServers(plan);
+    LOG.info("Finish to update the new assignment plan for the META table and" +
+        " the region servers");
+  }
+
+ /**
+ * Return how many regions will move per table since their primary RS will
+ * change
+ *
+ * @param newPlan - new AssignmentPlan
+ * @return how many primaries will move per table
+ */
+  public Map<TableName, Integer> getRegionsMovement(FavoredNodesPlan newPlan)
+      throws IOException {
+    Map<TableName, Integer> movesPerTable = new HashMap<TableName, Integer>();
+    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
+    Map<TableName, List<HRegionInfo>> tableToRegions = snapshot
+        .getTableToRegionMap();
+    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
+    Set<TableName> tables = snapshot.getTableSet();
+    for (TableName table : tables) {
+      int movedPrimaries = 0;
+      if (!this.targetTableSet.isEmpty()
+          && !this.targetTableSet.contains(table)) {
+        continue;
+      }
+      List<HRegionInfo> regions = tableToRegions.get(table);
+      for (HRegionInfo region : regions) {
+        List<ServerName> oldServers = oldPlan.getFavoredNodes(region);
+        List<ServerName> newServers = newPlan.getFavoredNodes(region);
+        if (oldServers != null && newServers != null) {
+          ServerName oldPrimary = oldServers.get(0);
+          ServerName newPrimary = newServers.get(0);
+          if (oldPrimary.compareTo(newPrimary) != 0) {
+            movedPrimaries++;
+          }
+        }
+      }
+      movesPerTable.put(table, movedPrimaries);
+    }
+    return movesPerTable;
+  }
+
+ /**
+ * Compares two plans and check whether the locality dropped or increased
+ * (prints the information as a string) also prints the baseline locality
+ *
+ * @param movesPerTable - how many primary regions will move per table
+ * @param regionLocalityMap - locality map from FS
+ * @param newPlan - new assignment plan
+ * @throws IOException
+ */
+  public void checkDifferencesWithOldPlan(Map<TableName, Integer> movesPerTable,
+      Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan newPlan)
+          throws IOException {
+    // localities for primary, secondary and tertiary
+    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
+    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
+    Set<TableName> tables = snapshot.getTableSet();
+    Map<TableName, List<HRegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
+    for (TableName table : tables) {
+      float[] deltaLocality = new float[3];
+      float[] locality = new float[3];
+      if (!this.targetTableSet.isEmpty()
+          && !this.targetTableSet.contains(table)) {
+        continue;
+      }
+      List<HRegionInfo> regions = tableToRegionsMap.get(table);
+      System.out.println("==================================================");
+      System.out.println("Assignment Plan Projection Report For Table: " + table);
+      System.out.println("\t Total regions: " + regions.size());
+      System.out.println("\t" + movesPerTable.get(table)
+          + " primaries will move due to their primary has changed");
+      for (HRegionInfo currentRegion : regions) {
+        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion
+            .getEncodedName());
+        if (regionLocality == null) {
+          continue;
+        }
+        List<ServerName> oldServers = oldPlan.getFavoredNodes(currentRegion);
+        List<ServerName> newServers = newPlan.getFavoredNodes(currentRegion);
+        if (newServers != null && oldServers != null) {
+          int i=0;
+          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
+            ServerName newServer = newServers.get(p.ordinal());
+            ServerName oldServer = oldServers.get(p.ordinal());
+            Float oldLocality = 0f;
+            if (oldServers != null) {
+              oldLocality = regionLocality.get(oldServer.getHostname());
+              if (oldLocality == null) {
+                oldLocality = 0f;
+              }
+              locality[i] += oldLocality;
+            }
+            Float newLocality = regionLocality.get(newServer.getHostname());
+            if (newLocality == null) {
+              newLocality = 0f;
+            }
+            deltaLocality[i] += newLocality - oldLocality;
+            i++;
+          }
+        }
+      }
+      DecimalFormat df = new java.text.DecimalFormat( "#.##");
+      for (int i = 0; i < deltaLocality.length; i++) {
+        System.out.print("\t\t Baseline locality for ");
+        if (i == 0) {
+          System.out.print("primary ");
+        } else if (i == 1) {
+          System.out.print("secondary ");
+        } else if (i == 2) {
+          System.out.print("tertiary ");
+        }
+        System.out.println(df.format(100 * locality[i] / regions.size()) + "%");
+        System.out.print("\t\t Locality will change with the new plan: ");
+        System.out.println(df.format(100 * deltaLocality[i] / regions.size())
+            + "%");
+      }
+      System.out.println("\t Baseline dispersion");
+      printDispersionScores(table, snapshot, regions.size(), null, true);
+      System.out.println("\t Projected dispersion");
+      printDispersionScores(table, snapshot, regions.size(), newPlan, true);
+    }
+  }
+
+  public void printDispersionScores(TableName table,
+      SnapshotOfRegionAssignmentFromMeta snapshot, int numRegions, FavoredNodesPlan newPlan,
+      boolean simplePrint) {
+    if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
+      return;
+    }
+    AssignmentVerificationReport report = new AssignmentVerificationReport();
+    report.fillUpDispersion(table, snapshot, newPlan);
+    List<Float> dispersion = report.getDispersionInformation();
+    if (simplePrint) {
+      DecimalFormat df = new java.text.DecimalFormat("#.##");
+      System.out.println("\tAvg dispersion score: "
+          + df.format(dispersion.get(0)) + " hosts;\tMax dispersion score: "
+          + df.format(dispersion.get(1)) + " hosts;\tMin dispersion score: "
+          + df.format(dispersion.get(2)) + " hosts;");
+    } else {
+      LOG.info("For Table: " + table + " ; #Total Regions: " + numRegions
+          + " ; The average dispersion score is " + dispersion.get(0));
+    }
+  }
+
+  public void printLocalityAndDispersionForCurrentPlan(
+      Map<String, Map<String, Float>> regionLocalityMap) throws IOException {
+    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
+    FavoredNodesPlan assignmentPlan = snapshot.getExistingAssignmentPlan();
+    Set<TableName> tables = snapshot.getTableSet();
+    Map<TableName, List<HRegionInfo>> tableToRegionsMap = snapshot
+        .getTableToRegionMap();
+    for (TableName table : tables) {
+      float[] locality = new float[3];
+      if (!this.targetTableSet.isEmpty()
+          && !this.targetTableSet.contains(table)) {
+        continue;
+      }
+      List<HRegionInfo> regions = tableToRegionsMap.get(table);
+      for (HRegionInfo currentRegion : regions) {
+        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion
+            .getEncodedName());
+        if (regionLocality == null) {
+          continue;
+        }
+        List<ServerName> servers = assignmentPlan.getFavoredNodes(currentRegion);
+        if (servers != null) {
+          int i = 0;
+          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
+            ServerName server = servers.get(p.ordinal());
+            Float currentLocality = 0f;
+            if (servers != null) {
+              currentLocality = regionLocality.get(server.getHostname());
+              if (currentLocality == null) {
+                currentLocality = 0f;
+              }
+              locality[i] += currentLocality;
+            }
+            i++;
+          }
+        }
+      }
+      for (int i = 0; i < locality.length; i++) {
+        String copy = null;
+        if (i == 0) {
+          copy = "primary";
+        } else if (i == 1) {
+          copy = "secondary";
+        } else if (i == 2) {
+          copy = "tertiary" ;
+        }
+        float avgLocality = 100 * locality[i] / regions.size();
+        LOG.info("For Table: " + table + " ; #Total Regions: " + regions.size()
+            + " ; The average locality for " + copy+ " is " + avgLocality + " %");
+      }
+      printDispersionScores(table, snapshot, regions.size(), null, false);
+    }
+  }
+
+ /**
+ * @param favoredNodesStr The String of favored nodes
+ * @return the list of ServerName for the byte array of favored nodes.
+ */
+  public static List<ServerName> getFavoredNodeList(String favoredNodesStr) {
+    String[] favoredNodesArray = StringUtils.split(favoredNodesStr, ",");
+    if (favoredNodesArray == null)
+      return null;
+
+    List<ServerName> serverList = new ArrayList<ServerName>();
+    for (String hostNameAndPort : favoredNodesArray) {
+      serverList.add(new ServerName(hostNameAndPort, ServerName.NON_STARTCODE));
+    }
+    return serverList;
+  }
+
+  public static void main(String args[]) throws IOException {
+    Options opt = new Options();
+    opt.addOption("w", "write", false, "write the assignments to META only");
+    opt.addOption("u", "update", false,
+        "update the assignments to META and RegionServers together");
+    opt.addOption("n", "dry-run", false, "do not write assignments to META");
+    opt.addOption("v", "verify", false, "verify current assignments against META");
+    opt.addOption("p", "print", false, "print the current assignment plan in META");
+    opt.addOption("h", "help", false, "print usage");
+    opt.addOption("d", "verification-details", false,
+        "print the details of verification report");
+
+    opt.addOption("zk", true, "to set the zookeeper quorum");
+    opt.addOption("fs", true, "to set HDFS");
+    opt.addOption("hbase_root", true, "to set hbase_root directory");
+
+    opt.addOption("overwrite", false,
+        "overwrite the favored nodes for a single region," +
+        "for example: -update -r regionName -f server1:port,server2:port,server3:port");
+    opt.addOption("r", true, "The region name that needs to be updated");
+    opt.addOption("f", true, "The new favored nodes");
+
+    opt.addOption("tables", true,
+        "The list of table names splitted by ',' ;" +
+        "For example: -tables: t1,t2,...,tn");
+    opt.addOption("l", "locality", true, "enforce the maxium locality");
+    opt.addOption("m", "min-move", true, "enforce minium assignment move");
+    opt.addOption("diff", false, "calculate difference between assignment plans");
+    opt.addOption("munkres", false,
+        "use munkres to place secondaries and tertiaries");
+    opt.addOption("ld", "locality-dispersion", false, "print locality and dispersion " +
+        "information for current plan");
+    try {
+      // Set the log4j
+      Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
+      Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.ERROR);
+      Logger.getLogger("org.apache.hadoop.hbase.master.RegionPlacementMaintainer")
+          .setLevel(Level.INFO);
+
+      CommandLine cmd = new GnuParser().parse(opt, args);
+      Configuration conf = HBaseConfiguration.create();
+
+      boolean enforceMinAssignmentMove = true;
+      boolean enforceLocality = true;
+      boolean verificationDetails = false;
+
+      // Read all the options
+      if ((cmd.hasOption("l") &&
+          cmd.getOptionValue("l").equalsIgnoreCase("false")) ||
+          (cmd.hasOption("locality") &&
+          cmd.getOptionValue("locality").equalsIgnoreCase("false"))) {
+        enforceLocality = false;
+      }
+
+      if ((cmd.hasOption("m") &&
+          cmd.getOptionValue("m").equalsIgnoreCase("false")) ||
+          (cmd.hasOption("min-move") &&
+          cmd.getOptionValue("min-move").equalsIgnoreCase("false"))) {
+        enforceMinAssignmentMove = false;
+      }
+
+      if (cmd.hasOption("zk")) {
+        conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk"));
+        LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM));
+      }
+
+      if (cmd.hasOption("fs")) {
+        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs"));
+        LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+      }
+
+      if (cmd.hasOption("hbase_root")) {
+        conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root"));
+        LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR));
+      }
+
+      // Create the region placement obj
+      RegionPlacementMaintainer rp = new RegionPlacementMaintainer(conf, enforceLocality,
+          enforceMinAssignmentMove);
+
+      if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
+        verificationDetails = true;
+      }
+
+      if (cmd.hasOption("tables")) {
+        String tableNameListStr = cmd.getOptionValue("tables");
+        String[] tableNames = StringUtils.split(tableNameListStr, ",");
+        rp.setTargetTableName(tableNames);
+      }
+
+      if (cmd.hasOption("munkres")) {
+        USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
+      }
+
+      // Read all the modes
+      if (cmd.hasOption("v") || cmd.hasOption("verify")) {
+        // Verify the region placement.
+        rp.verifyRegionPlacement(verificationDetails);
+      } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
+        // Generate the assignment plan only without updating the META and RS
+        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
+        printAssignmentPlan(plan);
+      } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
+        // Generate the new assignment plan
+        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
+        // Print the new assignment plan
+        printAssignmentPlan(plan);
+        // Write the new assignment plan to META
+        rp.updateAssignmentPlanToMeta(plan);
+      } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
+        // Generate the new assignment plan
+        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
+        // Print the new assignment plan
+        printAssignmentPlan(plan);
+        // Update the assignment to META and Region Servers
+        rp.updateAssignmentPlan(plan);
+      } else if (cmd.hasOption("diff")) {
+        FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
+        Map<String, Map<String, Float>> locality = FSUtils
+            .getRegionDegreeLocalityMappingFromFS(conf);
+        Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
+        rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
+        System.out.println("Do you want to update the assignment plan? [y/n]");
+        Scanner s = new Scanner(System.in);
+        String input = s.nextLine().trim();
+        if (input.equals("y")) {
+          System.out.println("Updating assignment plan...");
+          rp.updateAssignmentPlan(newPlan);
+        }
+        s.close();
+      } else if (cmd.hasOption("ld")) {
+        Map<String, Map<String, Float>> locality = FSUtils
+            .getRegionDegreeLocalityMappingFromFS(conf);
+        rp.printLocalityAndDispersionForCurrentPlan(locality);
+      } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
+        FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
+        printAssignmentPlan(plan);
+      } else if (cmd.hasOption("overwrite")) {
+        if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
+          throw new IllegalArgumentException("Please specify: " +
+              " -update -r regionName -f server1:port,server2:port,server3:port");
+        }
+
+        String regionName = cmd.getOptionValue("r");
+        String favoredNodesStr = cmd.getOptionValue("f");
+        LOG.info("Going to update the region " + regionName + " with the new favored nodes " +
+            favoredNodesStr);
+        List<ServerName> favoredNodes = null;
+        HRegionInfo regionInfo =
+            rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName);
+        if (regionInfo == null) {
+          LOG.error("Cannot find the region " + regionName + " from the META");
+        } else {
+          try {
+            favoredNodes = getFavoredNodeList(favoredNodesStr);
+          } catch (IllegalArgumentException e) {
+            LOG.error("Cannot parse the invalid favored nodes because " + e);
+          }
+          FavoredNodesPlan newPlan = new FavoredNodesPlan();
+          newPlan.updateAssignmentPlan(regionInfo, favoredNodes);
+          rp.updateAssignmentPlan(newPlan);
+        }
+      } else {
+        printHelp(opt);
+      }
+    } catch (ParseException e) {
+      printHelp(opt);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SnapshotOfRegionAssignmentFromMeta.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SnapshotOfRegionAssignmentFromMeta.java
new file mode 100644
index 0000000..80d2e27
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SnapshotOfRegionAssignmentFromMeta.java
@@ -0,0 +1,217 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.catalog.MetaReader.Visitor;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
+import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Used internally for reading meta and constructing datastructures that are
+ * then queried, for things like regions to regionservers, table to regions, etc.
+ * It also records the favored nodes mapping for regions.
+ *
+ */
+@InterfaceAudience.Private
+public class SnapshotOfRegionAssignmentFromMeta {
+  private static final Log LOG = LogFactory.getLog(SnapshotOfRegionAssignmentFromMeta.class
+      .getName());
+
+  private CatalogTracker tracker;
+
+  /** the table name to region map */
+  private final Map<TableName, List<HRegionInfo>> tableToRegionMap;
+  /** the region to region server map */
+  //private final Map<HRegionInfo, ServerName> regionToRegionServerMap;
+  private Map<HRegionInfo, ServerName> regionToRegionServerMap;
+  /** the region name to region info map */
+  private final Map<String, HRegionInfo> regionNameToRegionInfoMap;
+
+  /** the regionServer to region map */
+  private final Map<ServerName, List<HRegionInfo>> regionServerToRegionMap;
+  /** the existing assignment plan in the META region */
+  private final FavoredNodesPlan existingAssignmentPlan;
+  private final Set<TableName> disabledTables;
+  private final boolean excludeOfflinedSplitParents;
+
+  public SnapshotOfRegionAssignmentFromMeta(CatalogTracker tracker) {
+    this(tracker, new HashSet<TableName>(), false);
+  }
+
+  public SnapshotOfRegionAssignmentFromMeta(CatalogTracker tracker, Set<TableName> disabledTables,
+      boolean excludeOfflinedSplitParents) {
+    this.tracker = tracker;
+    tableToRegionMap = new HashMap<TableName, List<HRegionInfo>>();
+    regionToRegionServerMap = new HashMap<HRegionInfo, ServerName>();
+    regionServerToRegionMap = new HashMap<ServerName, List<HRegionInfo>>();
+    regionNameToRegionInfoMap = new TreeMap<String, HRegionInfo>();
+    existingAssignmentPlan = new FavoredNodesPlan();
+    this.disabledTables = disabledTables;
+    this.excludeOfflinedSplitParents = excludeOfflinedSplitParents;
+  }
+
+  /**
+   * Initialize the region assignment snapshot by scanning the META table
+   * @throws IOException
+   */
+  public void initialize() throws IOException {
+    LOG.info("Start to scan the META for the current region assignment " +
+        "snapshot");
+    // TODO: at some point this code could live in the MetaReader
+    Visitor v = new Visitor() {
+      @Override
+      public boolean visit(Result result) throws IOException {
+        try {
+          if (result == null || result.isEmpty()) return true;
+          Pair<HRegionInfo, ServerName> regionAndServer =
+              HRegionInfo.getHRegionInfoAndServerName(result);
+          HRegionInfo hri = regionAndServer.getFirst();
+          if (hri == null) return true;
+          if (hri.getTableName() == null) return true;
+          if (disabledTables.contains(hri.getTableName())) {
+            return true;
+          }
+          // Are we to include split parents in the list?
+          if (excludeOfflinedSplitParents && hri.isSplit()) return true;
+          // Add the current assignment to the snapshot
+          addAssignment(hri, regionAndServer.getSecond());
+          addRegion(hri);
+
+          // the code below is to handle favored nodes
+          byte[] favoredNodes = result.getValue(HConstants.CATALOG_FAMILY,
+              FavoredNodeAssignmentHelper.FAVOREDNODES_QUALIFIER);
+          if (favoredNodes == null) return true;
+          // Add the favored nodes into assignment plan
+          ServerName[] favoredServerList =
+              FavoredNodeAssignmentHelper.getFavoredNodesList(favoredNodes);
+          // Add the favored nodes into assignment plan
+          existingAssignmentPlan.updateFavoredNodesMap(hri,
+              Arrays.asList(favoredServerList));
+          return true;
+        } catch (RuntimeException e) {
+          LOG.error("Caught remote exception " + e.getMessage() +
+              " when processing " + result);
+          throw e;
+        }
+      }
+    };
+    // Scan .META. to pick up user regions
+    MetaReader.fullScan(tracker, v);
+    //regionToRegionServerMap = regions;
+    LOG.info("Finished to scan the META for the current region assignment " +
+        "snapshot");
+  }
+
+  private void addRegion(HRegionInfo regionInfo) {
+    // Process the region name to region info map
+    regionNameToRegionInfoMap.put(regionInfo.getRegionNameAsString(), regionInfo);
+
+    // Process the table to region map
+    TableName tableName = regionInfo.getTableName();
+    List<HRegionInfo> regionList = tableToRegionMap.get(tableName);
+    if (regionList == null) {
+      regionList = new ArrayList<HRegionInfo>();
+    }
+    // Add the current region info into the tableToRegionMap
+    regionList.add(regionInfo);
+    tableToRegionMap.put(tableName, regionList);
+  }
+
+  private void addAssignment(HRegionInfo regionInfo, ServerName server) {
+    // Process the region to region server map
+    regionToRegionServerMap.put(regionInfo, server);
+
+    // Process the region server to region map
+    List<HRegionInfo> regionList = regionServerToRegionMap.get(server);
+    if (regionList == null) {
+      regionList = new ArrayList<HRegionInfo>();
+    }
+    regionList.add(regionInfo);
+    regionServerToRegionMap.put(server, regionList);
+  }
+
+  /**
+   * Get the regioninfo for a region
+   * @return the regioninfo
+   */
+  public Map<String, HRegionInfo> getRegionNameToRegionInfoMap() {
+    return this.regionNameToRegionInfoMap;
+  }
+
+  /**
+   * Get regions for tables
+   * @return a mapping from table to regions
+   */
+  public Map<TableName, List<HRegionInfo>> getTableToRegionMap() {
+    return tableToRegionMap;
+  }
+
+  /**
+   * Get region to region server map
+   * @return region to region server map
+   */
+  public Map<HRegionInfo, ServerName> getRegionToRegionServerMap() {
+    return regionToRegionServerMap;
+  }
+
+  /**
+   * Get regionserver to region map
+   * @return regionserver to region map
+   */
+  public Map<ServerName, List<HRegionInfo>> getRegionServerToRegionMap() {
+    return regionServerToRegionMap;
+  }
+
+  /**
+   * Get the favored nodes plan
+   * @return the existing favored nodes plan
+   */
+  public FavoredNodesPlan getExistingAssignmentPlan() {
+    return this.existingAssignmentPlan;
+  }
+
+  /**
+   * Get the table set
+   * @return the table set
+   */
+  public Set<TableName> getTableSet() {
+    return this.tableToRegionMap.keySet();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeAssignmentHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeAssignmentHelper.java
index 370f8c0..5413d0b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeAssignmentHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeAssignmentHelper.java
@@ -21,14 +21,13 @@ package org.apache.hadoop.hbase.master.balancer;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import java.util.TreeMap;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,17 +39,14 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
-import org.apache.hadoop.hbase.catalog.MetaReader;
-import org.apache.hadoop.hbase.catalog.MetaReader.Visitor;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.FavoredNodes;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -88,58 +84,34 @@ public class FavoredNodeAssignmentHelper {
}
/**
- * Perform full scan of the meta table similar to
- * {@link MetaReader#fullScan(CatalogTracker, Set, boolean)} except that this is
- * aware of the favored nodes
+ * Update meta table with favored nodes info
+ * @param regionToFavoredNodes
* @param catalogTracker
- * @param disabledTables
- * @param excludeOfflinedSplitParents
- * @param balancer required because we need to let the balancer know about the
- * current favored nodes from meta scan
- * @return Returns a map of every region to it's currently assigned server,
- * according to META. If the region does not have an assignment it will have
- * a null value in the map.
* @throws IOException
*/
-  public static Map<HRegionInfo, ServerName> fullScan(
-      CatalogTracker catalogTracker, final Set<TableName> disabledTables,
-      final boolean excludeOfflinedSplitParents,
-      FavoredNodeLoadBalancer balancer) throws IOException {
-    final Map<HRegionInfo, ServerName> regions =
-        new TreeMap<HRegionInfo, ServerName>();
-    final Map<HRegionInfo, ServerName[]> favoredNodesMap =
-        new HashMap<HRegionInfo, ServerName[]>();
-    Visitor v = new Visitor() {
-      @Override
-      public boolean visit(Result r) throws IOException {
-        if (r == null || r.isEmpty()) return true;
-        Pair<HRegionInfo, ServerName> region = HRegionInfo.getHRegionInfoAndServerName(r);
-        HRegionInfo hri = region.getFirst();
-        if (hri == null) return true;
-        if (hri.getTableName() == null) return true;
-        if (disabledTables.contains(
-            hri.getTableName())) return true;
-        // Are we to include split parents in the list?
-        if (excludeOfflinedSplitParents && hri.isSplitParent()) return true;
-        regions.put(hri, region.getSecond());
-        byte[] favoredNodes = r.getValue(HConstants.CATALOG_FAMILY,
-            FavoredNodeAssignmentHelper.FAVOREDNODES_QUALIFIER);
-        if (favoredNodes != null) {
-          ServerName[] favoredServerList =
-              FavoredNodeAssignmentHelper.getFavoredNodesList(favoredNodes);
-          favoredNodesMap.put(hri, favoredServerList);
-        }
-        return true;
+  public static void updateMetaWithFavoredNodesInfo(
+      Map<HRegionInfo, List<ServerName>> regionToFavoredNodes,
+      CatalogTracker catalogTracker) throws IOException {
+    List<Put> puts = new ArrayList<Put>();
+    for (Map.Entry<HRegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
+      Put put = makePutFromRegionInfo(entry.getKey(), entry.getValue());
+      if (put != null) {
+        puts.add(put);
       }
-    };
-    MetaReader.fullScan(catalogTracker, v);
-    balancer.noteFavoredNodes(favoredNodesMap);
-    return regions;
+    }
+    MetaEditor.putsToMetaTable(catalogTracker, puts);
+    LOG.info("Added " + puts.size() + " regions in META");
   }
+  /**
+   * Update meta table with favored nodes info
+   * @param regionToFavoredNodes
+   * @param conf
+   * @throws IOException
+   */
   public static void updateMetaWithFavoredNodesInfo(
       Map<HRegionInfo, List<ServerName>> regionToFavoredNodes,
-      CatalogTracker catalogTracker) throws IOException {
+      Configuration conf) throws IOException {
     List<Put> puts = new ArrayList<Put>();
     for (Map.Entry<HRegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
       Put put = makePutFromRegionInfo(entry.getKey(), entry.getValue());
@@ -147,7 +119,14 @@ public class FavoredNodeAssignmentHelper {
         puts.add(put);
       }
     }
-    MetaEditor.putsToMetaTable(catalogTracker, puts);
+    // Write the region assignments to the meta table.
+    HTable metaTable = null;
+    try {
+      metaTable = new HTable(conf, TableName.META_TABLE_NAME);
+      metaTable.put(puts);
+    } finally {
+      if (metaTable != null) metaTable.close();
+    }
     LOG.info("Added " + puts.size() + " regions in META");
   }
@@ -190,10 +169,10 @@ public class FavoredNodeAssignmentHelper {
}
/**
- * @param serverList
+ * @param serverAddrList
* @return PB'ed bytes of {@link FavoredNodes} generated by the server list.
*/
- static byte[] getFavoredNodes(List serverAddrList) {
+ public static byte[] getFavoredNodes(List serverAddrList) {
FavoredNodes.Builder f = FavoredNodes.newBuilder();
for (ServerName s : serverAddrList) {
HBaseProtos.ServerName.Builder b = HBaseProtos.ServerName.newBuilder();
@@ -296,13 +275,164 @@ public class FavoredNodeAssignmentHelper {
}
} catch (Exception e) {
LOG.warn("Cannot place the favored nodes for region " +
- regionInfo.getRegionNameAsString() + " because " + e);
+ regionInfo.getRegionNameAsString() + " because " + e, e);
+ continue;
+ }
+ }
+ return secondaryAndTertiaryMap;
+ }
+
+ private Map> mapRSToPrimaries(
+ Map primaryRSMap) {
+ Map> primaryServerMap =
+ new HashMap>();
+ for (Entry e : primaryRSMap.entrySet()) {
+ Set currentSet = primaryServerMap.get(e.getValue());
+ if (currentSet == null) {
+ currentSet = new HashSet();
+ }
+ currentSet.add(e.getKey());
+ primaryServerMap.put(e.getValue(), currentSet);
+ }
+ return primaryServerMap;
+ }
+
+ /**
+ * For regions that share the primary, avoid placing the secondary and tertiary
+ * on the same RS. Used for generating new assignments for the
+ * primary/secondary/tertiary RegionServers
+ * @param primaryRSMap
+ * @return the map of regions to the servers the region-files should be hosted on
+ *
+ */
+ public Map placeSecondaryAndTertiaryWithRestrictions(
+ Map primaryRSMap) {
+ Map> serverToPrimaries =
+ mapRSToPrimaries(primaryRSMap);
+ Map secondaryAndTertiaryMap =
+ new HashMap();
+
+ for (Entry entry : primaryRSMap.entrySet()) {
+ // Get the target region and its primary region server rack
+ HRegionInfo regionInfo = entry.getKey();
+ ServerName primaryRS = entry.getValue();
+ try {
+ // Get the rack for the primary region server
+ String primaryRack = rackManager.getRack(primaryRS);
+ ServerName[] favoredNodes = null;
+ if (getTotalNumberOfRacks() == 1) {
+ // Single rack case: have to pick the secondary and tertiary
+ // from the same rack
+ favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack);
+ } else {
+ favoredNodes = multiRackCaseWithRestrictions(serverToPrimaries,
+ secondaryAndTertiaryMap, primaryRack, primaryRS, regionInfo);
+ }
+ if (favoredNodes != null) {
+ secondaryAndTertiaryMap.put(regionInfo, favoredNodes);
+ LOG.debug("Place the secondary and tertiary region server for region "
+ + regionInfo.getRegionNameAsString());
+ }
+ } catch (Exception e) {
+ LOG.warn("Cannot place the favored nodes for region "
+ + regionInfo.getRegionNameAsString() + " because " + e, e);
continue;
}
}
return secondaryAndTertiaryMap;
}
+ private ServerName[] multiRackCaseWithRestrictions(
+ Map> serverToPrimaries,
+ Map secondaryAndTertiaryMap,
+ String primaryRack, ServerName primaryRS, HRegionInfo regionInfo) throws IOException {
+ // Random to choose the secondary and tertiary region server
+ // from another rack to place the secondary and tertiary
+ // Random to choose one rack except for the current rack
+ Set rackSkipSet = new HashSet();
+ rackSkipSet.add(primaryRack);
+ String secondaryRack = getOneRandomRack(rackSkipSet);
+ List serverList = getServersFromRack(secondaryRack);
+ Set serverSet = new HashSet();
+ serverSet.addAll(serverList);
+ ServerName[] favoredNodes;
+ if (serverList.size() >= 2) {
+ // Randomly pick up two servers from this secondary rack
+ // Skip the secondary for the tertiary placement
+ // skip the servers which share the primary already
+ Set primaries = serverToPrimaries.get(primaryRS);
+ Set skipServerSet = new HashSet();
+ while (true) {
+ ServerName[] secondaryAndTertiary = null;
+ if (primaries.size() > 1) {
+ // check where its secondary and tertiary are
+ for (HRegionInfo primary : primaries) {
+ secondaryAndTertiary = secondaryAndTertiaryMap.get(primary);
+ if (secondaryAndTertiary != null) {
+ if (regionServerToRackMap.get(secondaryAndTertiary[0]).equals(secondaryRack)) {
+ skipServerSet.add(secondaryAndTertiary[0]);
+ }
+ if (regionServerToRackMap.get(secondaryAndTertiary[1]).equals(secondaryRack)) {
+ skipServerSet.add(secondaryAndTertiary[1]);
+ }
+ }
+ }
+ }
+ if (skipServerSet.size() + 2 <= serverSet.size())
+ break;
+ skipServerSet.clear();
+ rackSkipSet.add(secondaryRack);
+ // we used all racks
+ if (rackSkipSet.size() == getTotalNumberOfRacks()) {
+ // remove the last two added and break
+ skipServerSet.remove(secondaryAndTertiary[0]);
+ skipServerSet.remove(secondaryAndTertiary[1]);
+ break;
+ }
+ secondaryRack = getOneRandomRack(rackSkipSet);
+ serverList = getServersFromRack(secondaryRack);
+ serverSet = new HashSet();
+ serverSet.addAll(serverList);
+ }
+
+ // Place the secondary RS
+ ServerName secondaryRS = getOneRandomServer(secondaryRack, skipServerSet);
+ skipServerSet.add(secondaryRS);
+ // Place the tertiary RS
+ ServerName tertiaryRS = getOneRandomServer(secondaryRack, skipServerSet);
+
+ if (secondaryRS == null || tertiaryRS == null) {
+ LOG.error("Cannot place the secondary and tertiary"
+ + " region server for region "
+ + regionInfo.getRegionNameAsString());
+ }
+ // Create the secondary and tertiary pair
+ favoredNodes = new ServerName[2];
+ favoredNodes[0] = secondaryRS;
+ favoredNodes[1] = tertiaryRS;
+ } else {
+ // Pick the secondary rs from this secondary rack
+ // and pick the tertiary from another random rack
+ favoredNodes = new ServerName[2];
+ ServerName secondary = getOneRandomServer(secondaryRack);
+ favoredNodes[0] = secondary;
+
+ // Pick the tertiary
+ if (getTotalNumberOfRacks() == 2) {
+ // Pick the tertiary from the same rack of the primary RS
+ Set serverSkipSet = new HashSet();
+ serverSkipSet.add(primaryRS);
+ favoredNodes[1] = getOneRandomServer(primaryRack, serverSkipSet);
+ } else {
+ // Pick the tertiary from another rack
+ rackSkipSet.add(secondaryRack);
+ String tertiaryRandomRack = getOneRandomRack(rackSkipSet);
+ favoredNodes[1] = getOneRandomServer(tertiaryRandomRack);
+ }
+ }
+ return favoredNodes;
+ }
+
private ServerName[] singleRackCase(HRegionInfo regionInfo,
ServerName primaryRS,
String primaryRack) throws IOException {
@@ -311,7 +441,7 @@ public class FavoredNodeAssignmentHelper {
List serverList = getServersFromRack(primaryRack);
if (serverList.size() <= 2) {
// Single region server case: cannot not place the favored nodes
- // on any server; !domain.canPlaceFavoredNodes()
+ // on any server;
return null;
} else {
// Randomly select two region servers from the server list and make sure
@@ -400,7 +530,7 @@ public class FavoredNodeAssignmentHelper {
return (serverSize >= FAVORED_NODES_NUM);
}
- void initialize() {
+ public void initialize() {
for (ServerName sn : this.servers) {
String rackName = this.rackManager.getRack(sn);
List serverList = this.rackToRegionServerMap.get(rackName);
@@ -462,4 +592,14 @@ public class FavoredNodeAssignmentHelper {
return randomRack;
}
+
+ public static String getFavoredNodesAsString(List nodes) {
+ StringBuffer strBuf = new StringBuffer();
+ int i = 0;
+ for (ServerName node : nodes) {
+ strBuf.append(node.getHostAndPort());
+ if (++i != nodes.size()) strBuf.append(";");
+ }
+ return strBuf.toString();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java
index c76ce44..d8cee20 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master.balancer;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -30,12 +29,15 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.balancer.FavoredNodes.Position;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
+import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan.Position;
import org.apache.hadoop.hbase.util.Pair;
/**
@@ -55,23 +57,91 @@ import org.apache.hadoop.hbase.util.Pair;
public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
private static final Log LOG = LogFactory.getLog(FavoredNodeLoadBalancer.class);
- private FavoredNodes globalFavoredNodesAssignmentPlan;
+ private FavoredNodesPlan globalFavoredNodesAssignmentPlan;
private RackManager rackManager;
+ Configuration conf;
@Override
public void setConf(Configuration conf) {
- globalFavoredNodesAssignmentPlan = new FavoredNodes();
+ globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();
this.rackManager = new RackManager(conf);
+ this.conf = conf;
}
@Override
- public List balanceCluster(Map> clusterState) {
- //TODO. At a high level, this should look at the block locality per region, and
- //then reassign regions based on which nodes have the most blocks of the region
- //file(s). There could be different ways like minimize region movement, or, maximum
- //locality, etc. The other dimension to look at is whether Stochastic loadbalancer
- //can be integrated with this
- throw new UnsupportedOperationException("Not implemented yet");
+ public List balanceCluster(Map> clusterState) {
+ //TODO. Look at is whether Stochastic loadbalancer can be integrated with this
+ List plans = new ArrayList();
+ //perform a scan of the meta to get the latest updates (if any)
+ SnapshotOfRegionAssignmentFromMeta snaphotOfRegionAssignment =
+ new SnapshotOfRegionAssignmentFromMeta(super.services.getCatalogTracker());
+ try {
+ snaphotOfRegionAssignment.initialize();
+ } catch (IOException ie) {
+ LOG.warn("Not running balancer since exception was thrown " + ie);
+ return plans;
+ }
+ globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan();
+ Map serverNameToServerNameWithoutCode =
+ new HashMap();
+ Map serverNameWithoutCodeToServerName =
+ new HashMap();
+ ServerManager serverMgr = super.services.getServerManager();
+ for (ServerName sn: serverMgr.getOnlineServersList()) {
+ ServerName s = new ServerName(sn.getHostname(), sn.getPort(), ServerName.NON_STARTCODE);
+ serverNameToServerNameWithoutCode.put(sn, s);
+ serverNameWithoutCodeToServerName.put(s, sn);
+ }
+ for (Map.Entry> entry : clusterState.entrySet()) {
+ ServerName currentServer = entry.getKey();
+ //get a server without the startcode for the currentServer
+ ServerName currentServerWithoutStartCode = new ServerName(currentServer.getHostname(),
+ currentServer.getPort(), ServerName.NON_STARTCODE);
+ List list = entry.getValue();
+ for (HRegionInfo region : list) {
+ if(region.getTableName().getNamespaceAsString()
+ .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
+ continue;
+ }
+ List favoredNodes = globalFavoredNodesAssignmentPlan.getFavoredNodes(region);
+ if (favoredNodes == null || favoredNodes.get(0).equals(currentServerWithoutStartCode)) {
+ continue; //either favorednodes does not exist or we are already on the primary node
+ }
+ ServerName destination = null;
+ //check whether the primary is available
+ destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(0));
+ if (destination == null) {
+ //check whether the region is on secondary/tertiary
+ if (currentServerWithoutStartCode.equals(favoredNodes.get(1)) ||
+ currentServerWithoutStartCode.equals(favoredNodes.get(2))) {
+ continue;
+ }
+ //the region is currently on none of the favored nodes
+ //get it on one of them if possible
+ ServerLoad l1 = super.services.getServerManager().getLoad(
+ serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
+ ServerLoad l2 = super.services.getServerManager().getLoad(
+ serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
+ if (l1 != null && l2 != null) {
+ if (l1.getLoad() > l2.getLoad()) {
+ destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
+ } else {
+ destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1));
+ }
+ } else if (l1 != null) {
+ destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1));
+ } else if (l2 != null) {
+ destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
+ }
+ }
+
+ if (destination != null) {
+ RegionPlan plan = new RegionPlan(region, currentServer, destination);
+ plans.add(plan);
+ }
+ }
+ }
+ return plans;
}
@Override
@@ -168,8 +238,8 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
for (ServerName s : favoredNodes) {
ServerName serverWithLegitStartCode = availableServersContains(availableServers, s);
if (serverWithLegitStartCode != null) {
- FavoredNodes.Position position =
- FavoredNodes.getFavoredServerPosition(favoredNodes, s);
+ FavoredNodesPlan.Position position =
+ FavoredNodesPlan.getFavoredServerPosition(favoredNodes, s);
if (Position.PRIMARY.equals(position)) {
primaryHost = serverWithLegitStartCode;
} else if (Position.SECONDARY.equals(position)) {
@@ -243,7 +313,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
private void roundRobinAssignmentImpl(FavoredNodeAssignmentHelper assignmentHelper,
Map> assignmentMap,
- List regions, List servers) throws IOException {
+ List regions, List servers) {
Map primaryRSMap = new HashMap();
// figure the primary RSs
assignmentHelper.placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions);
@@ -274,12 +344,4 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
globalFavoredNodesAssignmentPlan.updateFavoredNodesMap(region, favoredNodesForRegion);
}
}
-
- void noteFavoredNodes(final Map favoredNodesMap) {
- for (Map.Entry entry : favoredNodesMap.entrySet()) {
- // the META should already have favorednode ServerName objects without startcode
- globalFavoredNodesAssignmentPlan.updateFavoredNodesMap(entry.getKey(),
- Arrays.asList(entry.getValue()));
- }
- }
}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodes.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodes.java
deleted file mode 100644
index 7acb863..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodes.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Copyright 2012 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.balancer;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.ServerName;
-import org.jboss.netty.util.internal.ConcurrentHashMap;
-
-/**
- * This class contains the mapping information between each region and
- * its favored region server list. Used by {@link FavoredNodeLoadBalancer} set
- * of classes and from unit tests (hence the class is public)
- *
- * All the access to this class is thread-safe.
- */
-@InterfaceAudience.Private
-public class FavoredNodes {
- protected static final Log LOG = LogFactory.getLog(
- FavoredNodes.class.getName());
-
- /** the map between each region and its favored region server list */
- private Map> favoredNodesMap;
-
- public static enum Position {
- PRIMARY,
- SECONDARY,
- TERTIARY;
- };
-
- public FavoredNodes() {
- favoredNodesMap = new ConcurrentHashMap>();
- }
-
- /**
- * Add an assignment to the plan
- * @param region
- * @param servers
- */
- public synchronized void updateFavoredNodesMap(HRegionInfo region,
- List servers) {
- if (region == null || servers == null || servers.size() ==0)
- return;
- this.favoredNodesMap.put(region, servers);
- }
-
- /**
- * @param region
- * @return the list of favored region server for this region based on the plan
- */
- public synchronized List getFavoredNodes(HRegionInfo region) {
- return favoredNodesMap.get(region);
- }
-
- /**
- * Return the position of the server in the favoredNodes list. Assumes the
- * favoredNodes list is of size 3.
- * @param favoredNodes
- * @param server
- * @return position
- */
- static Position getFavoredServerPosition(
- List favoredNodes, ServerName server) {
- if (favoredNodes == null || server == null ||
- favoredNodes.size() != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
- return null;
- }
- for (Position p : Position.values()) {
- if (favoredNodes.get(p.ordinal()).equals(server)) {
- return p;
- }
- }
- return null;
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodesPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodesPlan.java
new file mode 100644
index 0000000..cd83170
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodesPlan.java
@@ -0,0 +1,150 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
+
+/**
+ * This class contains the mapping information between each region and
+ * its favored region server list. Used by {@link FavoredNodeLoadBalancer} set
+ * of classes and from unit tests (hence the class is public)
+ *
+ * All the access to this class is thread-safe.
+ */
+@InterfaceAudience.Private
+public class FavoredNodesPlan {
+ protected static final Log LOG = LogFactory.getLog(
+ FavoredNodesPlan.class.getName());
+
+ /** the map between each region and its favored region server list */
+ private Map> favoredNodesMap;
+
+ public static enum Position {
+ PRIMARY,
+ SECONDARY,
+ TERTIARY;
+ };
+
+ public FavoredNodesPlan() {
+ favoredNodesMap = new ConcurrentHashMap>();
+ }
+
+ /**
+ * Add an assignment to the plan
+ * @param region
+ * @param servers
+ */
+ public synchronized void updateFavoredNodesMap(HRegionInfo region,
+ List servers) {
+ if (region == null || servers == null || servers.size() ==0)
+ return;
+ this.favoredNodesMap.put(region, servers);
+ }
+
+ /**
+ * @param region
+ * @return the list of favored region server for this region based on the plan
+ */
+ public synchronized List getFavoredNodes(HRegionInfo region) {
+ return favoredNodesMap.get(region);
+ }
+
+ /**
+ * Return the position of the server in the favoredNodes list. Assumes the
+ * favoredNodes list is of size 3.
+ * @param favoredNodes
+ * @param server
+ * @return position
+ */
+ public static Position getFavoredServerPosition(
+ List favoredNodes, ServerName server) {
+ if (favoredNodes == null || server == null ||
+ favoredNodes.size() != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
+ return null;
+ }
+ for (Position p : Position.values()) {
+ if (favoredNodes.get(p.ordinal()).equals(server)) {
+ return p;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * @return the mapping between each region to its favored region server list
+ */
+ public synchronized Map> getAssignmentMap() {
+ return this.favoredNodesMap;
+ }
+
+ /**
+ * Add an assignment to the plan
+ * @param region
+ * @param servers
+ */
+ public synchronized void updateAssignmentPlan(HRegionInfo region,
+ List servers) {
+ if (region == null || servers == null || servers.size() ==0)
+ return;
+ this.favoredNodesMap.put(region, servers);
+ LOG.info("Update the assignment plan for region " +
+ region.getRegionNameAsString() + " ; favored nodes " +
+ FavoredNodeAssignmentHelper.getFavoredNodesAsString(servers));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null) {
+ return false;
+ }
+ if (getClass() != o.getClass()) {
+ return false;
+ }
+ // Check that the map from object o is identical to the current assignment map.
+ Map> comparedMap=
+ ((FavoredNodesPlan)o).getAssignmentMap();
+
+ // compare the size
+ if (comparedMap.size() != this.favoredNodesMap.size())
+ return false;
+
+ // compare each element in the assignment map
+ for (Map.Entry> entry :
+ comparedMap.entrySet()) {
+ List serverList = this.favoredNodesMap.get(entry.getKey());
+ if (serverList == null && entry.getValue() != null) {
+ return false;
+ } else if (serverList != null && !serverList.equals(entry.getValue())) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index b44fbb0..ef6a0ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -145,6 +145,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
@@ -4341,4 +4342,17 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
}
return result;
}
+
+ @Override
+ public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
+ OpenRegionRequest request) throws ServiceException {
+ List openInfoList = request.getOpenInfoList();
+ UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
+ for (RegionOpenInfo regionOpenInfo : openInfoList) {
+ HRegionInfo hri = HRegionInfo.convert(regionOpenInfo.getRegion());
+ updateRegionFavoredNodesMapping(hri.getEncodedName(), regionOpenInfo.getFavoredNodesList());
+ }
+ respBuilder.setResponse(openInfoList.size());
+ return respBuilder.build();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSRegionScanner.java
new file mode 100644
index 0000000..4c0cc46
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSRegionScanner.java
@@ -0,0 +1,169 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Thread that walks over the filesystem, and computes the mappings
+ * Region -> BestHost and Region -> Map<HostName, fractional-locality-of-region>
+ *
+ */
+@InterfaceAudience.Private
+class FSRegionScanner implements Runnable {
+ static private final Log LOG = LogFactory.getLog(FSRegionScanner.class);
+
+ private Path regionPath;
+
+ /**
+ * The file system used
+ */
+ private FileSystem fs;
+
+ /**
+ * Maps each region to the RS with highest locality for that region.
+ */
+ private Map regionToBestLocalityRSMapping;
+
+ /**
+ * Maps region encoded names to maps of hostnames to fractional locality of
+ * that region on that host.
+ */
+ private Map> regionDegreeLocalityMapping;
+
+ FSRegionScanner(FileSystem fs, Path regionPath,
+ Map regionToBestLocalityRSMapping,
+ Map