From 2c242b5ae86df993fa54b6df6e6106820c1421d2 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Thu, 18 Jul 2019 09:41:11 +0800 Subject: [PATCH] HBASE-22527 [hbck2] Add a master web ui to show the problematic regions --- .../tmpl/master/AssignmentManagerStatusTmpl.jamon | 103 +++- .../hbase/master/assignment/AssignmentManager.java | 57 ++ .../assignment/TestAMProblematicRegions.java | 127 +++++ .../master/assignment/TestAssignmentManager.java | 599 +------------------- .../assignment/TestAssignmentManagerBase.java | 629 +++++++++++++++++++++ 5 files changed, 911 insertions(+), 604 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMProblematicRegions.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon index 9f31483..1d8fa70 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon @@ -17,27 +17,108 @@ See the License for the specific language governing permissions and limitations under the License. <%import> -org.apache.hadoop.hbase.master.assignment.AssignmentManager; -org.apache.hadoop.hbase.master.assignment.AssignmentManager.RegionInTransitionStat; -org.apache.hadoop.hbase.master.assignment.RegionStates.RegionFailedOpen; -org.apache.hadoop.hbase.master.RegionState; +java.util.Map; +java.util.Set; +java.util.SortedSet; +java.util.concurrent.atomic.AtomicInteger; +java.util.stream.Collectors; org.apache.hadoop.conf.Configuration; org.apache.hadoop.hbase.HBaseConfiguration; org.apache.hadoop.hbase.HConstants; +org.apache.hadoop.hbase.ServerName; +org.apache.hadoop.hbase.client.RegionInfo; org.apache.hadoop.hbase.client.RegionInfoDisplay; -java.util.HashSet; -java.util.SortedSet; -java.util.Map; -java.util.concurrent.atomic.AtomicInteger; +org.apache.hadoop.hbase.master.RegionState; +org.apache.hadoop.hbase.master.assignment.AssignmentManager; +org.apache.hadoop.hbase.master.assignment.AssignmentManager.RegionInTransitionStat; +org.apache.hadoop.hbase.master.assignment.RegionStates.RegionFailedOpen; +org.apache.hadoop.hbase.util.Pair; <%args> AssignmentManager assignmentManager; int limit = 100; -<%java SortedSet rit = assignmentManager - .getRegionStates().getRegionsInTransitionOrderedByTimestamp(); -%> +<%java> +SortedSet rit = assignmentManager.getRegionStates() + .getRegionsInTransitionOrderedByTimestamp(); +Map>> problematicRegions = assignmentManager + .getProblematicRegions(); + + +<%if !problematicRegions.isEmpty() %> +<%java> +int totalSize = problematicRegions.size(); +int sizePerPage = Math.min(10, totalSize); +int numOfPages = (int) Math.ceil(totalSize * 1.0 / sizePerPage); + +
+

+    <h2>Problematic Regions</h2>

+

+
+        <% problematicRegions.size() %> problematic region(s). There are three cases:
+        1. Master thought this region was open, but no regionserver reported it.
+        2. Master thought this region was open on Server1, but a regionserver reported Server2.
+        3. More than one regionserver reported this region as open.
+        Notice: the reported online regionservers may not be accurate while there are regions
+        in transition. Please check them in the regionservers' web UIs.
+
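The three cases above are what the new AssignmentManager#getProblematicRegions() added later in this patch detects. As a minimal standalone sketch (illustrative names only, not the patch's actual code), this is how a region's location in META plus the regionservers' online-region reports map onto the three cases:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    // Illustrative sketch: classify one region from the master's META location and the
    // set of regionservers that reported the region as online.
    public class ProblematicRegionCases {
      enum Problem { NONE, NOT_REPORTED, REPORTED_BY_OTHER_SERVER, REPORTED_BY_MULTIPLE_SERVERS }

      static Problem classify(String locationInMeta, Set<String> reportedBy) {
        if (reportedBy.isEmpty()) {
          return Problem.NOT_REPORTED;                   // case 1: nobody reported the region
        }
        if (!reportedBy.contains(locationInMeta)) {
          return Problem.REPORTED_BY_OTHER_SERVER;       // case 2: reported, but not by the server in META
        }
        if (reportedBy.size() > 1) {
          return Problem.REPORTED_BY_MULTIPLE_SERVERS;   // case 3: several servers reported it
        }
        return Problem.NONE;                             // only the server recorded in META reported it
      }

      public static void main(String[] args) {
        System.out.println(classify("rs1", Collections.emptySet()));                   // NOT_REPORTED
        System.out.println(classify("rs1", Collections.singleton("rs2")));             // REPORTED_BY_OTHER_SERVER
        System.out.println(classify("rs1", new HashSet<>(Arrays.asList("rs1", "rs2")))); // REPORTED_BY_MULTIPLE_SERVERS
      }
    }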

+
+
+ <%java int recordItr = 0; %> + <%for Map.Entry>> entry : problematicRegions.entrySet() %> + <%if (recordItr % sizePerPage) == 0 %> + <%if recordItr == 0 %> +
+ <%else> +
+ + + + + + + + + + + + + + + <%java recordItr++; %> + <%if (recordItr % sizePerPage) == 0 %> +
+              <th>Region</th>
+              <th>Location in META</th>
+              <th>Reported Online Region Servers</th>
+              <td><% entry.getKey() %></td>
+              <td><% entry.getValue().getFirst() %></td>
+              <td><% entry.getValue().getSecond().stream().map(ServerName::getServerName)
+                .collect(Collectors.joining(", ")) %></td>
+
+ + + + <%if (recordItr % sizePerPage) != 0 %> + <%for ; (recordItr % sizePerPage) != 0 ; recordItr++ %> + + + +
+ + +
+ +
+
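Under my reading of the <%java> and <%for> fragments above, the template groups the rows into pages of at most 10 and pads the last page with empty rows so every page has the same height. A rough standalone sketch of that paging arithmetic (not part of the patch; the block is only rendered when problematicRegions is non-empty, so totalSize >= 1):

    public class PagingSketch {
      public static void main(String[] args) {
        int totalSize = 23;                                   // e.g. 23 problematic regions
        int sizePerPage = Math.min(10, totalSize);            // at most 10 rows per page
        int numOfPages = (int) Math.ceil(totalSize * 1.0 / sizePerPage);
        System.out.println(numOfPages + " pages");            // 3 pages: 10 + 10 + 3 (padded)
        for (int recordItr = 0; recordItr < totalSize; recordItr++) {
          if (recordItr % sizePerPage == 0) {                 // a new table starts on each page boundary
            System.out.println("-- start page " + (recordItr / sizePerPage + 1));
          }
          System.out.println("row for record " + recordItr);
        }
        // Pad the last page with empty rows, as the trailing <%for ...%> loop does.
        for (int recordItr = totalSize; recordItr % sizePerPage != 0; recordItr++) {
          System.out.println("empty row");
        }
      }
    }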
+ <%if !rit.isEmpty() %> <%java> diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index 69b552e..e680454 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -156,6 +157,8 @@ public class AssignmentManager implements ServerListener { private final RegionStates regionStates = new RegionStates(); private final RegionStateStore regionStateStore; + private final Map> rsReports = new HashMap<>(); + private final boolean shouldAssignRegionsWithFavoredNodes; private final int assignDispatchWaitQueueMaxSize; private final int assignDispatchWaitMillis; @@ -962,6 +965,11 @@ public class AssignmentManager implements ServerListener { } } + // Track the regionserver reported online regions in memory. + synchronized (rsReports) { + rsReports.put(serverName, regionNames); + } + if (regionNames.isEmpty()) { // nothing to do if we don't have regions LOG.trace("no online region found on " + serverName); @@ -1882,4 +1890,53 @@ public class AssignmentManager implements ServerListener { MasterServices getMaster() { return master; } + + /** + * Found the potentially problematic opened regions. There are three case: + * case 1. Master thought this region opened, but no regionserver reported it. + * case 2. Master thought this region opened on Server1, but regionserver reported Server2 + * case 3. More than one regionservers reported opened this region + * + * @return the map of potentially problematic opened regions. Key is the region name. Value is + * a pair of location in meta and the regionservers which reported opened this region. + */ + public Map>> getProblematicRegions() { + Map> reportedOnlineRegions = new HashMap<>(); + synchronized (rsReports) { + for (Map.Entry> entry : rsReports.entrySet()) { + for (byte[] regionName : entry.getValue()) { + reportedOnlineRegions + .computeIfAbsent(RegionInfo.getRegionNameAsString(regionName), r -> new HashSet<>()) + .add(entry.getKey()); + } + } + } + + Map>> problematicRegions = new HashMap<>(); + List rits = regionStates.getRegionsStateInTransition(); + for (RegionState regionState : regionStates.getRegionStates()) { + // Only consider the opened region and not in transition + if (!rits.contains(regionState) && regionState.isOpened()) { + String regionName = regionState.getRegion().getRegionNameAsString(); + ServerName serverName = regionState.getServerName(); + if (reportedOnlineRegions.containsKey(regionName)) { + Set reportedServers = reportedOnlineRegions.get(regionName); + if (reportedServers.contains(serverName)) { + if (reportedServers.size() > 1) { + // More than one regionserver reported opened this region + problematicRegions.put(regionName, new Pair<>(serverName, reportedServers)); + } + } else { + // Master thought this region opened on Server1, but regionserver reported Server2 + problematicRegions.put(regionName, new Pair<>(serverName, reportedServers)); + } + } else { + // Master thought this region opened, but no regionserver reported it. 
+ problematicRegions.put(regionName, new Pair<>(serverName, new HashSet<>())); + } + } + } + + return problematicRegions; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMProblematicRegions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMProblematicRegions.java new file mode 100644 index 0000000..d07e129 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMProblematicRegions.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.assignment; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Future; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestAMProblematicRegions extends TestAssignmentManagerBase { + private static final Logger LOG = LoggerFactory.getLogger(TestAMProblematicRegions.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAMProblematicRegions.class); + + @Test + public void testForMeta() throws Exception { + byte[] metaRegionNameAsBytes = RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName(); + String metaRegionName = RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionNameAsString(); + List serverNames = master.getServerManager().getOnlineServersList(); + assertEquals(NSERVERS, serverNames.size()); + + Map>> problematicRegions = am.getProblematicRegions(); + + // Test for case1: Master thought this region opened, but no regionserver reported it. + assertTrue(problematicRegions.containsKey(metaRegionName)); + Pair> pair = problematicRegions.get(metaRegionName); + ServerName locationInMeta = pair.getFirst(); + Set reportedRegionServers = pair.getSecond(); + assertTrue(serverNames.contains(locationInMeta)); + assertEquals(0, reportedRegionServers.size()); + + // Reported right region location. Then not in problematic regions. 
+ am.reportOnlineRegions(locationInMeta, Collections.singleton(metaRegionNameAsBytes)); + problematicRegions = am.getProblematicRegions(); + assertFalse(problematicRegions.containsKey(metaRegionName)); + } + + @Test + public void testForUserTable() throws Exception { + TableName tableName = TableName.valueOf("testForUserTable"); + RegionInfo hri = createRegionInfo(tableName, 1); + String regionName = hri.getRegionNameAsString(); + rsDispatcher.setMockRsExecutor(new GoodRsExecutor()); + Future future = submitProcedure(am.createAssignProcedure(hri)); + waitOnFuture(future); + + List serverNames = master.getServerManager().getOnlineServersList(); + assertEquals(NSERVERS, serverNames.size()); + + // Test for case1: Master thought this region opened, but no regionserver reported it. + Map>> problematicRegions = am.getProblematicRegions(); + assertTrue(problematicRegions.containsKey(regionName)); + Pair> pair = problematicRegions.get(regionName); + ServerName locationInMeta = pair.getFirst(); + Set reportedRegionServers = pair.getSecond(); + assertTrue(serverNames.contains(locationInMeta)); + assertEquals(0, reportedRegionServers.size()); + + // Test for case2: Master thought this region opened on Server1, but regionserver reported + // Server2 + final ServerName tempLocationInMeta = locationInMeta; + final ServerName anotherServer = + serverNames.stream().filter(s -> !s.equals(tempLocationInMeta)).findFirst().get(); + am.reportOnlineRegions(anotherServer, Collections.singleton(hri.getRegionName())); + problematicRegions = am.getProblematicRegions(); + assertTrue(problematicRegions.containsKey(regionName)); + pair = problematicRegions.get(regionName); + locationInMeta = pair.getFirst(); + reportedRegionServers = pair.getSecond(); + assertEquals(1, reportedRegionServers.size()); + assertFalse(reportedRegionServers.contains(locationInMeta)); + assertTrue(reportedRegionServers.contains(anotherServer)); + + // Test for case3: More than one regionservers reported opened this region. + am.reportOnlineRegions(locationInMeta, Collections.singleton(hri.getRegionName())); + problematicRegions = am.getProblematicRegions(); + assertTrue(problematicRegions.containsKey(regionName)); + pair = problematicRegions.get(regionName); + locationInMeta = pair.getFirst(); + reportedRegionServers = pair.getSecond(); + assertEquals(2, reportedRegionServers.size()); + assertTrue(reportedRegionServers.contains(locationInMeta)); + assertTrue(reportedRegionServers.contains(anotherServer)); + + // Reported right region location. Then not in problematic regions. 
+ am.reportOnlineRegions(anotherServer, Collections.EMPTY_SET); + problematicRegions = am.getProblematicRegions(); + assertFalse(problematicRegions.containsKey(regionName)); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java index 86186e3..33aba1b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java @@ -18,154 +18,47 @@ package org.apache.hadoop.hbase.master.assignment; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.SocketTimeoutException; -import java.util.List; -import java.util.NavigableMap; -import java.util.Random; -import java.util.Set; -import java.util.SortedSet; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; -import org.apache.hadoop.hbase.ipc.CallTimeoutException; -import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; -import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionState.State; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; -import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; -import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.procedure2.util.StringUtils; -import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; -import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.ipc.RemoteException; -import org.junit.After; -import org.junit.Before; import org.junit.ClassRule; import org.junit.Ignore; -import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; 
-import org.junit.rules.ExpectedException; -import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; - @Category({MasterTests.class, LargeTests.class}) -public class TestAssignmentManager { +public class TestAssignmentManager extends TestAssignmentManagerBase { + private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManager.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestAssignmentManager.class); - private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManager.class); - - @Rule public TestName name = new TestName(); - @Rule public final ExpectedException exception = ExpectedException.none(); - - private static final int PROC_NTHREADS = 64; - private static final int NREGIONS = 1 * 1000; - private static final int NSERVERS = Math.max(1, NREGIONS / 100); - - private HBaseTestingUtility UTIL; - private MockRSProcedureDispatcher rsDispatcher; - private MockMasterServices master; - private AssignmentManager am; - private NavigableMap> regionsToRegionServers = - new ConcurrentSkipListMap>(); - // Simple executor to run some simple tasks. - private ScheduledExecutorService executor; - - private ProcedureMetrics assignProcMetrics; - private ProcedureMetrics unassignProcMetrics; - - private long assignSubmittedCount = 0; - private long assignFailedCount = 0; - private long unassignSubmittedCount = 0; - private long unassignFailedCount = 0; - - private void setupConfiguration(Configuration conf) throws Exception { - FSUtils.setRootDir(conf, UTIL.getDataTestDir()); - conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false); - conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10); - conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS); - conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000); - conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so we succeed eventually. 
- } - - @Before - public void setUp() throws Exception { - UTIL = new HBaseTestingUtility(); - this.executor = Executors.newSingleThreadScheduledExecutor(); - setupConfiguration(UTIL.getConfiguration()); - master = new MockMasterServices(UTIL.getConfiguration(), this.regionsToRegionServers); - rsDispatcher = new MockRSProcedureDispatcher(master); - master.start(NSERVERS, rsDispatcher); - am = master.getAssignmentManager(); - assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics(); - unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics(); - setUpMeta(); - } - - private void setUpMeta() throws Exception { - rsDispatcher.setMockRsExecutor(new GoodRsExecutor()); - am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO); - am.wakeMetaLoadedEvent(); - } - - @After - public void tearDown() throws Exception { - master.stop("tearDown"); - this.executor.shutdownNow(); - } - - @Test (expected=NullPointerException.class) + @Test(expected = NullPointerException.class) public void testWaitServerReportEventWithNullServer() throws UnexpectedStateException { // Test what happens if we pass in null server. I'd expect it throws NPE. - if (this.am.waitServerReportEvent(null, null)) throw new UnexpectedStateException(); + if (this.am.waitServerReportEvent(null, null)) { + throw new UnexpectedStateException(); + } } @Test @@ -471,484 +364,4 @@ public class TestAssignmentManager { assertEquals(unassignSubmittedCount + 1, unassignProcMetrics.getSubmittedCounter().getCount()); assertEquals(unassignFailedCount, unassignProcMetrics.getFailedCounter().getCount()); } - - - private Future submitProcedure(final Procedure proc) { - return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); - } - - private byte[] waitOnFuture(final Future future) throws Exception { - try { - return future.get(5, TimeUnit.SECONDS); - } catch (ExecutionException e) { - LOG.info("ExecutionException", e); - Exception ee = (Exception)e.getCause(); - if (ee instanceof InterruptedIOException) { - for (Procedure p: this.master.getMasterProcedureExecutor().getProcedures()) { - LOG.info(p.toStringDetails()); - } - } - throw (Exception)e.getCause(); - } - } - - // ============================================================================================ - // Helpers - // ============================================================================================ - private void bulkSubmit(final AssignProcedure[] procs) throws Exception { - final Thread[] threads = new Thread[PROC_NTHREADS]; - for (int i = 0; i < threads.length; ++i) { - final int threadId = i; - threads[i] = new Thread() { - @Override - public void run() { - TableName tableName = TableName.valueOf("table-" + threadId); - int n = (procs.length / threads.length); - int start = threadId * n; - int stop = start + n; - for (int j = start; j < stop; ++j) { - procs[j] = createAndSubmitAssign(tableName, j); - } - } - }; - threads[i].start(); - } - for (int i = 0; i < threads.length; ++i) { - threads[i].join(); - } - for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) { - procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i); - } - } - - private AssignProcedure createAndSubmitAssign(TableName tableName, int regionId) { - RegionInfo hri = createRegionInfo(tableName, regionId); - AssignProcedure proc = am.createAssignProcedure(hri); - master.getMasterProcedureExecutor().submitProcedure(proc); - return proc; - } - - private RegionInfo createRegionInfo(final TableName tableName, final long regionId) { - 
return RegionInfoBuilder.newBuilder(tableName) - .setStartKey(Bytes.toBytes(regionId)) - .setEndKey(Bytes.toBytes(regionId + 1)) - .setSplit(false) - .setRegionId(0) - .build(); - } - - private void sendTransitionReport(final ServerName serverName, - final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo, - final TransitionCode state) throws IOException { - ReportRegionStateTransitionRequest.Builder req = - ReportRegionStateTransitionRequest.newBuilder(); - req.setServer(ProtobufUtil.toServerName(serverName)); - req.addTransition(RegionStateTransition.newBuilder() - .addRegionInfo(regionInfo) - .setTransitionCode(state) - .setOpenSeqNum(1) - .build()); - am.reportRegionStateTransition(req.build()); - } - - private void doCrash(final ServerName serverName) { - this.am.submitServerCrash(serverName, false/*No WALs here*/); - } - - private void doRestart(final ServerName serverName) { - try { - this.master.restartRegionServer(serverName); - } catch (IOException e) { - LOG.warn("Can not restart RS with new startcode"); - } - } - - private class NoopRsExecutor implements MockRSExecutor { - @Override - public ExecuteProceduresResponse sendRequest(ServerName server, - ExecuteProceduresRequest request) throws IOException { - if (request.getOpenRegionCount() > 0) { - for (OpenRegionRequest req : request.getOpenRegionList()) { - for (RegionOpenInfo openReq : req.getOpenInfoList()) { - execOpenRegion(server, openReq); - } - } - } - if (request.getCloseRegionCount() > 0) { - for (CloseRegionRequest req : request.getCloseRegionList()) { - execCloseRegion(server, req.getRegion().getValue().toByteArray()); - } - } - return ExecuteProceduresResponse.newBuilder().build(); - } - - protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo regionInfo) - throws IOException { - return null; - } - - protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) - throws IOException { - return null; - } - } - - private class GoodRsExecutor extends NoopRsExecutor { - @Override - protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq) - throws IOException { - sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); - // Concurrency? - // Now update the state of our cluster in regionsToRegionServers. 
- SortedSet regions = regionsToRegionServers.get(server); - if (regions == null) { - regions = new ConcurrentSkipListSet(Bytes.BYTES_COMPARATOR); - regionsToRegionServers.put(server, regions); - } - RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion()); - if (regions.contains(hri.getRegionName())) { - throw new UnsupportedOperationException(hri.getRegionNameAsString()); - } - regions.add(hri.getRegionName()); - return RegionOpeningState.OPENED; - } - - @Override - protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) - throws IOException { - RegionInfo hri = am.getRegionInfo(regionName); - sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED); - return CloseRegionResponse.newBuilder().setClosed(true).build(); - } - } - - private static class ServerNotYetRunningRsExecutor implements MockRSExecutor { - @Override - public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) - throws IOException { - throw new ServerNotRunningYetException("wait on server startup"); - } - } - - private static class FaultyRsExecutor implements MockRSExecutor { - private final IOException exception; - - public FaultyRsExecutor(final IOException exception) { - this.exception = exception; - } - - @Override - public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) - throws IOException { - throw exception; - } - } - - private class SocketTimeoutRsExecutor extends GoodRsExecutor { - private final int maxSocketTimeoutRetries; - private final int maxServerRetries; - - private ServerName lastServer; - private int sockTimeoutRetries; - private int serverRetries; - - public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int maxServerRetries) { - this.maxServerRetries = maxServerRetries; - this.maxSocketTimeoutRetries = maxSocketTimeoutRetries; - } - - @Override - public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) - throws IOException { - // SocketTimeoutException should be a temporary problem - // unless the server will be declared dead. - if (sockTimeoutRetries++ < maxSocketTimeoutRetries) { - if (sockTimeoutRetries == 1) assertNotEquals(lastServer, server); - lastServer = server; - LOG.debug("Socket timeout for server=" + server + " retries=" + sockTimeoutRetries); - throw new SocketTimeoutException("simulate socket timeout"); - } else if (serverRetries++ < maxServerRetries) { - LOG.info("Mark server=" + server + " as dead. serverRetries=" + serverRetries); - master.getServerManager().moveFromOnlineToDeadServers(server); - sockTimeoutRetries = 0; - throw new SocketTimeoutException("simulate socket timeout"); - } else { - return super.sendRequest(server, req); - } - } - } - - /** - * Takes open request and then returns nothing so acts like a RS that went zombie. - * No response (so proc is stuck/suspended on the Master and won't wake up.). We - * then send in a crash for this server after a few seconds; crash is supposed to - * take care of the suspended procedures. - */ - private class HangThenRSCrashExecutor extends GoodRsExecutor { - private int invocations; - - @Override - protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) - throws IOException { - if (this.invocations++ > 0) { - // Return w/o problem the second time through here. - return super.execOpenRegion(server, openReq); - } - // The procedure on master will just hang forever because nothing comes back - // from the RS in this case. 
- LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout"); - executor.schedule(new Runnable() { - @Override - public void run() { - LOG.info("Sending in CRASH of " + server); - doCrash(server); - } - }, 1, TimeUnit.SECONDS); - return null; - } - } - - /** - * Takes open request and then returns nothing so acts like a RS that went zombie. - * No response (so proc is stuck/suspended on the Master and won't wake up.). - * Different with HangThenRSCrashExecutor, HangThenRSCrashExecutor will create - * ServerCrashProcedure to handle the server crash. However, this HangThenRSRestartExecutor - * will restart RS directly, situation for RS crashed when SCP is not enabled. - */ - private class HangThenRSRestartExecutor extends GoodRsExecutor { - private int invocations; - - @Override - protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) - throws IOException { - if (this.invocations++ > 0) { - // Return w/o problem the second time through here. - return super.execOpenRegion(server, openReq); - } - // The procedure on master will just hang forever because nothing comes back - // from the RS in this case. - LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout"); - executor.schedule(new Runnable() { - @Override - public void run() { - LOG.info("Restarting RS of " + server); - doRestart(server); - } - }, 1, TimeUnit.SECONDS); - return null; - } - } - - private class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor { - public static final int TYPES_OF_FAILURE = 6; - private int invocations; - - @Override - protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) - throws IOException { - switch (this.invocations++) { - case 0: throw new NotServingRegionException("Fake"); - case 1: - executor.schedule(new Runnable() { - @Override - public void run() { - LOG.info("Sending in CRASH of " + server); - doCrash(server); - } - }, 1, TimeUnit.SECONDS); - throw new RegionServerAbortedException("Fake!"); - case 2: - executor.schedule(new Runnable() { - @Override - public void run() { - LOG.info("Sending in CRASH of " + server); - doCrash(server); - } - }, 1, TimeUnit.SECONDS); - throw new RegionServerStoppedException("Fake!"); - case 3: throw new ServerNotRunningYetException("Fake!"); - case 4: - LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server); - executor.schedule(new Runnable() { - @Override - public void run() { - LOG.info("Sending in CRASH of " + server); - doCrash(server); - } - }, 1, TimeUnit.SECONDS); - return null; - default: - return super.execCloseRegion(server, regionName); - } - } - } - - private class RandRsExecutor extends NoopRsExecutor { - private final Random rand = new Random(); - - @Override - public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) - throws IOException { - switch (rand.nextInt(5)) { - case 0: throw new ServerNotRunningYetException("wait on server startup"); - case 1: throw new SocketTimeoutException("simulate socket timeout"); - case 2: throw new RemoteException("java.io.IOException", "unexpected exception"); - default: - // fall out - } - return super.sendRequest(server, req); - } - - @Override - protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) - throws IOException { - switch (rand.nextInt(6)) { - case 0: - LOG.info("Return OPENED response"); - sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); - return 
OpenRegionResponse.RegionOpeningState.OPENED; - case 1: - LOG.info("Return transition report that OPENED/ALREADY_OPENED response"); - sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); - return OpenRegionResponse.RegionOpeningState.ALREADY_OPENED; - case 2: - LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response"); - sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN); - return OpenRegionResponse.RegionOpeningState.FAILED_OPENING; - default: - // fall out - } - // The procedure on master will just hang forever because nothing comes back - // from the RS in this case. - LOG.info("Return null as response; means proc stuck so we send in a crash report after a few seconds..."); - executor.schedule(new Runnable() { - @Override - public void run() { - LOG.info("Delayed CRASHING of " + server); - doCrash(server); - } - }, 5, TimeUnit.SECONDS); - return null; - } - - @Override - protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) - throws IOException { - CloseRegionResponse.Builder resp = CloseRegionResponse.newBuilder(); - boolean closed = rand.nextBoolean(); - if (closed) { - RegionInfo hri = am.getRegionInfo(regionName); - sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED); - } - resp.setClosed(closed); - return resp.build(); - } - } - - protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor { - - private boolean invoked = false; - - private ServerName lastServer; - - @Override - public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) - throws IOException { - if (!invoked) { - lastServer = server; - invoked = true; - throw new CallQueueTooBigException("simulate queue full"); - } - // better select another server since the server is over loaded, but anyway, it is fine to - // still select the same server since it is not dead yet... 
- if (lastServer.equals(server)) { - LOG.warn("We still select the same server, which is not good."); - } - return super.sendRequest(server, req); - } - } - - protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor { - - private final int queueFullTimes; - - private int retries; - - private ServerName lastServer; - - public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) { - this.queueFullTimes = queueFullTimes; - } - - @Override - public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) - throws IOException { - retries++; - if (retries == 1) { - lastServer = server; - throw new CallTimeoutException("simulate call timeout"); - } - // should always retry on the same server - assertEquals(lastServer, server); - if (retries < queueFullTimes) { - throw new CallQueueTooBigException("simulate queue full"); - } - return super.sendRequest(server, req); - } - } - - private interface MockRSExecutor { - ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) - throws IOException; - } - - private class MockRSProcedureDispatcher extends RSProcedureDispatcher { - private MockRSExecutor mockRsExec; - - public MockRSProcedureDispatcher(final MasterServices master) { - super(master); - } - - public void setMockRsExecutor(final MockRSExecutor mockRsExec) { - this.mockRsExec = mockRsExec; - } - - @Override - protected void remoteDispatch(ServerName serverName, Set remoteProcedures) { - submitTask(new MockRemoteCall(serverName, remoteProcedures)); - } - - private class MockRemoteCall extends ExecuteProceduresRemoteCall { - public MockRemoteCall(final ServerName serverName, final Set operations) { - super(serverName, operations); - } - - @Override - public void dispatchOpenRequests(MasterProcedureEnv env, - List operations) { - request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations)); - } - - @Override - public void dispatchCloseRequests(MasterProcedureEnv env, - List operations) { - for (RegionCloseOperation op : operations) { - request.addCloseRegion(op.buildCloseRegionRequest(getServerName())); - } - } - - @Override - protected ExecuteProceduresResponse sendRequest(final ServerName serverName, - final ExecuteProceduresRequest request) throws IOException { - return mockRsExec.sendRequest(serverName, request); - } - } - } - - private void collectAssignmentManagerMetrics() { - assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount(); - assignFailedCount = assignProcMetrics.getFailedCounter().getCount(); - unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount(); - unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount(); - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java new file mode 100644 index 0000000..17ea05a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java @@ -0,0 +1,629 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.assignment; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.SocketTimeoutException; +import java.util.List; +import java.util.NavigableMap; +import java.util.Random; +import java.util.Set; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CallQueueTooBigException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; +import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; +import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; +import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.ipc.RemoteException; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.ExpectedException; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; + +/** + * Base class for AM test. 
+ */ +public class TestAssignmentManagerBase { + private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManagerBase.class); + + @Rule + public TestName name = new TestName(); + @Rule + public final ExpectedException exception = ExpectedException.none(); + + private static final int PROC_NTHREADS = 64; + protected static final int NREGIONS = 1 * 1000; + protected static final int NSERVERS = Math.max(1, NREGIONS / 100); + + protected HBaseTestingUtility UTIL; + protected MockRSProcedureDispatcher rsDispatcher; + protected MockMasterServices master; + protected AssignmentManager am; + protected NavigableMap> regionsToRegionServers = + new ConcurrentSkipListMap<>(); + // Simple executor to run some simple tasks. + protected ScheduledExecutorService executor; + + protected ProcedureMetrics assignProcMetrics; + protected ProcedureMetrics unassignProcMetrics; + + protected long assignSubmittedCount = 0; + protected long assignFailedCount = 0; + protected long unassignSubmittedCount = 0; + protected long unassignFailedCount = 0; + + protected void setupConfiguration(Configuration conf) throws Exception { + FSUtils.setRootDir(conf, UTIL.getDataTestDir()); + conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false); + conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10); + conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS); + conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000); + conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so we succeed eventually. + } + + @Before + public void setUp() throws Exception { + UTIL = new HBaseTestingUtility(); + this.executor = Executors.newSingleThreadScheduledExecutor(); + setupConfiguration(UTIL.getConfiguration()); + master = new MockMasterServices(UTIL.getConfiguration(), this.regionsToRegionServers); + rsDispatcher = new MockRSProcedureDispatcher(master); + master.start(NSERVERS, rsDispatcher); + am = master.getAssignmentManager(); + assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics(); + unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics(); + setUpMeta(); + } + + private void setUpMeta() throws Exception { + rsDispatcher.setMockRsExecutor(new GoodRsExecutor()); + am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO); + am.wakeMetaLoadedEvent(); + } + + @After + public void tearDown() throws Exception { + master.stop("tearDown"); + this.executor.shutdownNow(); + } + + protected Future submitProcedure(final Procedure proc) { + return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); + } + + protected byte[] waitOnFuture(final Future future) throws Exception { + try { + return future.get(5, TimeUnit.SECONDS); + } catch (ExecutionException e) { + LOG.info("ExecutionException", e); + Exception ee = (Exception) e.getCause(); + if (ee instanceof InterruptedIOException) { + for (Procedure p : this.master.getMasterProcedureExecutor().getProcedures()) { + LOG.info(p.toStringDetails()); + } + } + throw (Exception) e.getCause(); + } + } + + // ============================================================================================ + // Helpers + // ============================================================================================ + protected void bulkSubmit(final AssignProcedure[] procs) throws Exception { + final Thread[] threads = new Thread[PROC_NTHREADS]; + for (int i = 0; i < threads.length; ++i) { + final int threadId = i; + threads[i] = new Thread() { + @Override + public void 
run() { + TableName tableName = TableName.valueOf("table-" + threadId); + int n = (procs.length / threads.length); + int start = threadId * n; + int stop = start + n; + for (int j = start; j < stop; ++j) { + procs[j] = createAndSubmitAssign(tableName, j); + } + } + }; + threads[i].start(); + } + for (int i = 0; i < threads.length; ++i) { + threads[i].join(); + } + for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) { + procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i); + } + } + + private AssignProcedure createAndSubmitAssign(TableName tableName, int regionId) { + RegionInfo hri = createRegionInfo(tableName, regionId); + AssignProcedure proc = am.createAssignProcedure(hri); + master.getMasterProcedureExecutor().submitProcedure(proc); + return proc; + } + + protected RegionInfo createRegionInfo(final TableName tableName, final long regionId) { + return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(regionId)) + .setEndKey(Bytes.toBytes(regionId + 1)).setSplit(false).setRegionId(0).build(); + } + + private void sendTransitionReport(final ServerName serverName, + final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo, + final RegionServerStatusProtos.RegionStateTransition.TransitionCode state) + throws IOException { + RegionServerStatusProtos.ReportRegionStateTransitionRequest.Builder req = + RegionServerStatusProtos.ReportRegionStateTransitionRequest.newBuilder(); + req.setServer(ProtobufUtil.toServerName(serverName)); + req.addTransition( + RegionServerStatusProtos.RegionStateTransition.newBuilder().addRegionInfo(regionInfo) + .setTransitionCode(state).setOpenSeqNum(1).build()); + am.reportRegionStateTransition(req.build()); + } + + private void doCrash(final ServerName serverName) { + this.am.submitServerCrash(serverName, false/*No WALs here*/); + } + + private void doRestart(final ServerName serverName) { + try { + this.master.restartRegionServer(serverName); + } catch (IOException e) { + LOG.warn("Can not restart RS with new startcode"); + } + } + + private class NoopRsExecutor implements MockRSExecutor { + @Override + public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest request) throws IOException { + if (request.getOpenRegionCount() > 0) { + for (AdminProtos.OpenRegionRequest req : request.getOpenRegionList()) { + for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : req.getOpenInfoList()) { + execOpenRegion(server, openReq); + } + } + } + if (request.getCloseRegionCount() > 0) { + for (AdminProtos.CloseRegionRequest req : request.getCloseRegionList()) { + execCloseRegion(server, req.getRegion().getValue().toByteArray()); + } + } + return AdminProtos.ExecuteProceduresResponse.newBuilder().build(); + } + + protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server, + AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException { + return null; + } + + protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) + throws IOException { + return null; + } + } + + protected class GoodRsExecutor extends NoopRsExecutor { + @Override + protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server, + AdminProtos.OpenRegionRequest.RegionOpenInfo openReq) throws IOException { + sendTransitionReport(server, openReq.getRegion(), + RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED); + // Concurrency? 
+ // Now update the state of our cluster in regionsToRegionServers. + SortedSet regions = regionsToRegionServers.get(server); + if (regions == null) { + regions = new ConcurrentSkipListSet(Bytes.BYTES_COMPARATOR); + regionsToRegionServers.put(server, regions); + } + RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion()); + if (regions.contains(hri.getRegionName())) { + throw new UnsupportedOperationException(hri.getRegionNameAsString()); + } + regions.add(hri.getRegionName()); + return AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED; + } + + @Override + protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) + throws IOException { + RegionInfo hri = am.getRegionInfo(regionName); + sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), + RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED); + return AdminProtos.CloseRegionResponse.newBuilder().setClosed(true).build(); + } + } + + protected static class ServerNotYetRunningRsExecutor implements MockRSExecutor { + @Override + public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest req) throws IOException { + throw new ServerNotRunningYetException("wait on server startup"); + } + } + + protected static class FaultyRsExecutor implements MockRSExecutor { + private final IOException exception; + + public FaultyRsExecutor(final IOException exception) { + this.exception = exception; + } + + @Override + public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest req) throws IOException { + throw exception; + } + } + + protected class SocketTimeoutRsExecutor extends GoodRsExecutor { + private final int maxSocketTimeoutRetries; + private final int maxServerRetries; + + private ServerName lastServer; + private int sockTimeoutRetries; + private int serverRetries; + + public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int maxServerRetries) { + this.maxServerRetries = maxServerRetries; + this.maxSocketTimeoutRetries = maxSocketTimeoutRetries; + } + + @Override + public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest req) throws IOException { + // SocketTimeoutException should be a temporary problem + // unless the server will be declared dead. + if (sockTimeoutRetries++ < maxSocketTimeoutRetries) { + if (sockTimeoutRetries == 1) assertNotEquals(lastServer, server); + lastServer = server; + LOG.debug("Socket timeout for server=" + server + " retries=" + sockTimeoutRetries); + throw new SocketTimeoutException("simulate socket timeout"); + } else if (serverRetries++ < maxServerRetries) { + LOG.info("Mark server=" + server + " as dead. serverRetries=" + serverRetries); + master.getServerManager().moveFromOnlineToDeadServers(server); + sockTimeoutRetries = 0; + throw new SocketTimeoutException("simulate socket timeout"); + } else { + return super.sendRequest(server, req); + } + } + } + + /** + * Takes open request and then returns nothing so acts like a RS that went zombie. + * No response (so proc is stuck/suspended on the Master and won't wake up.). We + * then send in a crash for this server after a few seconds; crash is supposed to + * take care of the suspended procedures. 
+ */ + protected class HangThenRSCrashExecutor extends GoodRsExecutor { + private int invocations; + + @Override + protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion( + final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo openReq) + throws IOException { + if (this.invocations++ > 0) { + // Return w/o problem the second time through here. + return super.execOpenRegion(server, openReq); + } + // The procedure on master will just hang forever because nothing comes back + // from the RS in this case. + LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout"); + executor.schedule(new Runnable() { + @Override + public void run() { + LOG.info("Sending in CRASH of " + server); + doCrash(server); + } + }, 1, TimeUnit.SECONDS); + return null; + } + } + + /** + * Takes open request and then returns nothing so acts like a RS that went zombie. + * No response (so proc is stuck/suspended on the Master and won't wake up.). + * Different with HangThenRSCrashExecutor, HangThenRSCrashExecutor will create + * ServerCrashProcedure to handle the server crash. However, this HangThenRSRestartExecutor + * will restart RS directly, situation for RS crashed when SCP is not enabled. + */ + protected class HangThenRSRestartExecutor extends GoodRsExecutor { + private int invocations; + + @Override + protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion( + final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo openReq) + throws IOException { + if (this.invocations++ > 0) { + // Return w/o problem the second time through here. + return super.execOpenRegion(server, openReq); + } + // The procedure on master will just hang forever because nothing comes back + // from the RS in this case. 
+ LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout"); + executor.schedule(new Runnable() { + @Override + public void run() { + LOG.info("Restarting RS of " + server); + doRestart(server); + } + }, 1, TimeUnit.SECONDS); + return null; + } + } + + protected class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor { + public static final int TYPES_OF_FAILURE = 6; + private int invocations; + + @Override + protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) + throws IOException { + switch (this.invocations++) { + case 0: + throw new NotServingRegionException("Fake"); + case 1: + executor.schedule(new Runnable() { + @Override + public void run() { + LOG.info("Sending in CRASH of " + server); + doCrash(server); + } + }, 1, TimeUnit.SECONDS); + throw new RegionServerAbortedException("Fake!"); + case 2: + executor.schedule(new Runnable() { + @Override + public void run() { + LOG.info("Sending in CRASH of " + server); + doCrash(server); + } + }, 1, TimeUnit.SECONDS); + throw new RegionServerStoppedException("Fake!"); + case 3: + throw new ServerNotRunningYetException("Fake!"); + case 4: + LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server); + executor.schedule(new Runnable() { + @Override + public void run() { + LOG.info("Sending in CRASH of " + server); + doCrash(server); + } + }, 1, TimeUnit.SECONDS); + return null; + default: + return super.execCloseRegion(server, regionName); + } + } + } + + protected class RandRsExecutor extends NoopRsExecutor { + private final Random rand = new Random(); + + @Override + public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest req) throws IOException { + switch (rand.nextInt(5)) { + case 0: + throw new ServerNotRunningYetException("wait on server startup"); + case 1: + throw new SocketTimeoutException("simulate socket timeout"); + case 2: + throw new RemoteException("java.io.IOException", "unexpected exception"); + default: + // fall out + } + return super.sendRequest(server, req); + } + + @Override + protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion( + final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo openReq) + throws IOException { + switch (rand.nextInt(6)) { + case 0: + LOG.info("Return OPENED response"); + sendTransitionReport(server, openReq.getRegion(), + RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED); + return AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED; + case 1: + LOG.info("Return transition report that OPENED/ALREADY_OPENED response"); + sendTransitionReport(server, openReq.getRegion(), + RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED); + return AdminProtos.OpenRegionResponse.RegionOpeningState.ALREADY_OPENED; + case 2: + LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response"); + sendTransitionReport(server, openReq.getRegion(), + RegionServerStatusProtos.RegionStateTransition.TransitionCode.FAILED_OPEN); + return AdminProtos.OpenRegionResponse.RegionOpeningState.FAILED_OPENING; + default: + // fall out + } + // The procedure on master will just hang forever because nothing comes back + // from the RS in this case. 
+ LOG.info( + "Return null as response; means proc stuck so we send in a crash report after a few seconds..."); + executor.schedule(new Runnable() { + @Override + public void run() { + LOG.info("Delayed CRASHING of " + server); + doCrash(server); + } + }, 5, TimeUnit.SECONDS); + return null; + } + + @Override + protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) + throws IOException { + AdminProtos.CloseRegionResponse.Builder resp = AdminProtos.CloseRegionResponse.newBuilder(); + boolean closed = rand.nextBoolean(); + if (closed) { + RegionInfo hri = am.getRegionInfo(regionName); + sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), + RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED); + } + resp.setClosed(closed); + return resp.build(); + } + } + + protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor { + + private boolean invoked = false; + + private ServerName lastServer; + + @Override + public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest req) throws IOException { + if (!invoked) { + lastServer = server; + invoked = true; + throw new CallQueueTooBigException("simulate queue full"); + } + // better select another server since the server is over loaded, but anyway, it is fine to + // still select the same server since it is not dead yet... + if (lastServer.equals(server)) { + LOG.warn("We still select the same server, which is not good."); + } + return super.sendRequest(server, req); + } + } + + protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor { + + private final int queueFullTimes; + + private int retries; + + private ServerName lastServer; + + public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) { + this.queueFullTimes = queueFullTimes; + } + + @Override + public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest req) throws IOException { + retries++; + if (retries == 1) { + lastServer = server; + throw new CallTimeoutException("simulate call timeout"); + } + // should always retry on the same server + assertEquals(lastServer, server); + if (retries < queueFullTimes) { + throw new CallQueueTooBigException("simulate queue full"); + } + return super.sendRequest(server, req); + } + } + + protected interface MockRSExecutor { + AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest req) throws IOException; + } + + protected class MockRSProcedureDispatcher extends RSProcedureDispatcher { + private MockRSExecutor mockRsExec; + + public MockRSProcedureDispatcher(final MasterServices master) { + super(master); + } + + public void setMockRsExecutor(final MockRSExecutor mockRsExec) { + this.mockRsExec = mockRsExec; + } + + @Override + protected void remoteDispatch(ServerName serverName, Set remoteProcedures) { + submitTask(new MockRemoteCall(serverName, remoteProcedures)); + } + + private class MockRemoteCall extends ExecuteProceduresRemoteCall { + public MockRemoteCall(final ServerName serverName, final Set operations) { + super(serverName, operations); + } + + @Override + public void dispatchOpenRequests(MasterProcedureEnv env, + List operations) { + request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations)); + } + + @Override + public void dispatchCloseRequests(MasterProcedureEnv env, + List operations) { + for (RegionCloseOperation op : operations) { + 
request.addCloseRegion(op.buildCloseRegionRequest(getServerName())); + } + } + + @Override + protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName, + final AdminProtos.ExecuteProceduresRequest request) throws IOException { + return mockRsExec.sendRequest(serverName, request); + } + } + } + + protected void collectAssignmentManagerMetrics() { + assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount(); + assignFailedCount = assignProcMetrics.getFailedCounter().getCount(); + unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount(); + unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount(); + } +} -- 2.7.4