diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
index 5304e3e..879dec6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
@@ -19,12 +19,15 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -32,8 +35,10 @@ import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 
 /**
@@ -51,11 +56,18 @@ public class RowCounter {
    * Mapper that runs the count.
    */
   static class RowCounterMapper
-  extends TableMapper<ImmutableBytesWritable, Result> {
+  extends TableMapper<Text, Text> {
+    boolean doDeepScan = false;
+    boolean doWrite = false;
 
     /** Counter enumeration to count the actual rows. */
     public static enum Counters {ROWS}
-
+    @Override
+    public void setup(final Context context) throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      if (conf.getBoolean("job.hbase.deep.scan", false)) doDeepScan = true;
+      if (TextOutputFormat.getOutputPath(context) != null) doWrite = true;
+    }
     /**
      * Maps the data.
      *
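[Reviewer note, not part of the patch: the new setup() override above caches two per-task flags so that map(), which runs once per row, does not have to re-read the job Configuration or probe the output format on every call. A minimal standalone sketch of the same pattern follows; the mapper class and the "example.verbose" key are hypothetical, not HBase code.]

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Cache Configuration-derived flags once in setup(); map() then branches on the
// cached booleans instead of re-parsing the Configuration per record.
public class FlagCachingMapper extends Mapper<LongWritable, Text, Text, Text> {
  private boolean verbose;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    verbose = conf.getBoolean("example.verbose", false); // hypothetical flag
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    if (verbose) { // reads the cached field, not the Configuration
      context.write(new Text(key.toString()), value);
    }
  }
}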
@@ -69,9 +81,39 @@ public class RowCounter {
     @Override
     public void map(ImmutableBytesWritable row, Result values,
       Context context)
-    throws IOException {
+    throws IOException, InterruptedException {
       // Count every row containing data, whether it's in qualifiers or values
       context.getCounter(Counters.ROWS).increment(1);
+      if (!doWrite && !doDeepScan) return;
+      StringBuffer strBuffer = new StringBuffer();
+      strBuffer.append("\nROW: ").append(new String(row.get()));
+      NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+        multiLevelMap = values.getMap();
+      Set<Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> mapSet =
+        multiLevelMap.entrySet();
+      for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+          entry : mapSet) {
+        strBuffer.append("\n\tFAMILY: ");
+        strBuffer.append(new String(entry.getKey()));
+        NavigableMap<byte[], NavigableMap<Long, byte[]>> val = entry.getValue();
+        Set<Entry<byte[], NavigableMap<Long, byte[]>>> nestedMapSet = val.entrySet();
+        for (Entry<byte[], NavigableMap<Long, byte[]>> nestedEntry : nestedMapSet) {
+          strBuffer.append("\n\tQUALIFIER: ");
+          strBuffer.append(new String(nestedEntry.getKey()));
+          NavigableMap<Long, byte[]> versionMap = nestedEntry.getValue();
+          for (Entry<Long, byte[]> versionMapEntry : versionMap.entrySet()) {
+            strBuffer.append("\tVERSION: ");
+            strBuffer.append(Long.toString(versionMapEntry.getKey()));
+            strBuffer.append(" VALUE: ");
+            strBuffer.append(new String(versionMapEntry.getValue()));
+          }
+        }
+      }
+
+      if (doWrite) {
+        String str = strBuffer.toString();
+        context.write(new Text(""), new Text(str));
+      }
     }
   }
 
@@ -91,6 +133,10 @@ public class RowCounter {
     StringBuilder sb = new StringBuilder();
 
     final String rangeSwitch = "--range=";
+    final String deepScan = "--deepscan";
+    final String outputSwitch = "--output";
+    boolean doDeepScan = false;
+    Path outputDir = null;
 
     // First argument is table name, starting from second
     for (int i = 1; i < args.length; i++) {
@@ -103,15 +149,22 @@ public class RowCounter {
         }
         startKey = startEnd[0];
         endKey = startEnd[1];
-      }
-      else {
+      } else if (args[i].startsWith(deepScan)) {
+        doDeepScan = true;
+      } else if (args[i].startsWith(outputSwitch)) {
+        outputDir = new Path(args[i].substring(outputSwitch.length() + 1));
+      } else {
         // if no switch, assume column names
         sb.append(args[i]);
         sb.append(" ");
       }
     }
 
-    Job job = new Job(conf, NAME + "_" + tableName);
+    StringBuffer jobName = new StringBuffer(NAME);
+    if (doDeepScan) jobName.append("_withdeepscan");
+    if (outputDir != null) jobName.append("_withhdfswrites");
+    jobName.append("_TABLE_").append(tableName);
+    Job job = new Job(conf, jobName.toString());
     job.setJarByClass(RowCounter.class);
     Scan scan = new Scan();
     scan.setCacheBlocks(false);
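[Reviewer note, not part of the patch: the next hunk is where the two modes diverge. A plain count keeps the FirstKeyOnlyFilter so at most one KeyValue per row reaches the mapper, while --deepscan drops the filter and requests all versions so the mapper can dump every family/qualifier/version. The sketch below restates the two Scan setups in isolation, using only the Scan APIs that appear in the patch.]

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

public class ScanModes {
  // Counting-only scan: one KeyValue per row suffices, so filter the rest out
  // server-side and keep the one-pass scan out of the block cache.
  static Scan countingScan() {
    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());
    return scan;
  }

  // Deep scan: every qualifier and every stored version must reach the mapper,
  // so no first-key filter and unlimited versions.
  static Scan deepScan() {
    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setMaxVersions();
    return scan;
  }
}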
@@ -122,28 +175,44 @@ public class RowCounter {
     if (endKey != null && !endKey.equals("")) {
       scan.setStopRow(Bytes.toBytes(endKey));
     }
-    scan.setFilter(new FirstKeyOnlyFilter());
-    if (sb.length() > 0) {
-      for (String columnName : sb.toString().trim().split(" ")) {
-        String [] fields = columnName.split(":");
-        if(fields.length == 1) {
-          scan.addFamily(Bytes.toBytes(fields[0]));
-        } else {
-          byte[] qualifier = Bytes.toBytes(fields[1]);
-          qualifiers.add(qualifier);
-          scan.addColumn(Bytes.toBytes(fields[0]), qualifier);
+    if (doDeepScan) {
+      scan.setMaxVersions();
+      job.getConfiguration().setBoolean("job.hbase.deep.scan", true);
+    } else {
+      scan.setFilter(new FirstKeyOnlyFilter());
+      if (sb.length() > 0) {
+        for (String columnName : sb.toString().trim().split(" ")) {
+          String [] fields = columnName.split(":");
+          if(fields.length == 1) {
+            scan.addFamily(Bytes.toBytes(fields[0]));
+          } else {
+            byte[] qualifier = Bytes.toBytes(fields[1]);
+            qualifiers.add(qualifier);
+            scan.addColumn(Bytes.toBytes(fields[0]), qualifier);
+          }
         }
       }
+      // specified column may or may not be part of first key value for the row.
+      // Hence do not use FirstKeyOnlyFilter if scan has columns, instead use
+      // FirstKeyValueMatchingQualifiersFilter.
+      if (qualifiers.size() == 0) {
+        scan.setFilter(new FirstKeyOnlyFilter());
+      } else {
+        scan.setFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers));
+      }
     }
-    // specified column may or may not be part of first key value for the row.
-    // Hence do not use FirstKeyOnlyFilter if scan has columns, instead use
-    // FirstKeyValueMatchingQualifiersFilter.
-    if (qualifiers.size() == 0) {
-      scan.setFilter(new FirstKeyOnlyFilter());
+    if (outputDir == null) {
+      job.setOutputFormatClass(NullOutputFormat.class);
     } else {
-      scan.setFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers));
+      if (outputDir.getFileSystem(conf).exists(outputDir)) {
+        throw new IOException("Output directory " + outputDir +
+          " already exists.");
+      }
+      job.setOutputFormatClass(TextOutputFormat.class);
+      TextOutputFormat.setOutputPath(job, outputDir);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Text.class);
     }
-    job.setOutputFormatClass(NullOutputFormat.class);
     TableMapReduceUtil.initTableMapperJob(tableName, scan,
       RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
     job.setNumReduceTasks(0);
@@ -163,7 +232,8 @@ public class RowCounter {
    */
   private static void printUsage() {
     System.err.println("Usage: RowCounter [options] <tablename> " +
-      "[--range=[startKey],[endKey]] [<column1> <column2>...]");
+      "[--range=[startKey],[endKey]] [[--deepscan] [--output=<outputdir>] " +
+      "[<column1> <column2>...]]");
     System.err.println("For performance consider the following options:\n" +
       "-Dhbase.client.scanner.caching=100\n" +
       "-Dmapred.map.tasks.speculative.execution=false");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 6e5cc90..4f1b6c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -965,7 +965,8 @@ public class AssignmentManager extends ZooKeeperListener {
       case RS_ZK_REGION_OPENED:
         // Should see OPENED after OPENING but possible after PENDING_OPEN
         if (regionState != null
-            && !regionState.isPendingOpenOrOpeningOnServer(sn)) {
+            && !regionState.isPendingOpenOrOpeningOnServer(sn)
+            && !ServerName.isSameHostnameAndPort(sn, regionState.getServerName())) {
           LOG.warn("Received OPENED for " + prettyPrintedRegionName
             + " from server " + sn + " but region was in the state " + regionState
             + " and not in expected PENDING_OPEN or OPENING states,"
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
index 258ea44..2d59569 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -444,7 +444,7 @@ public class RegionStates {
    */
   public synchronized void waitForAssignment(
       final HRegionInfo hri) throws InterruptedException {
-    if (!isRegionAssigned(hri)) return;
+    if (isRegionAssigned(hri)) return;
 
     while(!server.isStopped() && !isRegionAssigned(hri)) {
       RegionState rs = getRegionState(hri);
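[Reviewer note, not part of the patch: the RegionStates hunk above fixes an inverted guard. The old code returned immediately whenever the region was NOT yet assigned, so waitForAssignment() never actually waited; the fix makes the early return fire only when the wait is already satisfied. The test added below exercises this together with the AssignmentManager change. The corrected method has the standard early-exit-then-poll shape, sketched generically here with hypothetical names, not the HBase implementation.]

import java.util.concurrent.atomic.AtomicBoolean;

// Generic sketch: return fast if the condition already holds, otherwise poll
// until it holds or the enclosing service stops.
public class WaitIdiom {
  static void waitUntil(AtomicBoolean done, AtomicBoolean stopped)
      throws InterruptedException {
    if (done.get()) return;   // already satisfied: nothing to wait for
    while (!stopped.get() && !done.get()) {
      Thread.sleep(100);      // crude poll; RegionStates waits on the object monitor instead
    }
  }
}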
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -858,6 +858,32 @@ public class TestAssignmentManager {
       .getEncodedName()));
   }
 
+  @Test
+  public void testRegionAssignmentUponDoubleNotifications() throws Exception {
+    // Verifies that a duplicate OPENED notification for a region that has
+    // already been transitioned to OPENED is tolerated by the AssignmentManager.
+    ExecutorService executor = startupMasterExecutor("mockedAMExecutor");
+    CatalogTracker ct = Mockito.mock(CatalogTracker.class);
+    // Create an AM.
+    AssignmentManager am = new AssignmentManager(this.server,
+      this.serverManager, ct, balancer, executor, null, master.getTableLockManager());
+    am.failoverCleanupDone.set(true);
+    // Make sure our new AM gets callbacks; once registered, can't unregister.
+    // That's ok because we make a new zk watcher for each test.
+    this.watcher.registerListenerFirst(am);
+    am.getRegionStates().createRegionState(REGIONINFO);
+
+    ZKAssign.createNodeOffline(this.watcher, REGIONINFO, SERVERNAME_A);
+    int version = ZKAssign.getVersion(this.watcher, REGIONINFO);
+    ZKAssign.transitionNodeOpening(this.watcher, REGIONINFO, SERVERNAME_A);
+    version = ZKAssign.getVersion(this.watcher, REGIONINFO);
+    ZKAssign.transitionNodeOpened(this.watcher, REGIONINFO, SERVERNAME_A, version);
+    version = ZKAssign.getVersion(this.watcher, REGIONINFO);
+    am.getRegionStates().waitForAssignment(REGIONINFO);
+    ZKAssign.transitionNode(this.watcher, REGIONINFO, SERVERNAME_A, EventType.RS_ZK_REGION_OPENED,
+      EventType.RS_ZK_REGION_OPENED, version);
+  }
+
   /**
    * Test verifies whether assignment is skipped for regions of tables in DISABLING state during
    * clean cluster startup. See HBASE-6281.
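[Reviewer note, not part of the patch: with these changes applied, the patched RowCounter could be driven programmatically as below. createSubmittableJob is the existing public factory on RowCounter; the table name "usertable" and the output path are hypothetical.]

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.RowCounter;
import org.apache.hadoop.mapreduce.Job;

public class DeepRowCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Same effect as invoking: RowCounter usertable --deepscan --output=/tmp/rowcounter-out
    Job job = RowCounter.createSubmittableJob(conf,
      new String[] { "usertable", "--deepscan", "--output=/tmp/rowcounter-out" });
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}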