commit fb4670e0765dee6e2dbe30be9cb91f509083d0ea
Author: stack
Date:   Wed Jul 1 17:48:36 2015 -0700

    HBASE-13895 DATALOSS: Region assigned before WAL replay when abort (Enis Soztutar)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAbortedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAbortedException.java
new file mode 100644
index 0000000..ddc2270
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAbortedException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown by the region server when it is aborting.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RegionServerAbortedException extends RegionServerStoppedException {
+  public RegionServerAbortedException(String s) {
+    super(s);
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
index f116869..95f697e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 /**
  * Thrown by the region server when it is in shutting down state.
+ * @see RegionServerAbortedException
  */
 @SuppressWarnings("serial")
 @InterfaceAudience.Public
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
index c92393f..e66e99e 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
@@ -20,10 +20,18 @@ package org.apache.hadoop.hbase.test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
 import java.util.Random;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -31,8 +39,11 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -42,11 +53,13 @@ import org.apache.hadoop.hbase.IntegrationTestBase;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -56,8 +69,10 @@ import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -66,6 +81,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -91,6 +107,9 @@ import com.google.common.collect.Sets;
  */
 @Category(IntegrationTests.class)
 public class IntegrationTestLoadAndVerify extends IntegrationTestBase {
+
+  private static final Log LOG = LogFactory.getLog(IntegrationTestLoadAndVerify.class);
+
   private static final String TEST_NAME = "IntegrationTestLoadAndVerify";
   private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
   private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1");
@@ -112,7 +131,10 @@ public class IntegrationTestLoadAndVerify extends IntegrationTestBase {
 
   private static final int SCANNER_CACHING = 500;
 
+  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
+
   private String toRun = null;
+  private String keysDir = null;
 
   private enum Counters {
     ROWS_WRITTEN,
@@ -267,7 +289,6 @@ public void cleanUpCluster() throws Exception {
   }
 
   public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> {
-    private static final Log LOG = LogFactory.getLog(VerifyReducer.class);
     private Counter refsChecked;
     private Counter rowsWritten;
 
@@ -312,6 +333,7 @@ public void cleanUpCluster() throws Exception {
 
   protected Job doLoad(Configuration conf, HTableDescriptor htd) throws Exception {
     Path outputDir = getTestDir(TEST_NAME, "load-output");
+    LOG.info("Load output dir: " + outputDir);
 
     NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
     conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString());
@@ -339,6 +361,7 @@ public void cleanUpCluster() throws Exception {
 
   protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
     Path outputDir = getTestDir(TEST_NAME, "verify-output");
+    LOG.info("Verify output dir: " + outputDir);
 
     Job job = Job.getInstance(conf);
     job.setJarByClass(this.getClass());
@@ -363,6 +386,139 @@ public void cleanUpCluster() throws Exception {
     assertEquals(0, numOutputRecords);
   }
 
+  /**
+   * Tool to search missing rows in WALs and hfiles.
+   * Pass in a file or dir of keys to search for. The key file must have been written by the
+   * Verify step (we depend on the format it writes out). We'll read the keys in and then
+   * search in the hbase WALs and oldWALs dirs (some of this is TODO).
+   */
+  public static class WALSearcher extends WALPlayer {
+    public WALSearcher(Configuration conf) {
+      super(conf);
+    }
+
+    /**
+     * The actual searcher mapper.
+     */
+    public static class WALMapperSearcher extends WALMapper {
+      private SortedSet<byte[]> keysToFind;
+      private AtomicInteger rows = new AtomicInteger(0);
+
+      @Override
+      public void setup(Context context)
+          throws IOException {
+        super.setup(context);
+        try {
+          this.keysToFind = readKeysToSearch(context.getConfiguration());
+          LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException(e.toString());
+        }
+      }
+
+      @Override
+      protected boolean filter(Context context, Cell cell) {
+        // TODO: Can I do a better compare than this copying out key?
+        byte[] row = new byte[cell.getRowLength()];
+        System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
+        boolean b = this.keysToFind.contains(row);
+        if (b) {
+          String keyStr = Bytes.toStringBinary(row);
+          try {
+            LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
+          } catch (IOException|InterruptedException e) {
+            LOG.warn(e);
+          }
+          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
+            context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
+          }
+          context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
+        }
+        return b;
+      }
+    }
+
+    // Put in place the above WALMapperSearcher.
+    @Override
+    public Job createSubmittableJob(String[] args) throws IOException {
+      Job job = super.createSubmittableJob(args);
+      // Call my class instead.
+      job.setJarByClass(WALMapperSearcher.class);
+      job.setMapperClass(WALMapperSearcher.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
+      return job;
+    }
+  }
+
+  static final String FOUND_GROUP_KEY = "Found";
+  static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
+
+  static SortedSet<byte[]> readKeysToSearch(final Configuration conf)
+      throws IOException, InterruptedException {
+    Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
+    FileSystem fs = FileSystem.get(conf);
+    SortedSet<byte[]> result = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    if (!fs.exists(keysInputDir)) {
+      throw new FileNotFoundException(keysInputDir.toString());
+    }
+    if (!fs.isDirectory(keysInputDir)) {
+      FileStatus keyFileStatus = fs.getFileStatus(keysInputDir);
+      readFileToSearch(conf, fs, keyFileStatus, result);
+    } else {
+      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
+      while (iterator.hasNext()) {
+        LocatedFileStatus keyFileStatus = iterator.next();
+        // Skip "_SUCCESS" file.
+        if (keyFileStatus.getPath().getName().startsWith("_")) continue;
+        readFileToSearch(conf, fs, keyFileStatus, result);
+      }
+    }
+    return result;
+  }
+
+  private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
+      final FileSystem fs, final FileStatus keyFileStatus, SortedSet<byte[]> result)
+      throws IOException, InterruptedException {
+    // Verify uses file output format and writes <Text, Text>. We can read it as a text file.
+    try (InputStream in = fs.open(keyFileStatus.getPath());
+        BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+      // Extract the key and report it back as a missing key.
+      String line;
+      while ((line = reader.readLine()) != null) {
+        if (line.isEmpty()) continue;
+
+        String[] parts = line.split("\\s+");
+        if (parts.length >= 1) {
+          String key = parts[0];
+          result.add(Bytes.toBytesBinary(key));
+        } else {
+          LOG.info("Cannot parse key from: " + line);
+        }
+      }
+    }
+    return result;
+  }
+
+  private int doSearch(Configuration conf, String keysDir) throws Exception {
+    Path inputDir = new Path(keysDir);
+
+    getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
+    SortedSet<byte[]> keys = readKeysToSearch(getConf());
+    if (keys.isEmpty()) throw new RuntimeException("No keys to find");
+    LOG.info("Count of keys to find: " + keys.size());
+    for (byte[] key : keys) LOG.info("Key: " + Bytes.toStringBinary(key));
+    Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
+    // Now read all WALs. In two dirs. Presumes certain layout.
+    Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
+    Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    LOG.info("Running Search with keys inputDir=" + inputDir +
+      " against " + getConf().get(HConstants.HBASE_DIR));
+    int ret = ToolRunner.run(new WALSearcher(getConf()), new String[] {walsDir.toString(), ""});
+    if (ret != 0) return ret;
+    return ToolRunner.run(new WALSearcher(getConf()), new String[] {oldWalsDir.toString(), ""});
+  }
+
   private static void setJobScannerConf(Job job) {
     // Make sure scanners log something useful to make debugging possible.
     job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
@@ -371,11 +527,8 @@ public void cleanUpCluster() throws Exception {
   }
 
   public Path getTestDir(String testName, String subdir) throws IOException {
-    //HBaseTestingUtility.getDataTestDirOnTestFs() has not been backported.
+    Path testDir = util.getDataTestDirOnTestFS(testName);
     FileSystem fs = FileSystem.get(getConf());
-    Path base = new Path(fs.getWorkingDirectory(), "test-data");
-    String randomStr = UUID.randomUUID().toString();
-    Path testDir = new Path(base, randomStr);
     fs.deleteOnExit(testDir);
 
     return new Path(new Path(testDir, testName), subdir);
@@ -398,7 +551,8 @@ public void cleanUpCluster() throws Exception {
   }
 
   public void usage() {
-    System.err.println(this.getClass().getSimpleName() + " [-Doptions] <load|verify|loadAndVerify>");
+    System.err.println(this.getClass().getSimpleName()
+      + " [-Doptions] <load|verify|loadAndVerify|search>");
     System.err.println("  Loads a table with row dependencies and verifies the dependency chains");
     System.err.println("Options");
     System.err.println("  -Dloadmapper.table=<name>  Table to write/verify (default autogen)");
@@ -417,11 +571,16 @@ public void cleanUpCluster() throws Exception {
     super.processOptions(cmd);
 
     String[] args = cmd.getArgs();
-    if (args == null || args.length < 1 || args.length > 1) {
+    if (args == null || args.length < 1) {
       usage();
       throw new RuntimeException("Incorrect Number of args.");
     }
     toRun = args[0];
+    if (toRun.equalsIgnoreCase("search")) {
+      if (args.length > 1) {
+        keysDir = args[1];
+      }
+    }
   }
 
   @Override
@@ -429,16 +588,25 @@ public void cleanUpCluster() throws Exception {
     IntegrationTestingUtility.setUseDistributedCluster(getConf());
     boolean doLoad = false;
     boolean doVerify = false;
+    boolean doSearch = false;
     boolean doDelete = getConf().getBoolean("loadmapper.deleteAfter", true);
     int numPresplits = getConf().getInt("loadmapper.numPresplits", 40);
 
-    if (toRun.equals("load")) {
+    if (toRun.equalsIgnoreCase("load")) {
       doLoad = true;
-    } else if (toRun.equals("verify")) {
+    } else if (toRun.equalsIgnoreCase("verify")) {
       doVerify = true;
-    } else if (toRun.equals("loadAndVerify")) {
+    } else if (toRun.equalsIgnoreCase("loadAndVerify")) {
       doLoad = true;
       doVerify = true;
+    } else if (toRun.equalsIgnoreCase("search")) {
+      doLoad = false;
+      doVerify = false;
+      doSearch = true;
+      if (keysDir == null) {
+        System.err.println("Usage: search <KEYS_DIR>");
+        return 1;
+      }
     } else {
       System.err.println("Invalid argument " + toRun);
       usage();
@@ -450,9 +618,9 @@ public void cleanUpCluster() throws Exception {
     HTableDescriptor htd = new HTableDescriptor(table);
     htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
 
-    try (Connection conn = ConnectionFactory.createConnection(getConf());
-        Admin admin = conn.getAdmin()) {
-      if (doLoad) {
+    if (doLoad) {
+      try (Connection conn = ConnectionFactory.createConnection(getConf());
+          Admin admin = conn.getAdmin()) {
         admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
         doLoad(getConf(), htd);
       }
@@ -463,6 +631,9 @@ public void cleanUpCluster() throws Exception {
       getTestingUtil(getConf()).deleteTable(htd.getTableName());
     }
 
+    if (doSearch) {
+      return doSearch(getConf(), keysDir);
+    }
     return 0;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 713ca40..d7194f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -86,6 +86,13 @@ public class WALPlayer extends Configured implements Tool {
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
   /**
+   * @param conf The {@link Configuration} to use.
+   */
+  public WALPlayer(Configuration conf) {
+    super(conf);
+  }
+
+  /**
    * A mapper that just writes out KeyValues.
   * This one can be used together with {@link KeyValueSortReducer}
   */
 
@@ -327,7 +334,7 @@ public class WALPlayer extends Configured implements Tool {
    * @throws Exception When running the job fails.
    */
   public static void main(String[] args) throws Exception {
-    int ret = ToolRunner.run(HBaseConfiguration.create(), new WALPlayer(), args);
+    int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
     System.exit(ret);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 34db4e4..dcbce23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.quotas.RegionStateListener;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -1320,7 +1321,7 @@ public class AssignmentManager {
     if (state == null || state.getServerName() == null) {
       // We don't know where the region is, offline it.
       // No need to send CLOSE RPC
-      LOG.warn("Attempting to unassign a region not in RegionStates"
+      LOG.warn("Attempting to unassign a region not in RegionStates "
         + region.getRegionNameAsString() + ", offlined");
       regionOffline(region);
       return;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7bcf8e7..d7be4b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1036,9 +1036,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @throws IOException
    */
   protected void checkOpen() throws IOException {
-    if (regionServer.isStopped() || regionServer.isAborted()) {
-      throw new RegionServerStoppedException("Server " + regionServer.serverName
-          + " not running" + (regionServer.isAborted() ? ", aborting" : ""));
", aborting" : "")); + if (regionServer.isAborted()) { + throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting"); + } + if (regionServer.isStopped()) { + throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping"); } if (!regionServer.fsOk) { throw new RegionServerStoppedException("File system not available"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index e524f38..6dc1d9f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -114,7 +114,7 @@ public class TestWALPlayer { .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString(); Configuration configuration= TEST_UTIL.getConfiguration(); - WALPlayer player = new WALPlayer(); + WALPlayer player = new WALPlayer(configuration); String optionName="_test_.name"; configuration.set(optionName, "1000"); player.setupTime(configuration, optionName);