diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestVerifyAsyncProcess.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestVerifyAsyncProcess.java
new file mode 100644
index 0000000..e31a686
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestVerifyAsyncProcess.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TestAsyncProcessAccounting;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Simple test that creates 1M random rows, writes them, and afterward asserts that all 1M rows
+ * made it in. The idea is that you run chaos monkeys while this is running to see if any writes
+ * fail.
+ * TODO: we keep the 1M rows around so we should be able to figure out exactly which row was bad
+ * and even when it went in. This IT tries to flush out an issue in the AsyncProcess accounting
+ * inside the client that ITBLL keeps hitting but that is hard to pin down there; this test
+ * exercises the write path only.
+ */
+public class IntegrationTestVerifyAsyncProcess extends IntegrationTestBase {
+  private static final Log LOG = LogFactory.getLog(IntegrationTestVerifyAsyncProcess.class);
+  private static final String ROWS_SHORT_OPT = "r";
+  private static final String PAUSE_SHORT_OPT = "p";
+  private int rowCount = 1000000;
+  private byte [][] rows = null;
+  private int pauseInMs = 60000;
+  private static final TableName TABLENAME = TableName.valueOf("accounting");
+  private static final int ROWKEY_LENGTH = 16;
+  private final byte [] FAMILY = Bytes.toBytes("f");
+
+  @Override
+  protected void addOptions() {
+    super.addOptions();
+    // Change amount of rows to run by passing 'r' or 'rows' on command line.
+    addOptWithArg(ROWS_SHORT_OPT, "rows", "Rows to test with. Default: 1M. Kept in memory.");
+    addOptWithArg(PAUSE_SHORT_OPT, "pause",
+      "Time to pause between write and read in seconds. Default: 60s");
Default: 60s"); + } + + @Override + protected void processOptions(CommandLine cmd) { + super.processOptions(cmd); + String countStr = cmd.getOptionValue(ROWS_SHORT_OPT); + if (countStr != null) this.rowCount = Integer.parseInt(countStr); + String pauseStr = cmd.getOptionValue(PAUSE_SHORT_OPT); + if (pauseStr != null) this.pauseInMs = Integer.parseInt(pauseStr) * 1000; + } + + @Override + protected int doWork() throws Exception { + return super.doWork(); + } + + @Override + public void setUpCluster() throws Exception { + util = getTestingUtil(getConf()); + boolean isDistributed = util.isDistributedCluster(); + util.initializeCluster(isDistributed? 1: 3 /*At least three servers*/); + if (!isDistributed) { + util.startMiniMapReduceCluster(); + } + this.setConf(util.getConfiguration()); + // Now create our table. + try (Connection connection = ConnectionFactory.createConnection(getConf())) { + TestAsyncProcessAccounting.createTable(connection, TABLENAME, FAMILY, 5, 2); + } + this.rows = TestAsyncProcessAccounting.createRandomRows(this.rowCount, ROWKEY_LENGTH); + } + + @Override + public int runTestFromCommandLine() throws Exception { + LOG.info("RowCount=" + this.rowCount); + // Use HTable so our output is like that seen in ITBLL. + try (@SuppressWarnings("deprecation") + Table t = new HTable(getConf(), TABLENAME)) { + // Write rows like ITBLL does. + TestAsyncProcessAccounting.persist(t, FAMILY, 0, this.rows.length, 0, this.rows, this.rows, + Bytes.toBytes("client"), LOG); + LOG.info("Pausing for " + this.pauseInMs + "ms"); + org.apache.hadoop.hbase.util.Threads.sleep(this.pauseInMs); + LOG.info("Stopping the monkey. No need anymore"); + monkey.stop("Done with the monkey"); + // Ensure all are there. + try (ResultScanner scanner = t.getScanner(new Scan())) { + int count = 0; + while (scanner.next() != null) { + if (count % 10000 == 0) LOG.info("RowCount=" + count); + count++; + } + assertEquals(this.rows.length, count); + } + } + return 0; + } + + @Override + public TableName getTablename() { + return TableName.valueOf(this.getClass().getSimpleName()); + } + + @Override + protected Set getColumnFamilies() { + Set cfs = new HashSet(); + cfs.add("cf"); + return cfs; + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int ret = ToolRunner.run(conf, new IntegrationTestVerifyAsyncProcess(), args); + System.exit(ret); + } +} \ No newline at end of file diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java index 0a62966..9c6aa16 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.test; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.sql.Date; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -41,6 +42,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -53,12 +55,15 @@ import 
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -597,6 +602,21 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
     private ArrayList<byte[]> refs = new ArrayList<byte[]>();
     private AtomicInteger rows = new AtomicInteger(0);
+    private Connection connection;
+
+    @Override
+    protected void setup(
+        Reducer<BytesWritable, LongWritable, BytesWritable, NullWritable>.Context context)
+    throws IOException, InterruptedException {
+      super.setup(context);
+      this.connection = ConnectionFactory.createConnection(context.getConfiguration());
+    }
+
+    @Override
+    protected void cleanup(
+        Reducer<BytesWritable, LongWritable, BytesWritable, NullWritable>.Context context)
+    throws IOException, InterruptedException {
+      if (this.connection != null) this.connection.close();
+      super.cleanup(context);
+    }
 
     @Override
     public void reduce(BytesWritable key, Iterable<LongWritable> values, Context context)
@@ -622,11 +642,33 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       if (defCount == 0 || refs.size() != 1) {
         refsSb = new StringBuilder();
         String comma = "";
-        for (byte[] ref : refs) {
-          refsSb.append(comma);
-          comma = ",";
-          refsSb.append(Bytes.toStringBinary(ref));
+        try (Table t = this.connection.getTable(getTableName(context.getConfiguration()))) {
+          for (byte[] ref : refs) {
+            Result r = t.get(new Get(ref));
+            List<Cell> cells = r.listCells();
+            String ts = (cells != null && !cells.isEmpty())?
+              new Date(cells.get(0).getTimestamp()).toString(): "";
+            byte [] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
+            String jobStr = (b != null && b.length > 0)? Bytes.toString(b): "";
+            b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
+            int count = (b != null && b.length > 0)? Bytes.toInt(b): -1;
+            b = r.getValue(FAMILY_NAME, COLUMN_PREV);
+            String regionLocation = "";
+            if (b != null && b.length > 0) {
+              try (RegionLocator rl =
+                  this.connection.getRegionLocator(getTableName(context.getConfiguration()))) {
+                HRegionLocation hrl = rl.getRegionLocation(b);
+                if (hrl != null) regionLocation = hrl.toString();
+              }
+            }
+            LOG.error("Reference=" + Bytes.toString(ref) + ", prev=" + Bytes.toString(b) +
+              ", date=" + ts + ", jobStr=" + jobStr + ", count=" + count +
+              ", regionLocation=" + regionLocation);
+            refsSb.append(comma);
+            comma = ",";
+            refsSb.append(Bytes.toStringBinary(ref));
           }
+        }
         keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
         LOG.error("Linked List error: Key = " + keyString + " References = " + refsSb.toString());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 4a33bcd..ea443e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1915,6 +1915,21 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   }
 
   /**
+   * Used in tests to stop the processing of dead servers for a while so we can have the state of
+   * dead regionservers stick around longer.
+   * @param b If false, disable processing of dead servers; if true, re-enable it.
+   * @return State of the serverShutdownHandlerEnabled flag.
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public boolean serverShutdownHandlerEnabled(boolean b) throws IOException, InterruptedException {
+    if (b) enableServerShutdownHandler(true);
+    else this.serverShutdownHandlerEnabled = b;
+    return b;
+  }
+
+  /**
    * Report whether this master has started initialization and is about to do meta region assignment
    * @return true if master is in initialization & about to assign hbase:meta regions
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessAccounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessAccounting.java
new file mode 100644
index 0000000..7f098eb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessAccounting.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({LargeTests.class})
+public class TestAsyncProcessAccounting {
+  private final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static byte[][] ROWS;
+  private final int SERVERS = 5;
+  private static final int ROWKEY_SIZE = 16;
+  private final byte [] FAMILY = Bytes.toBytes("f");
+  private static final TableName TABLENAME = TableName.valueOf("t");
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(SERVERS);
+    createTable(TEST_UTIL.getConnection(), TABLENAME, FAMILY, SERVERS, 2);
+    ROWS = createRandomRows(100000, ROWKEY_SIZE);
+  }
+
+  /**
+   * @param rowsLength Count of rows to make.
+   * @param rowSize Length in bytes of each row key.
+   * @return Array of random rows.
+   */
+  public static byte [][] createRandomRows(final int rowsLength, final int rowSize) {
+    Random random = new Random();
+    byte [][] rows = new byte [rowsLength][];
+    for (int i = 0; i < rowsLength; i++) {
+      byte[] bytes = new byte[rowSize];
+      random.nextBytes(bytes);
+      rows[i] = bytes;
+    }
+    return rows;
+  }
+
+  /**
+   * Create table for the test with appropriate splits and config.
+   * @param connection Connection to use creating the table.
+   * @param tn Name of the table to create.
+   * @param family Column family to add to the table.
+   * @param servers Count of servers in the cluster.
+   * @param perServer Count of regions to make per server.
+   * @throws IOException
+   */
+  public static void createTable(final Connection connection, final TableName tn,
+      final byte [] family, final int servers, final int perServer)
+  throws IOException {
+    byte[][] splits = new RegionSplitter.UniformSplit().split(servers * perServer);
+    HTableDescriptor htd = new HTableDescriptor(tn);
+    // Disable splitting!
+    htd.setMaxFileSize(10L * 1024 * 1024 * 1024);
+    htd.addFamily(new HColumnDescriptor(family));
+    try (Admin admin = connection.getAdmin()) {
+      if (admin.tableExists(htd.getTableName())) {
+        admin.disableTable(htd.getTableName());
+        admin.deleteTable(htd.getTableName());
+      }
+      admin.createTable(htd, splits);
+    }
+  }
+
+  /**
+   * Persist rows to the cluster, writing ITBLL-style 'previous', 'count', and 'client' columns.
+   * @param t Table to write to.
+   * @param cf Column family to write to.
+   * @param start Index of first row to write (inclusive).
+   * @param stop Index to stop writing at (exclusive).
+   * @param count Base value for the 'count' column; pass a negative value to skip writing it.
+   * @param prev Values to write into the 'previous' column.
+   * @param current Row keys to put.
+   * @param id Value to write into the 'client' column; pass null to skip writing it.
+   * @param log Where to log progress.
+   * @throws IOException
+   */
+  public static void persist(Table t, byte [] cf, int start, int stop, long count, byte[][] prev,
+      byte[][] current, byte[] id, Log log)
+  throws IOException {
+    // Set configs to be same as in ITBLL, the test we are trying to debug.
+    t.setAutoFlushTo(false);
+    t.setWriteBufferSize(4 * 1024 * 1024);
+    for (int i = start; i < stop; i++) {
+      Put put = new Put(current[i]);
+      put.add(cf, Bytes.toBytes("previous"), prev[i]);
+      if (count >= 0) {
+        put.add(cf, Bytes.toBytes("count"), Bytes.toBytes(count + i));
+      }
+      if (id != null) {
+        put.add(cf, Bytes.toBytes("client"), id);
+      }
+      t.put(put);
+      if (i % 10000 == 0) log.info("PutCount=" + i);
+    }
+    t.flushCommits();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * @return The non-meta server we killed.
+   * @throws IOException
+   */
+  private ServerName killNonMetaRegionServer() throws IOException {
+    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
+    List<RegionServerThread> rss = TEST_UTIL.getHBaseCluster().getRegionServerThreads();
+    for (RegionServerThread t: rss) {
+      ServerName sn = t.getRegionServer().getServerName();
+      if (sn.equals(metaServer)) continue;
+      if (t.getRegionServer().isStopped()) continue;
+      LOG.info("Killing " + sn);
+      ((MiniHBaseCluster.MiniHBaseClusterRegionServer)t.getRegionServer()).kill();
+      // We killed one. Return.
+      return sn;
+    }
+    return null;
+  }
+
+  private void killAndHang() {
+    try {
+      LOG.info("Stopped processing of dead servers");
+      TEST_UTIL.getHBaseCluster().getMaster().serverShutdownHandlerEnabled(false);
+      killNonMetaRegionServer();
+      Threads.sleep(60000);
+      LOG.info("Starting processing of dead servers");
+      TEST_UTIL.getHBaseCluster().getMaster().serverShutdownHandlerEnabled(true);
+    } catch (IOException e) {
+      LOG.error(e);
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testAccounting() throws IOException {
+    try (Table t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME)) {
+      t.setAutoFlushTo(false);
+      t.setWriteBufferSize(4 * 1024 * 1024);
+      Thread backgroundTask = new Thread("background killer") {
+        @Override
+        public void run() {
+          killAndHang();
+          killAndHang();
+        }
+      };
+      backgroundTask.start();
+      persist(t, FAMILY, 0, ROWS.length, 0, ROWS, ROWS, Bytes.toBytes("client"), LOG);
+      // Ensure all are there.
+      try (ResultScanner scanner = t.getScanner(new Scan())) {
+        int count = 0;
+        while (scanner.next() != null) {
+          if (count % 10000 == 0) LOG.info("RowCount=" + count);
+          count++;
+        }
+        assertEquals(ROWS.length, count);
+      }
+    }
+  }
+}
\ No newline at end of file
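
Not part of the patch: a minimal sketch of how the new integration test could be launched against a live cluster once this is applied. The wrapper class name and the option values are made up for illustration; the -r and -p flags are the options the patch adds (rows to write, and seconds to pause between the write and the verification scan).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.IntegrationTestVerifyAsyncProcess;
import org.apache.hadoop.util.ToolRunner;

public class RunVerifyAsyncProcess {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Run against the cluster named in the client config rather than a minicluster.
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    // -r: rows to write (default 1M); -p: seconds to pause before verifying (default 60).
    int exitCode = ToolRunner.run(conf, new IntegrationTestVerifyAsyncProcess(),
      new String[] {"-r", "500000", "-p", "120"});
    System.exit(exitCode);
  }
}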