diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
index 4a3a64a..6e7cd33 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
@@ -20,15 +20,13 @@ package org.apache.hadoop.hbase;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterManager.ServiceType;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
@@ -42,8 +40,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 
-import com.google.common.collect.Sets;
-
 /**
  * Manages the interactions with an already deployed distributed cluster (as opposed to
  * a pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
index 931fba4..627b812 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
@@ -18,7 +18,23 @@
 package org.apache.hadoop.hbase.test;
 
-import com.google.common.collect.Sets;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -29,10 +45,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.IntegrationTestBase;
@@ -44,9 +64,9 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -57,13 +77,15 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.CounterGroup;
@@ -76,28 +98,23 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Sets;
 
 /**
  * This is an integration test borrowed from goraci, written by Keith Turner,
@@ -379,6 +396,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         current[i] = new byte[key.getLength()];
         System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
         if (++i == current.length) {
+          LOG.info("Persisting current.length=" + current.length + ", count=" + count + ", id=" +
+            Bytes.toStringBinary(id) + ", current=" + Bytes.toStringBinary(current[0]) +
+            ", i=" + i);
           persist(output, count, prev, current, id);
           i = 0;
@@ -470,8 +490,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         "pre-splitting table into " + totalNumberOfRegions + " regions " +
         "(default regions per server: " + regionsPerServer + ")");
 
-      byte[][] splits = new RegionSplitter.UniformSplit().split(
-          totalNumberOfRegions);
+      byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
 
       admin.createTable(htd, splits);
     }
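The hunk above only reflows the Generator's pre-split call, but the pattern it uses is worth seeing whole. The following is a minimal standalone sketch of it — not part of the patch; the table name, family name, and region count are invented for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.util.RegionSplitter;

    public class PreSplitExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
          // Hypothetical table and family names.
          HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testPreSplit"));
          htd.addFamily(new HColumnDescriptor("meta"));
          // UniformSplit partitions the full byte[] keyspace evenly, which suits
          // the uniformly random keys the Generator writes.
          byte[][] splits = new RegionSplitter.UniformSplit().split(16);
          admin.createTable(htd, splits);
        }
      }
    }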
@@ -561,6 +580,156 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   }
 
   /**
+   * Tool to search for missing rows in WALs and HFiles.
+   * Pass in a file or dir of keys to search for. The key file must have been written by the
+   * Verify step (we depend on the format it writes out); we read the keys in and then search
+   * for them in the hbase WALs and oldWALs dirs (some of this is TODO).
+   */
+  static class Search extends Configured implements Tool {
+    private static final Log LOG = LogFactory.getLog(Search.class);
+    protected Job job;
+
+    private static void printUsage(final String error) {
+      if (error != null && error.length() > 0) System.out.println("ERROR: " + error);
+      System.err.println("Usage: search <KEYS_DIR> [<MAPPERS>]");
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+      if (args.length < 1 || args.length > 2) {
+        printUsage(null);
+        return 1;
+      }
+      Path inputDir = new Path(args[0]);
+      int numMappers = 1;
+      if (args.length > 1) {
+        numMappers = Integer.parseInt(args[1]);
+      }
+      return run(inputDir, numMappers);
+    }
+
+    /**
+     * WALPlayer override that searches for keys loaded in the setup.
+     */
+    public static class WALSearcher extends WALPlayer {
+      public WALSearcher(Configuration conf) {
+        super(conf);
+      }
+
+      /**
+       * The actual searcher mapper.
+       */
+      public static class WALMapperSearcher extends WALMapper {
+        private SortedSet<byte []> keysToFind;
+
+        @Override
+        public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
+            throws IOException {
+          super.setup(context);
+          try {
+            this.keysToFind = readKeysToSearch(context.getConfiguration());
+            LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
+          } catch (InterruptedException e) {
+            throw new InterruptedIOException(e.toString());
+          }
+        }
+
+        @Override
+        protected boolean filter(Context context, Cell cell) {
+          // TODO: Can I do a better compare than this copying out key?
+          byte [] row = new byte [cell.getRowLength()];
+          System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
+          boolean b = this.keysToFind.contains(row);
+          if (b) {
+            String keyStr = Bytes.toStringBinary(row);
+            LOG.info("Found cell=" + cell);
+            context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
+          }
+          return b;
+        }
+      }
+
+      // Put in place the above WALMapperSearcher.
+      @Override
+      public Job createSubmittableJob(String[] args) throws IOException {
+        Job job = super.createSubmittableJob(args);
+        // Call my class instead.
+        job.setJarByClass(WALMapperSearcher.class);
+        job.setMapperClass(WALMapperSearcher.class);
+        job.setOutputFormatClass(NullOutputFormat.class);
+        return job;
+      }
+    }
+
+    static final String FOUND_GROUP_KEY = "Found";
+    static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
+
+    public int run(Path inputDir, int numMappers) throws Exception {
+      getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
+      SortedSet<byte []> keys = readKeysToSearch(getConf());
+      if (keys.isEmpty()) throw new RuntimeException("No keys to find");
+      LOG.info("Count of keys to find: " + keys.size());
+      for (byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key));
+      Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
+      // Now read all WALs. In two dirs. Presumes certain layout.
+      Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
+      Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+      LOG.info("Running Search with keys inputDir=" + inputDir + ", numMappers=" + numMappers +
+        " against " + getConf().get(HConstants.HBASE_DIR));
+      int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});
+      if (ret != 0) return ret;
+      return ToolRunner.run(new WALSearcher(getConf()), new String [] {oldWalsDir.toString(), ""});
+    }
+
+    static SortedSet<byte []> readKeysToSearch(final Configuration conf)
+        throws IOException, InterruptedException {
+      Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
+      FileSystem fs = FileSystem.get(conf);
+      SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+      if (!fs.exists(keysInputDir)) {
+        throw new FileNotFoundException(keysInputDir.toString());
+      }
+      if (!fs.isDirectory(keysInputDir)) {
+        throw new UnsupportedOperationException("TODO");
+      } else {
+        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
+        while (iterator.hasNext()) {
+          LocatedFileStatus keyFileStatus = iterator.next();
+          // Skip "_SUCCESS" file.
+          if (keyFileStatus.getPath().getName().startsWith("_")) continue;
+          result.addAll(readFileToSearch(conf, fs, keyFileStatus));
+        }
+      }
+      return result;
+    }
+
+    private static SortedSet<byte []> readFileToSearch(final Configuration conf,
+        final FileSystem fs, final LocatedFileStatus keyFileStatus)
+        throws IOException, InterruptedException {
+      SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+      // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
+      // what is missing.
+      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+      try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
+          new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
+        InputSplit is =
+          new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String [] {});
+        rr.initialize(is, context);
+        while (rr.nextKeyValue()) {
+          rr.getCurrentKey();
+          BytesWritable bw = rr.getCurrentValue();
+          switch (Verify.VerifyReducer.whichType(bw.getBytes())) {
+          case UNDEFINED:
+            result.add(Verify.VerifyReducer.getRowOnly(bw));
+            break;
+          }
+        }
+      }
+      return result;
+    }
+  }
+
+  /**
    * A Map Reduce job that verifies that the linked lists generated by
    * {@link Generator} do not have any holes.
    */
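Because Verify now emits BytesWritable pairs through SequenceFileAsBinaryOutputFormat (see the job setup further down), its output can also be decoded outside of mapreduce. A sketch under those assumptions — a plain SequenceFile.Reader in place of the SequenceFileAsBinaryRecordReader used above, and same-package visibility of the nested test classes:

    package org.apache.hadoop.hbase.test; // assumed: same package as the test

    import java.util.SortedSet;
    import java.util.TreeSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Verify;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;

    public class ReadVerifyOutputExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        SortedSet<byte[]> undefined = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        // e.g. one part-r-NNNNN file out of the Verify output directory.
        Path path = new Path(args[0]);
        try (SequenceFile.Reader reader =
            new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
          BytesWritable key = new BytesWritable();
          BytesWritable value = new BytesWritable();
          while (reader.next(key, value)) {
            // Same decode as readFileToSearch above: keep rows flagged UNDEFINED.
            if (Verify.VerifyReducer.whichType(value.getBytes()) == Verify.Counts.UNDEFINED) {
              undefined.add(Verify.VerifyReducer.getRowOnly(value));
            }
          }
        }
        System.out.println("Missing (UNDEFINED) rows found: " + undefined.size());
      }
    }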
@@ -591,21 +760,84 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       }
     }
 
+    /**
+     * Don't change the order of these enums. Their ordinals are used as a type flag when we
+     * emit problems found from the reducer.
+     */
     public static enum Counts {
-      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES
+      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES
     }
 
-    public static class VerifyReducer extends Reducer<BytesWritable,BytesWritable,Text,Text> {
+    /**
+     * Per reducer, we output problem rows as byte arrays so they can be used as input for
+     * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a flag,
+     * written as a short, saying what sort of emission it is; the flag is the Counts enum
+     * ordinal.
+     */
+    public static class VerifyReducer
+        extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
       private ArrayList<byte[]> refs = new ArrayList<byte[]>();
+      private final BytesWritable UNREF =
+        new BytesWritable(addPrefixFlag(Counts.UNREFERENCED.ordinal(), new byte [] {}));
       private AtomicInteger rows = new AtomicInteger(0);
+      private Connection connection;
+
+      @Override
+      protected void setup(
+          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
+          throws IOException, InterruptedException {
+        super.setup(context);
+        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
+      }
+
+      @Override
+      protected void cleanup(
+          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
+          throws IOException, InterruptedException {
+        if (this.connection != null) this.connection.close();
+        super.cleanup(context);
+      }
+
+      /**
+       * @param ordinal
+       * @param r
+       * @return Return new byte array that has <code>ordinal</code> as prefix on front taking up
+       * Bytes.SIZEOF_SHORT bytes followed by <code>r</code>
+       */
+      public static byte [] addPrefixFlag(final int ordinal, final byte [] r) {
+        byte [] prefix = Bytes.toBytes((short)ordinal);
+        if (prefix.length != Bytes.SIZEOF_SHORT) {
+          throw new RuntimeException("Unexpected size: " + prefix.length);
+        }
+        byte [] result = new byte [prefix.length + r.length];
+        System.arraycopy(prefix, 0, result, 0, prefix.length);
+        System.arraycopy(r, 0, result, prefix.length, r.length);
+        return result;
+      }
+
+      /**
+       * @param bs
+       * @return Type from the Counts enum of this row. Reads the prefix added by
+       * {@link #addPrefixFlag(int, byte[])}
+       */
+      public static Counts whichType(final byte [] bs) {
+        int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
+        return Counts.values()[ordinal];
+      }
+
+      /**
+       * @param bw
+       * @return Row bytes minus the type flag.
+       */
+      public static byte [] getRowOnly(BytesWritable bw) {
+        byte [] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
+        System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
+        return bytes;
+      }
 
       @Override
       public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
-        throws IOException, InterruptedException {
+          throws IOException, InterruptedException {
         int defCount = 0;
-
         refs.clear();
         for (BytesWritable type : values) {
           if (type.getLength() == DEF.getLength()) {
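A tiny illustrative round trip of the flag scheme defined by the three helpers above (the row value is made up): the Counts ordinal rides in front of the row as a two-byte short and is stripped back off on read.

    package org.apache.hadoop.hbase.test; // assumed: same package as the test

    import org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Verify;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.BytesWritable;

    public class PrefixFlagRoundTrip {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("some-row");
        // Tag the row as UNDEFINED, as the reducer does when it emits a problem row.
        byte[] flagged = Verify.VerifyReducer.addPrefixFlag(Verify.Counts.UNDEFINED.ordinal(), row);
        // whichType() decodes the leading short back into a Counts value...
        assert Verify.VerifyReducer.whichType(flagged) == Verify.Counts.UNDEFINED;
        // ...and getRowOnly() drops the two flag bytes, recovering the original row.
        byte[] back = Verify.VerifyReducer.getRowOnly(new BytesWritable(flagged));
        assert Bytes.equals(row, back);
        System.out.println("Round trip ok: " + Bytes.toStringBinary(back));
      }
    }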
@@ -618,48 +850,108 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         }
 
         // TODO check for more than one def, should not happen
-        StringBuilder refsSb = null;
-        String keyString = null;
+        String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
         if (defCount == 0 || refs.size() != 1) {
-          refsSb = new StringBuilder();
-          String comma = "";
-          for (byte[] ref : refs) {
-            refsSb.append(comma);
-            comma = ",";
-            refsSb.append(Bytes.toStringBinary(ref));
-          }
-          keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
-
-          LOG.error("Linked List error: Key = " + keyString + " References = " + refsSb.toString());
+          StringBuilder refsSb = dumpExtraInfoOnRefs(key, context, refs);
+          LOG.error("LinkedList error: key=" + keyString + ", reference(s)=" +
+            (refsSb != null? refsSb.toString(): ""));
         }
 
         if (defCount == 0 && refs.size() > 0) {
-          // this is bad, found a node that is referenced but not defined. It must have been
+          // This is bad, found a node that is referenced but not defined. It must have been
           // lost, emit some info about this node for debugging purposes.
-          context.write(new Text(keyString), new Text(refsSb.toString()));
+          // Write out a line per reference. If more than one, flag it.
+          for (int i = 0; i < refs.size(); i++) {
+            byte [] bs = refs.get(i);
+            int ordinal;
+            if (i <= 0) {
+              ordinal = Counts.UNDEFINED.ordinal();
+              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
+            } else {
+              ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal();
+              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
+            }
+          }
           context.getCounter(Counts.UNDEFINED).increment(1);
           if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
+            // Print out missing row; doing get on reference gives info on when the referencer
+            // was added which can help a little debugging. This info is only available in mapper
+            // output -- the 'Linked List error Key...' log message above. What we emit here is
+            // useless for debugging.
             context.getCounter("undef", keyString).increment(1);
           }
         } else if (defCount > 0 && refs.size() == 0) {
           // node is defined but not referenced
-          context.write(new Text(keyString), new Text("none"));
+          context.write(key, UNREF);
           context.getCounter(Counts.UNREFERENCED).increment(1);
           if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            context.getCounter("unref", keyString).increment(1);
           }
         } else {
           if (refs.size() > 1) {
-            if (refsSb != null) {
-              context.write(new Text(keyString), new Text(refsSb.toString()));
+            // Skip first reference.
+            for (int i = 1; i < refs.size(); i++) {
+              context.write(key,
+                new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i))));
             }
             context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
           }
           // node is defined and referenced
           context.getCounter(Counts.REFERENCED).increment(1);
         }
+      }
+
+      /**
+       * Dump out extra info around references if there are any. Helps debugging.
+       * @return StringBuilder filled with references if any.
+       * @throws IOException
+       */
+      private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
+          final List<byte []> refs)
+          throws IOException {
+        StringBuilder refsSb = null;
+        if (refs.isEmpty()) return refsSb;
+        refsSb = new StringBuilder();
+        String comma = "";
+        // If a row is a reference but has no define, print the content of the row that has
+        // this row as a 'prev'; it will help debug. The missing row was written just before
+        // the row we are dumping out here.
+        TableName tn = getTableName(context.getConfiguration());
+        try (Table t = this.connection.getTable(tn)) {
+          for (byte [] ref : refs) {
+            Result r = t.get(new Get(ref));
+            List<Cell> cells = r.listCells();
+            String ts = (cells != null && !cells.isEmpty())?
+              new Date(cells.get(0).getTimestamp()).toString(): "";
+            byte [] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
+            String jobStr = (b != null && b.length > 0)? Bytes.toString(b): "";
+            b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
+            long count = (b != null && b.length > 0)? Bytes.toLong(b): -1;
+            b = r.getValue(FAMILY_NAME, COLUMN_PREV);
+            String refRegionLocation = "";
+            String keyRegionLocation = "";
+            if (b != null && b.length > 0) {
+              try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
+                HRegionLocation hrl = rl.getRegionLocation(b);
+                if (hrl != null) refRegionLocation = hrl.toString();
+                hrl = rl.getRegionLocation(key.getBytes());
+                if (hrl != null) keyRegionLocation = hrl.toString();
+              }
+            }
+            LOG.error("Extra info on reference without a define: ref=" + Bytes.toStringBinary(ref) +
+              ", refPrevEqualsKey=" + (Bytes.compareTo(key.getBytes(), b) == 0) +
+              ", key=" + Bytes.toStringBinary(key.getBytes()) +
+              ", ref row date=" + ts + ", jobStr=" + jobStr +
+              ", ref row count=" + count +
+              ", ref row regionLocation=" + refRegionLocation +
+              ", key row regionLocation=" + keyRegionLocation);
+            refsSb.append(comma);
+            comma = ",";
+            refsSb.append(Bytes.toStringBinary(ref));
+          }
+        }
+        return refsSb;
       }
     }
 
@@ -704,7 +996,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
 
       job.setReducerClass(VerifyReducer.class);
-      job.setOutputFormatClass(TextOutputFormat.class);
+      job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
+      job.setOutputKeyClass(BytesWritable.class);
+      job.setOutputValueClass(BytesWritable.class);
       TextOutputFormat.setOutputPath(job, outputDir);
 
       boolean success = job.waitForCompletion(true);
@@ -753,23 +1047,26 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
     protected void handleFailure(Counters counters) throws IOException {
       Configuration conf = job.getConfiguration();
-      HConnection conn = HConnectionManager.getConnection(conf);
       TableName tableName = getTableName(conf);
-      CounterGroup g = counters.getGroup("undef");
-      Iterator<Counter> it = g.iterator();
-      while (it.hasNext()) {
-        String keyString = it.next().getName();
-        byte[] key = Bytes.toBytes(keyString);
-        HRegionLocation loc = conn.relocateRegion(tableName, key);
-        LOG.error("undefined row " + keyString + ", " + loc);
-      }
-      g = counters.getGroup("unref");
-      it = g.iterator();
-      while (it.hasNext()) {
-        String keyString = it.next().getName();
-        byte[] key = Bytes.toBytes(keyString);
-        HRegionLocation loc = conn.relocateRegion(tableName, key);
-        LOG.error("unreferred row " + keyString + ", " + loc);
+      try (Connection conn = ConnectionFactory.createConnection(conf)) {
+        try (RegionLocator rl = conn.getRegionLocator(tableName)) {
+          CounterGroup g = counters.getGroup("undef");
+          Iterator<Counter> it = g.iterator();
+          while (it.hasNext()) {
+            String keyString = it.next().getName();
+            byte[] key = Bytes.toBytes(keyString);
+            HRegionLocation loc = rl.getRegionLocation(key, true);
+            LOG.error("undefined row " + keyString + ", " + loc);
+          }
+          g = counters.getGroup("unref");
+          it = g.iterator();
+          while (it.hasNext()) {
+            String keyString = it.next().getName();
+            byte[] key = Bytes.toBytes(keyString);
+            HRegionLocation loc = rl.getRegionLocation(key, true);
+            LOG.error("unreferred row " + keyString + ", " + loc);
+          }
+        }
       }
     }
   }
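The handleFailure() rewrite above replaces the removed HConnection.relocateRegion() with RegionLocator. Stripped to its essentials, the lookup pattern is the following sketch (the table and row names are hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.util.Bytes;

    public class LocateRowExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             RegionLocator rl = conn.getRegionLocator(TableName.valueOf("someTable"))) {
          // Second argument 'true' = don't trust the cache; re-ask where the row lives.
          HRegionLocation loc = rl.getRegionLocation(Bytes.toBytes("some-row"), true);
          System.out.println("row lives in " + loc);
        }
      }
    }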
@@ -941,7 +1238,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   }
 
   /**
-   * A stand alone program that follows a linked list created by {@link Generator} and prints timing info.
+   * A stand alone program that follows a linked list created by {@link Generator} and prints
+   * timing info.
    */
   private static class Walker extends Configured implements Tool {
     @Override
@@ -1045,7 +1343,6 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   }
 
   private static class Clean extends Configured implements Tool {
-
     @Override
     public int run(String[] args) throws Exception {
       if (args.length < 1) {
         System.err.println("Usage: Clean <output dir>");
@@ -1133,16 +1430,17 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
   private void printCommands() {
     System.err.println("Commands:");
-    System.err.println(" Generator  Map only job that generates data.");
-    System.err.println(" Verify     A map reduce job that looks for holes. Look at the counts ");
+    System.err.println(" generator  Map only job that generates data.");
+    System.err.println(" verify     A map reduce job that looks for holes. Look at the counts ");
     System.err.println("            after running. See REFERENCED and UNREFERENCED are ok. Any ");
     System.err.println("            UNDEFINED counts are bad. Do not run with the Generator.");
-    System.err.println(" Walker     " +
-      "Standalong program that starts following a linked list & emits timing info.");
-    System.err.println(" Print      Standalone program that prints nodes in the linked list.");
-    System.err.println(" Delete     Standalone program that deletes a single node.");
-    System.err.println(" Loop       Program to Loop through Generator and Verify steps");
-    System.err.println(" Clean      Program to clean all left over detritus.");
+    System.err.println(" walker     " +
+      "Standalone program that starts following a linked list & emits timing info.");
+    System.err.println(" print      Standalone program that prints nodes in the linked list.");
+    System.err.println(" delete     Standalone program that deletes a single node.");
+    System.err.println(" loop       Program to Loop through Generator and Verify steps");
+    System.err.println(" clean      Program to clean all left over detritus.");
+    System.err.println(" search     Search for missing keys.");
     System.err.flush();
   }
 
@@ -1155,6 +1453,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       printUsage(this.getClass().getSimpleName() +
        " COMMAND [<args>]", "General options:", "");
       printCommands();
+      // Have to throw an exception here to stop the processing. Looks ugly but gets message across.
       throw new RuntimeException("Incorrect Number of args.");
     }
     toRun = args[0];
@@ -1165,7 +1464,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   public int runTestFromCommandLine() throws Exception {
     Tool tool = null;
-    if (toRun.equals("Generator")) {
+    if (toRun.equalsIgnoreCase("Generator")) {
       tool = new Generator();
     } else if (toRun.equalsIgnoreCase("Verify")) {
       tool = new Verify();
@@ -1181,6 +1480,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       tool = new Delete();
     } else if (toRun.equalsIgnoreCase("Clean")) {
       tool = new Clean();
+    } else if (toRun.equalsIgnoreCase("Search")) {
+      tool = new Search();
     } else {
       usage();
       throw new RuntimeException("Unknown arg");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 67d9b0d..55fb4a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -128,14 +128,12 @@ public class WALPlayer extends Configured implements Tool {
    * A mapper that writes out {@link Mutation} to be directly applied to
    * a running HBase instance.
    */
-  static class WALMapper
+  protected static class WALMapper
     extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
-    private Map<TableName, TableName> tables =
-        new TreeMap<TableName, TableName>();
+    private Map<TableName, TableName> tables = new TreeMap<TableName, TableName>();
 
     @Override
-    public void map(WALKey key, WALEdit value,
-        Context context)
+    public void map(WALKey key, WALEdit value, Context context)
       throws IOException {
       try {
         if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
@@ -150,27 +148,29 @@ public class WALPlayer extends Configured implements Tool {
             // filtering WAL meta entries
             if (WALEdit.isMetaEditFamily(cell.getFamily())) continue;
 
-            // A WALEdit may contain multiple operations (HBASE-3584) and/or
-            // multiple rows (HBASE-5229).
-            // Aggregate as much as possible into a single Put/Delete
-            // operation before writing to the context.
-            if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
-                || !CellUtil.matchingRow(lastCell, cell)) {
-              // row or type changed, write out aggregate KVs.
-              if (put != null) context.write(tableOut, put);
-              if (del != null) context.write(tableOut, del);
-
+            // Allow a subclass to filter out this cell.
+            if (filter(context, cell)) {
+              // A WALEdit may contain multiple operations (HBASE-3584) and/or
+              // multiple rows (HBASE-5229).
+              // Aggregate as much as possible into a single Put/Delete
+              // operation before writing to the context.
+              if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
+                  || !CellUtil.matchingRow(lastCell, cell)) {
+                // row or type changed, write out aggregate KVs.
+                if (put != null) context.write(tableOut, put);
+                if (del != null) context.write(tableOut, del);
+                if (CellUtil.isDelete(cell)) {
+                  del = new Delete(cell.getRow());
+                } else {
+                  put = new Put(cell.getRow());
+                }
+              }
               if (CellUtil.isDelete(cell)) {
-                del = new Delete(cell.getRow());
+                del.addDeleteMarker(cell);
               } else {
-                put = new Put(cell.getRow());
+                put.add(cell);
               }
             }
-            if (CellUtil.isDelete(cell)) {
-              del.addDeleteMarker(cell);
-            } else {
-              put.add(cell);
-            }
             lastCell = cell;
           }
           // write residual KVs
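The filter(Context, Cell) hook consulted above — and added in the next hunk — is what WALMapperSearcher overrides. A hypothetical second use, sketched here with invented names: a WALPlayer subclass whose mapper replays only cells of a single row, dropping everything else before the Put/Delete aggregation. To actually run it, createSubmittableJob would be overridden to install the mapper, the same way WALSearcher does.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.mapreduce.WALPlayer;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SingleRowWALPlayer extends WALPlayer {
      public SingleRowWALPlayer(Configuration conf) {
        super(conf);
      }

      public static class SingleRowWALMapper extends WALMapper {
        // Hypothetical target row.
        private static final byte[] TARGET_ROW = Bytes.toBytes("some-row");

        @Override
        protected boolean filter(Context context, final Cell cell) {
          // Emit the cell only when its row matches the target row.
          return Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
            TARGET_ROW, 0, TARGET_ROW.length);
        }
      }
    }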
@@ -182,18 +182,30 @@ public class WALPlayer extends Configured implements Tool {
       }
     }
 
+    /**
+     * @param cell
+     * @return Return true if we are to emit this cell.
+     */
+    protected boolean filter(Context context, final Cell cell) {
+      return true;
+    }
+
     @Override
     public void setup(Context context) throws IOException {
       String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
       String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
-      if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
+      if (tablesToUse == null && tableMap == null) {
+        // Then user wants all tables.
+      } else if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
         // this can only happen when WALMapper is used directly by a class other than WALPlayer
         throw new IOException("No tables or incorrect table mapping specified.");
       }
       int i = 0;
-      for (String table : tablesToUse) {
-        tables.put(TableName.valueOf(table),
+      if (tablesToUse != null) {
+        for (String table : tablesToUse) {
+          tables.put(TableName.valueOf(table),
             TableName.valueOf(tableMap[i++]));
+        }
       }
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index f981185..8613276 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -2526,14 +2526,14 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    * Stops the previously started MiniMRCluster.
    */
   public void shutdownMiniMapReduceCluster() {
-    LOG.info("Stopping mini mapreduce cluster...");
     if (mrCluster != null) {
+      LOG.info("Stopping mini mapreduce cluster...");
       mrCluster.shutdown();
       mrCluster = null;
+      LOG.info("Mini mapreduce cluster stopped");
     }
     // Restore configuration to point to local jobtracker
     conf.set("mapreduce.jobtracker.address", "local");
-    LOG.info("Mini mapreduce cluster stopped");
   }
 
   /**
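Finally, the new "search" command can be driven the same way the other ITBLL commands are, through the test's Tool entry point. A usage sketch — the Verify output path and mapper count here are invented:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList;
    import org.apache.hadoop.util.ToolRunner;

    public class RunSearchExample {
      public static void main(String[] args) throws Exception {
        // Roughly equivalent to:
        //   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList \
        //       search /tmp/verify-output 4
        int ret = ToolRunner.run(HBaseConfiguration.create(),
          new IntegrationTestBigLinkedList(),
          new String[] {"search", "/tmp/verify-output", "4"});
        System.exit(ret);
      }
    }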