diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAbortedException.java hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAbortedException.java
new file mode 100644
index 0000000..ddc2270
--- /dev/null
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAbortedException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown by the region server when it is aborting.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RegionServerAbortedException extends RegionServerStoppedException {
+  public RegionServerAbortedException(String s) {
+    super(s);
+  }
+}
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
index f116869..95f697e 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 /**
  * Thrown by the region server when it is in shutting down state.
+ * @see RegionServerAbortedException
  */
 @SuppressWarnings("serial")
 @InterfaceAudience.Public
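
Note (sketch, not part of the patch): RegionServerAbortedException extends RegionServerStoppedException, so existing catch blocks for the stopped case keep matching; callers that care about the difference catch the subtype first. The variable names below are illustrative; the sendRegionClose() argument shape is the one mocked in TestAssignmentManager further down.

    try {
      serverManager.sendRegionClose(server, hri, versionOfClosingNode, dest, false);
    } catch (RegionServerAbortedException rsae) {
      // Aborting: the region may still need WAL recovery, so keep its state
      // and retry once the failed-server expiry window has passed.
    } catch (RegionServerStoppedException rsse) {
      // Stopping cleanly: the region can be offlined immediately.
    }
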
diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
index 8a0181c..9ecb9ca 100644
--- hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
+++ hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
@@ -26,8 +26,11 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -37,23 +40,26 @@ import org.apache.hadoop.hbase.IntegrationTestBase;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -62,14 +68,22 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
 import java.util.Random;
 import java.util.Set;
-import java.util.UUID;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -92,6 +106,9 @@
  */
 @Category(IntegrationTests.class)
 public class IntegrationTestLoadAndVerify extends IntegrationTestBase {
+
+  private static final Log LOG = LogFactory.getLog(IntegrationTestLoadAndVerify.class);
+
   private static final String TEST_NAME = "IntegrationTestLoadAndVerify";
   private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
   private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1");
@@ -113,7 +130,10 @@
 
   private static final int SCANNER_CACHING = 500;
 
+  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
+
   private String toRun = null;
+  private String keysDir = null;
 
   private enum Counters {
     ROWS_WRITTEN,
@@ -268,7 +288,6 @@ public void cleanUpCluster() throws Exception {
   }
 
   public static class VerifyReducer extends Reducer<BytesWritable,BytesWritable,Text,Text> {
-    private static final Log LOG = LogFactory.getLog(VerifyReducer.class);
     private Counter refsChecked;
     private Counter rowsWritten;
 
@@ -313,6 +332,7 @@ public void cleanUpCluster() throws Exception {
 
   protected Job doLoad(Configuration conf, HTableDescriptor htd) throws Exception {
     Path outputDir = getTestDir(TEST_NAME, "load-output");
+    LOG.info("Load output dir: " + outputDir);
     NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
     conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString());
 
@@ -340,6 +360,7 @@ public void cleanUpCluster() throws Exception {
 
   protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
     Path outputDir = getTestDir(TEST_NAME, "verify-output");
+    LOG.info("Verify output dir: " + outputDir);
     Job job = Job.getInstance(conf);
     job.setJarByClass(this.getClass());
 
@@ -364,6 +385,139 @@ public void cleanUpCluster() throws Exception {
     assertEquals(0, numOutputRecords);
   }
 
+  /**
+   * Tool to search missing rows in WALs and hfiles.
+   * Pass in a file or dir of keys to search for. The key file must have been written by
+   * the Verify step (we depend on the format it writes out); we'll read the keys in and
+   * then search for them in the hbase WALs and oldWALs dirs (some of this is TODO).
+   */
+  public static class WALSearcher extends WALPlayer {
+    public WALSearcher(Configuration conf) {
+      super(conf);
+    }
+
+    /**
+     * The actual searcher mapper.
+     */
+    public static class WALMapperSearcher extends WALMapper {
+      private SortedSet<byte []> keysToFind;
+      private AtomicInteger rows = new AtomicInteger(0);
+
+      @Override
+      public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
+          throws IOException {
+        super.setup(context);
+        try {
+          this.keysToFind = readKeysToSearch(context.getConfiguration());
+          LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException(e.toString());
+        }
+      }
+
+      @Override
+      protected boolean filter(Context context, Cell cell) {
+        // TODO: Can I do a better compare than this copying out key?
+        byte [] row = new byte [cell.getRowLength()];
+        System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
+        boolean b = this.keysToFind.contains(row);
+        if (b) {
+          String keyStr = Bytes.toStringBinary(row);
+          try {
+            LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
+          } catch (IOException|InterruptedException e) {
+            LOG.warn(e);
+          }
+          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
+            context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
+          }
+          context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
+        }
+        return b;
+      }
+    }
+
+    // Put in place the above WALMapperSearcher.
+    @Override
+    public Job createSubmittableJob(String[] args) throws IOException {
+      Job job = super.createSubmittableJob(args);
+      // Call my class instead.
+      job.setJarByClass(WALMapperSearcher.class);
+      job.setMapperClass(WALMapperSearcher.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
+      return job;
+    }
+  }
+
+  static final String FOUND_GROUP_KEY = "Found";
+  static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
+
+  static SortedSet<byte []> readKeysToSearch(final Configuration conf)
+      throws IOException, InterruptedException {
+    Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
+    FileSystem fs = FileSystem.get(conf);
+    SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+    if (!fs.exists(keysInputDir)) {
+      throw new FileNotFoundException(keysInputDir.toString());
+    }
+    if (!fs.isDirectory(keysInputDir)) {
+      FileStatus keyFileStatus = fs.getFileStatus(keysInputDir);
+      readFileToSearch(conf, fs, keyFileStatus, result);
+    } else {
+      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
+      while(iterator.hasNext()) {
+        LocatedFileStatus keyFileStatus = iterator.next();
+        // Skip "_SUCCESS" file.
+        if (keyFileStatus.getPath().getName().startsWith("_")) continue;
+        readFileToSearch(conf, fs, keyFileStatus, result);
+      }
+    }
+    return result;
+  }
+
+  private static SortedSet<byte []> readFileToSearch(final Configuration conf,
+      final FileSystem fs, final FileStatus keyFileStatus, SortedSet<byte []> result)
+      throws IOException,
+      InterruptedException {
+    // verify uses file output format and writes <Text, Text>. We can read it as a text file.
+    try (InputStream in = fs.open(keyFileStatus.getPath());
+        BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+      // extract the key from each line and record it as a missing row
+      String line;
+      while ((line = reader.readLine()) != null) {
+        if (line.isEmpty()) continue;
+
+        String[] parts = line.split("\\s+");
+        if (parts.length >= 1) {
+          String key = parts[0];
+          result.add(Bytes.toBytesBinary(key));
+        } else {
+          LOG.info("Cannot parse key from: " + line);
+        }
+      }
+    }
+    return result;
+  }
+
+  private int doSearch(Configuration conf, String keysDir) throws Exception {
+    Path inputDir = new Path(keysDir);
+
+    getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
+    SortedSet<byte []> keys = readKeysToSearch(getConf());
+    if (keys.isEmpty()) throw new RuntimeException("No keys to find");
+    LOG.info("Count of keys to find: " + keys.size());
+    for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key));
+    Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
+    // Now read all WALs. In two dirs. Presumes certain layout.
+    Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
+    Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    LOG.info("Running Search with keys inputDir=" + inputDir +
+      " against " + getConf().get(HConstants.HBASE_DIR));
+    int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});
+    if (ret != 0) return ret;
+    return ToolRunner.run(new WALSearcher(getConf()), new String [] {oldWalsDir.toString(), ""});
+  }
+
   private static void setJobScannerConf(Job job) {
     // Make sure scanners log something useful to make debugging possible.
     job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
@@ -372,11 +526,8 @@ public void cleanUpCluster() throws Exception {
   }
 
   public Path getTestDir(String testName, String subdir) throws IOException {
-    //HBaseTestingUtility.getDataTestDirOnTestFs() has not been backported.
+    Path testDir = util.getDataTestDirOnTestFS(testName);
     FileSystem fs = FileSystem.get(getConf());
-    Path base = new Path(fs.getWorkingDirectory(), "test-data");
-    String randomStr = UUID.randomUUID().toString();
-    Path testDir = new Path(base, randomStr);
     fs.deleteOnExit(testDir);
 
     return new Path(new Path(testDir, testName), subdir);
@@ -399,7 +550,8 @@ public void cleanUpCluster() throws Exception {
   }
 
   public void usage() {
-    System.err.println(this.getClass().getSimpleName() + " [-Doptions] <load|verify|loadAndVerify>");
+    System.err.println(this.getClass().getSimpleName()
+        + " [-Doptions] <load|verify|loadAndVerify|search>");
     System.err.println("  Loads a table with row dependencies and verifies the dependency chains");
     System.err.println("Options");
     System.err.println("  -Dloadmapper.table=<name>  Table to write/verify (default autogen)");
@@ -418,11 +570,16 @@ public void cleanUpCluster() throws Exception {
     super.processOptions(cmd);
 
     String[] args = cmd.getArgs();
-    if (args == null || args.length < 1 || args.length > 1) {
+    if (args == null || args.length < 1) {
       usage();
       throw new RuntimeException("Incorrect Number of args.");
     }
     toRun = args[0];
+    if (toRun.equalsIgnoreCase("search")) {
+      if (args.length > 1) {
+        keysDir = args[1];
+      }
+    }
   }
 
   @Override
@@ -430,16 +587,25 @@ public void cleanUpCluster() throws Exception {
     IntegrationTestingUtility.setUseDistributedCluster(getConf());
     boolean doLoad = false;
     boolean doVerify = false;
+    boolean doSearch = false;
     boolean doDelete = getConf().getBoolean("loadmapper.deleteAfter",true);
     int numPresplits = getConf().getInt("loadmapper.numPresplits", 40);
 
-    if (toRun.equals("load")) {
+    if (toRun.equalsIgnoreCase("load")) {
       doLoad = true;
-    } else if (toRun.equals("verify")) {
+    } else if (toRun.equalsIgnoreCase("verify")) {
       doVerify= true;
-    } else if (toRun.equals("loadAndVerify")) {
+    } else if (toRun.equalsIgnoreCase("loadAndVerify")) {
       doLoad=true;
       doVerify= true;
+    } else if (toRun.equalsIgnoreCase("search")) {
+      doLoad=false;
+      doVerify= false;
+      doSearch = true;
+      if (keysDir == null) {
+        System.err.println("Usage: search <KEYS_DIR>");
+        return 1;
+      }
     } else {
       System.err.println("Invalid argument " + toRun);
       usage();
@@ -451,9 +617,9 @@ public void cleanUpCluster() throws Exception {
     HTableDescriptor htd = new HTableDescriptor(table);
     htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
 
-    try (Connection conn = ConnectionFactory.createConnection(getConf());
-        Admin admin = conn.getAdmin()) {
-      if (doLoad) {
+    if (doLoad) {
+      try (Connection conn = ConnectionFactory.createConnection(getConf());
+          Admin admin = conn.getAdmin()) {
         admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
         doLoad(getConf(), htd);
       }
@@ -464,6 +630,9 @@ public void cleanUpCluster() throws Exception {
         getTestingUtil(getConf()).deleteTable(htd.getTableName());
       }
     }
+    if (doSearch) {
+      return doSearch(getConf(), keysDir);
+    }
     return 0;
   }
 
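Note (sketch, not part of the patch): IntegrationTestLoadAndVerify is a Hadoop Tool, so the new "search" mode can be driven from the command line ("... IntegrationTestLoadAndVerify search <keysdir>") or programmatically. The keys directory is the Verify step's reduce output; readFileToSearch() takes the first whitespace-separated token of each line and decodes it with Bytes.toBytesBinary(). The path below is a made-up example value:

    Configuration conf = HBaseConfiguration.create();
    int exitCode = ToolRunner.run(conf, new IntegrationTestLoadAndVerify(),
        new String[] { "search", "/user/hbase/IntegrationTestLoadAndVerify/verify-output" });
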
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index ebc3d01..1d30a00 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.quotas.RegionStateListener;
 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -1866,11 +1867,19 @@
         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
           region.getRegionNameAsString());
       } catch (Throwable t) {
+        long sleepTime = 0;
+        Configuration conf = this.server.getConfiguration();
         if (t instanceof RemoteException) {
           t = ((RemoteException)t).unwrapRemoteException();
         }
         boolean logRetries = true;
-        if (t instanceof NotServingRegionException
+        if (t instanceof RegionServerAbortedException) {
+          // RS is aborting, we cannot offline the region since the region may need to do WAL
+          // recovery. Until we see the RS expiration, we should retry.
+          sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+            RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+
+        } else if (t instanceof NotServingRegionException
             || t instanceof RegionServerStoppedException
             || t instanceof ServerNotRunningYetException) {
           LOG.debug("Offline " + region.getRegionNameAsString()
@@ -1884,8 +1893,6 @@
           return;
         } else if ((t instanceof FailedServerException) || (state != null &&
             t instanceof RegionAlreadyInTransitionException)) {
-          long sleepTime = 0;
-          Configuration conf = this.server.getConfiguration();
           if(t instanceof FailedServerException) {
             sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
               RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
@@ -1908,19 +1915,20 @@
             logRetries = false;
           }
         }
-        try {
-          if (sleepTime > 0) {
-            Thread.sleep(sleepTime);
-          }
-        } catch (InterruptedException ie) {
-          LOG.warn("Failed to unassign "
-            + region.getRegionNameAsString() + " since interrupted", ie);
-          Thread.currentThread().interrupt();
-          if (state != null) {
-            regionStates.updateRegionState(region, State.FAILED_CLOSE);
-          }
-          return;
+      }
+
+      try {
+        if (sleepTime > 0) {
+          Thread.sleep(sleepTime);
         }
+      } catch (InterruptedException ie) {
+        LOG.warn("Failed to unassign "
+          + region.getRegionNameAsString() + " since interrupted", ie);
+        Thread.currentThread().interrupt();
+        if (state != null) {
+          regionStates.updateRegionState(region, State.FAILED_CLOSE);
+        }
+        return;
       }
 
       if (logRetries) {
@@ -2006,7 +2014,7 @@
   }
 
   @SuppressWarnings("deprecation")
-  private boolean wasRegionOnDeadServerByMeta(
+  protected boolean wasRegionOnDeadServerByMeta(
       final HRegionInfo region, final ServerName sn) {
     try {
       if (region.isMetaRegion()) {
@@ -2493,7 +2501,7 @@
     if (state == null || state.getServerName() == null) {
       // We don't know where the region is, offline it.
       // No need to send CLOSE RPC
-      LOG.warn("Attempting to unassign a region not in RegionStates"
+      LOG.warn("Attempting to unassign a region not in RegionStates "
        + region.getRegionNameAsString() + ", offlined");
       regionOffline(region);
       return;
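
Note (sketch, not part of the patch): the new RegionServerAbortedException branch above deliberately does not offline the region; it only sets a sleep and lets the caller retry. Paraphrased shape of the surrounding retry logic (not the literal method body; the attempt bound is the "hbase.assignment.maximum.attempts" setting the new tests tune):

    for (int i = 1; i <= maximumAttempts; i++) {
      try {
        serverManager.sendRegionClose(server, region, versionOfClosingNode, dest, false);
        return; // CLOSE accepted by the region server
      } catch (RegionServerAbortedException e) {
        // Wait just past the failed-server expiry window before retrying, so the
        // next attempt is not short-circuited; once the RS actually expires,
        // ServerShutdownHandler takes over the region (WAL replay included).
        Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
            RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
      }
    }
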
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 504ad03..c0a35cc 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -956,7 +956,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   Configuration getConfiguration() {
     return regionServer.getConfiguration();
   }
-  
+
   private RegionServerQuotaManager getQuotaManager() {
     return regionServer.getRegionServerQuotaManager();
   }
@@ -976,9 +976,11 @@
    * @throws IOException
    */
   protected void checkOpen() throws IOException {
-    if (regionServer.isStopped() || regionServer.isAborted()) {
-      throw new RegionServerStoppedException("Server " + regionServer.serverName
-        + " not running" + (regionServer.isAborted() ? ", aborting" : ""));
+    if (regionServer.isAborted()) {
+      throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
+    }
+    if (regionServer.isStopped()) {
+      throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
     }
     if (!regionServer.fsOk) {
       throw new RegionServerStoppedException("File system not available");
@@ -2438,7 +2440,7 @@
             region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
           }
         }
-        
+
         quota.addScanResult(results);
 
         // If the scanner's filter - if any - is done with the scan
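
Note (not part of the patch): the order of the two checks in checkOpen() matters. In the HRegionServer implementation of this era, abort() also transitions the server to the stopped state, so an aborting server satisfies isStopped() as well; testing isAborted() first is what surfaces the more specific exception. Effective behavior, as a sketch:

    // isAborted() == true                       -> RegionServerAbortedException ("aborting")
    // isAborted() == false, isStopped() == true -> RegionServerStoppedException ("stopping")
    // otherwise                                 -> fall through to the fsOk check
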
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index 9e8097e..089bed5 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -1514,4 +1515,79 @@
       am.shutdown();
     }
   }
+
+  /**
+   * Tests close region call on a region server that is aborting
+   */
+  @Test (timeout=180000)
+  public void testCloseRegionOnAbortingRS() throws Exception {
+    this.server.getConfiguration().setInt("hbase.assignment.maximum.attempts", 2);
+
+    HRegionInfo hri = REGIONINFO;
+    LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(
+      server.getConfiguration());
+    // Create an AM.
+    AssignmentManager am = new AssignmentManager(this.server,
+      this.serverManager, balancer, null, null, master.getTableLockManager());
+    RegionStates regionStates = am.getRegionStates();
+
+    regionStates.createRegionState(hri, State.OPEN, SERVERNAME_B, SERVERNAME_B);
+
+    // mock aborting region server
+    Mockito.when(this.serverManager.sendRegionClose(Mockito.eq(SERVERNAME_B), Mockito.eq(REGIONINFO),
+      Mockito.anyInt(), (ServerName)Mockito.any(), Mockito.anyBoolean()))
+      .thenThrow(new RegionServerAbortedException(""));
+
+    // try to unassign the region
+    am.unassign(hri);
+
+    // assert that we have FAILED_CLOSE for the region state
+    assertEquals(State.FAILED_CLOSE, regionStates.getRegionState(REGIONINFO).getState());
+    assertEquals(SERVERNAME_B, regionStates.getRegionState(REGIONINFO).getServerName());
+
+    am.shutdown();
+  }
+
+  /**
+   * Tests close region call on a region server that is not in the onlineServers list
+   */
+  @Test (timeout=180000)
+  public void testCloseRegionOnServerNotOnline() throws Exception {
+    this.server.getConfiguration().setInt("hbase.assignment.maximum.attempts", 2);
+
+    HRegionInfo hri = REGIONINFO;
+    LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(
+      server.getConfiguration());
+    // Create an AM.
+    AssignmentManager am = new AssignmentManager(this.server,
+      this.serverManager, balancer, null, null, master.getTableLockManager()) {
+      @Override
+      protected boolean wasRegionOnDeadServerByMeta(HRegionInfo region, ServerName sn) {
+        return true;
+      };
+    };
+    RegionStates regionStates = am.getRegionStates();
+
+    regionStates.createRegionState(hri, State.OPEN, SERVERNAME_B, SERVERNAME_B);
+
+    // mock that the RS is expired, but not yet processed
+    Mockito.when(this.serverManager.isServerOnline(SERVERNAME_B))
+      .thenReturn(false);
+
+    // try to unassign the region
+    am.unassign(hri);
+
+    // assert that we have OFFLINE
+    assertEquals(State.OFFLINE, regionStates.getRegionState(REGIONINFO).getState());
+
+    // try to assign the region before SSH
+    am.regionPlans.put(REGIONINFO.getEncodedName(),
+      new RegionPlan(REGIONINFO, null, SERVERNAME_A));
+    am.assign(hri, true, false);
+
+    // assert that we still have OFFLINE
+    assertEquals(State.OFFLINE, regionStates.getRegionState(REGIONINFO).getState());
+
+    am.shutdown();
+  }
 }
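
Note (not part of the patch): the private -> protected change to wasRegionOnDeadServerByMeta() in AssignmentManager exists so that testCloseRegionOnServerNotOnline above can stub out the hbase:meta lookup with an anonymous subclass. The stubbing pattern, should it be needed elsewhere, is the one the test already uses:

    AssignmentManager am = new AssignmentManager(server, serverManager, balancer,
        null, null, master.getTableLockManager()) {
      @Override
      protected boolean wasRegionOnDeadServerByMeta(HRegionInfo region, ServerName sn) {
        return true; // pretend hbase:meta reports the hosting server as dead
      }
    };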