From 654002d1ec0eae4cea219ade738baa870cf598a7 Mon Sep 17 00:00:00 2001 From: Chinmay Kulkarni Date: Thu, 18 May 2017 15:42:57 -0700 Subject: [PATCH] HBASE-17959: Canary timeout should be configurable on a per-table basis. Added support for configuring read/write timeouts on a per-table basis when in region mode. Added unit test for per-table timeout checks. --- .../java/org/apache/hadoop/hbase/tool/Canary.java | 128 +++++++++++++++++---- .../apache/hadoop/hbase/tool/TestCanaryTool.java | 55 ++++++++- 2 files changed, 155 insertions(+), 28 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java index 475e811139..cfa3c9a5a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java @@ -246,6 +246,21 @@ public final class Canary implements Tool { } } + public static class RegionStdOutSink extends StdOutSink { + + private Map perTableReadWriteLatency = new HashMap<>(); + + public Map getReadWriteLatency() { + return this.perTableReadWriteLatency; + } + + public AtomicLong initializeReadWriteLatency (String tableName) { + AtomicLong initLatency = new AtomicLong(0L); + this.perTableReadWriteLatency.put(tableName, initLatency); + return initLatency; + } + } + static class ZookeeperTask implements Callable { private final Connection connection; private final String host; @@ -293,19 +308,21 @@ public final class Canary implements Tool { } private Connection connection; private HRegionInfo region; - private Sink sink; + private RegionStdOutSink sink; private TaskType taskType; private boolean rawScanEnabled; private ServerName serverName; + private AtomicLong readWriteLatency; - RegionTask(Connection connection, HRegionInfo region, ServerName serverName, Sink sink, - TaskType taskType, boolean rawScanEnabled) { + RegionTask(Connection connection, HRegionInfo region, 
ServerName serverName, RegionStdOutSink sink, + TaskType taskType, boolean rawScanEnabled, AtomicLong rwLatency) { this.connection = connection; this.region = region; this.serverName = serverName; this.sink = sink; this.taskType = taskType; this.rawScanEnabled = rawScanEnabled; + this.readWriteLatency = rwLatency; } @Override @@ -386,6 +403,7 @@ public final class Canary implements Tool { rs.next(); } stopWatch.stop(); + this.readWriteLatency.addAndGet(stopWatch.getTime()); sink.publishReadTiming(serverName, region, column, stopWatch.getTime()); } catch (Exception e) { sink.publishReadFailure(serverName, region, column, e); @@ -396,7 +414,6 @@ public final class Canary implements Tool { } scan = null; get = null; - startKey = null; } } try { @@ -438,6 +455,7 @@ public final class Canary implements Tool { long startTime = System.currentTimeMillis(); table.put(put); long time = System.currentTimeMillis() - startTime; + this.readWriteLatency.addAndGet(time); sink.publishWriteTiming(serverName, region, column, time); } catch (Exception e) { sink.publishWriteFailure(serverName, region, column, e); @@ -573,6 +591,7 @@ public final class Canary implements Tool { private boolean writeSniffing = false; private boolean treatFailureAsError = false; private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME; + private HashMap configuredTimeoutPerTable = new HashMap<>(); private ExecutorService executor; // threads to retrieve data from regionservers @@ -673,7 +692,32 @@ public final class Canary implements Tool { } this.failOnError = Boolean.parseBoolean(args[i]); - } else { + } else if (cmd.equals("-perTable")) { + i++; + + if (i == args.length) { + System.err.println("-perTable needs a comma-separated list of timeouts per table (without spaces)."); + printUsageAndExit(); + } + String [] tableTimeouts = args[i].split(","); + for (String tT: tableTimeouts) { + String [] nameTimeout = tT.split("="); + if (nameTimeout.length < 2) { + System.err.println("Each -perTable argument 
must be of the form =."); + printUsageAndExit(); + } + long timeoutVal = 0L; + try { + timeoutVal = Long.parseLong(nameTimeout[1]); + } catch (NumberFormatException e) { + System.err.println("-perTable timeout for each table must be a numeric value argument."); + printUsageAndExit(); + } + this.configuredTimeoutPerTable.put(nameTimeout[0], timeoutVal); + } + } + + else { // no options match System.err.println(cmd + " options is invalid."); printUsageAndExit(); @@ -694,6 +738,10 @@ public final class Canary implements Tool { printUsageAndExit(); } } + if (!this.configuredTimeoutPerTable.isEmpty() && (this.regionServerMode || this.zookeeperMode)) { + System.err.println("-perTable can only be configured in region mode."); + printUsageAndExit(); + } return index; } @@ -794,7 +842,9 @@ public final class Canary implements Tool { System.err.println(" which means the table/regionserver is regular expression pattern"); System.err.println(" -f stop whole program if first error occurs," + " default is true"); - System.err.println(" -t timeout for a check, default is 600000 (milisecs)"); + System.err.println(" -t timeout for a check, default is 600000 (millisecs)"); + System.err.println(" -perTable =,=, ... " + + "comma-separated list of timeouts per table (no spaces), default is 600000 (millisecs)"); System.err.println(" -writeSniffing enable the write sniffing in canary"); System.err.println(" -treatFailureAsError treats read / write failure as error"); System.err.println(" -writeTable The table used for write sniffing." 
@@ -834,8 +884,9 @@ public final class Canary implements Tool { (ZookeeperStdOutSink) this.sink, this.executor, this.treatFailureAsError); } else { monitor = - new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor, - this.writeSniffing, this.writeTableName, this.treatFailureAsError); + new RegionMonitor(connection, monitorTargets, this.useRegExp, + (RegionStdOutSink) this.sink, this.executor, this.writeSniffing, + this.writeTableName, this.treatFailureAsError, this.configuredTimeoutPerTable); } return monitor; } @@ -926,10 +977,11 @@ public final class Canary implements Tool { private float regionsUpperLimit; private int checkPeriod; private boolean rawScanEnabled; + private HashMap configuredPerTableTimeout; public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, - Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName, - boolean treatFailureAsError) { + RegionStdOutSink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName, + boolean treatFailureAsError, HashMap configuredPerTableTimeout) { super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError); Configuration conf = connection.getConfiguration(); this.writeSniffing = writeSniffing; @@ -944,6 +996,14 @@ public final class Canary implements Tool { conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY, DEFAULT_WRITE_TABLE_CHECK_PERIOD); this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false); + this.configuredPerTableTimeout = new HashMap<>(configuredPerTableTimeout); + } + + private RegionStdOutSink getSink() { + if (!(sink instanceof RegionStdOutSink)) { + throw new RuntimeException("Can only write to Region sink"); + } + return ((RegionStdOutSink) sink); } @Override @@ -951,15 +1011,22 @@ public final class Canary implements Tool { if (this.initAdmin()) { try { List> taskFutures = new LinkedList<>(); + RegionStdOutSink 
regionSink = this.getSink(); if (this.targets != null && this.targets.length > 0) { String[] tables = generateMonitorTables(this.targets); + // Check to see that each table name passed in the -perTable argument is also passed as a monitor target. + if (! new HashSet<>(Arrays.asList(tables)).containsAll(this.configuredPerTableTimeout.keySet())) { + LOG.error("-perTable can only specify timeouts for monitor targets passed via command line."); + this.errorCode = USAGE_EXIT_CODE; + } this.initialized = true; for (String table : tables) { - taskFutures.addAll(Canary.sniff(admin, sink, table, executor, TaskType.READ, - this.rawScanEnabled)); + AtomicLong rwLatency = regionSink.initializeReadWriteLatency(table); + taskFutures.addAll(Canary.sniff(admin, regionSink, table, executor, TaskType.READ, + this.rawScanEnabled, rwLatency)); } } else { - taskFutures.addAll(sniff(TaskType.READ)); + taskFutures.addAll(sniff(TaskType.READ, regionSink)); } if (writeSniffing) { @@ -971,9 +1038,10 @@ public final class Canary implements Tool { } lastCheckTime = EnvironmentEdgeManager.currentTime(); } + AtomicLong rwLatency = regionSink.initializeReadWriteLatency(writeTableName.getNameAsString()); // sniff canary table with write operation - taskFutures.addAll(Canary.sniff(admin, sink, admin.getTableDescriptor(writeTableName), - executor, TaskType.WRITE, this.rawScanEnabled)); + taskFutures.addAll(Canary.sniff(admin, regionSink, admin.getTableDescriptor(writeTableName), + executor, TaskType.WRITE, this.rawScanEnabled, rwLatency)); } for (Future future : taskFutures) { @@ -983,6 +1051,20 @@ public final class Canary implements Tool { LOG.error("Sniff region failed!", e); } } + Map actualPerTableTime = regionSink.getReadWriteLatency(); + for (String tableName : this.configuredPerTableTimeout.keySet()) { + if (actualPerTableTime.containsKey(tableName)) { + Long actual = actualPerTableTime.get(tableName).longValue(); + Long configured = this.configuredPerTableTimeout.get(tableName); + 
LOG.info("Read/Write operation for " + tableName + " took " + actual + + " ms. The configured timeout was " + configured + " ms."); + if (actual > configured) { + LOG.error("Read/Write operation for " + tableName + " exceeded the configured timeout."); + } + } else { + LOG.error("Read/Write operation for " + tableName + " failed!"); + } + } } catch (Exception e) { LOG.error("Run regionMonitor failed", e); this.errorCode = ERROR_EXIT_CODE; @@ -1037,7 +1119,7 @@ public final class Canary implements Tool { /* * canary entry point to monitor all the tables. */ - private List> sniff(TaskType taskType) throws Exception { + private List> sniff(TaskType taskType, RegionStdOutSink regionSink) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug(String.format("reading list of tables")); } @@ -1045,7 +1127,8 @@ public final class Canary implements Tool { for (HTableDescriptor table : admin.listTables()) { if (admin.isTableEnabled(table.getTableName()) && (!table.getTableName().equals(writeTableName))) { - taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType, this.rawScanEnabled)); + AtomicLong rwLatency = regionSink.initializeReadWriteLatency(table.getNameAsString()); + taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType, this.rawScanEnabled, rwLatency)); } } return taskFutures; @@ -1112,14 +1195,14 @@ public final class Canary implements Tool { * @throws Exception */ private static List> sniff(final Admin admin, final Sink sink, String tableName, - ExecutorService executor, TaskType taskType, boolean rawScanEnabled) throws Exception { + ExecutorService executor, TaskType taskType, boolean rawScanEnabled, AtomicLong readWriteLatency) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug(String.format("checking table is enabled and getting table descriptor for table %s", tableName)); } if (admin.isTableEnabled(TableName.valueOf(tableName))) { return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)), - 
executor, taskType, rawScanEnabled); + executor, taskType, rawScanEnabled, readWriteLatency); } else { LOG.warn(String.format("Table %s is not enabled", tableName)); } @@ -1127,11 +1210,11 @@ public final class Canary implements Tool { } /* - * Loops over regions that owns this table, and output some information abouts the state. + * Loops over regions that owns this table, and output some information about the state. */ private static List> sniff(final Admin admin, final Sink sink, HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType, - boolean rawScanEnabled) throws Exception { + boolean rawScanEnabled, AtomicLong readWriteLatency) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug(String.format("reading list of regions for table %s", tableDesc.getTableName())); @@ -1156,7 +1239,8 @@ public final class Canary implements Tool { for (HRegionLocation location : regionLocator.getAllRegionLocations()) { ServerName rs = location.getServerName(); HRegionInfo region = location.getRegionInfo(); - tasks.add(new RegionTask(admin.getConnection(), region, rs, sink, taskType, rawScanEnabled)); + tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink, taskType, rawScanEnabled, + readWriteLatency)); } } finally { if (regionLocator != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java index 0c9ae06b61..0b3bdafda4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java @@ -19,6 +19,7 @@ q * package org.apache.hadoop.hbase.tool; +import com.sun.javafx.util.Logging; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; @@ -39,19 +40,18 @@ import org.junit.runner.RunWith; import org.mockito.ArgumentMatcher; import 
org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import sun.rmi.runtime.Log; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import static org.junit.Assert.assertNotEquals; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.never; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; @RunWith(MockitoJUnitRunner.class) @Category({MediumTests.class}) @@ -110,7 +110,7 @@ public class TestCanaryTool { table.put(p); } ExecutorService executor = new ScheduledThreadPoolExecutor(1); - Canary.RegionServerStdOutSink sink = spy(new Canary.RegionServerStdOutSink()); + Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink()); Canary canary = new Canary(executor, sink); String[] args = { "-writeSniffing", "-t", "10000", name.getMethodName() }; ToolRunner.run(testingUtility.getConfiguration(), canary, args); @@ -119,6 +119,49 @@ public class TestCanaryTool { verify(sink, atLeastOnce()).publishReadTiming(isA(ServerName.class), isA(HRegionInfo.class), isA(HColumnDescriptor.class), anyLong()); } + @Test + public void testPerTableTimeouts() throws Exception { + final TableName [] tableNames = new TableName[2]; + tableNames[0] = TableName.valueOf(name.getMethodName() + "1"); + tableNames[1] = TableName.valueOf(name.getMethodName() + "2"); + // Create 2 test tables. 
+ for (int j = 0; j<2; j++) { + Table table = testingUtility.createTable(tableNames[j], new byte[][] { FAMILY }); + // insert some test rows + for (int i=0; i<1000; i++) { + byte[] iBytes = Bytes.toBytes(i + j); + Put p = new Put(iBytes); + p.addColumn(FAMILY, COLUMN, iBytes); + table.put(p); + } + } + ExecutorService executor = new ScheduledThreadPoolExecutor(1); + Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink()); + Canary canary = new Canary(executor, sink); + String configuredTimeoutStr = tableNames[0].getNameAsString() + "=" + Long.MAX_VALUE + "," + + tableNames[1].getNameAsString() + "=0"; + String[] args = { "-perTable", configuredTimeoutStr, name.getMethodName() + "1", name.getMethodName() + "2"}; + ToolRunner.run(testingUtility.getConfiguration(), canary, args); + verify(sink, times(tableNames.length)).initializeReadWriteLatency(isA(String.class)); + for (int i=0; i<2; i++) { + assertNotEquals("verify non-null read/write latency", null, sink.getReadWriteLatency().get(tableNames[i].getNameAsString())); + assertNotEquals("verify non-zero read/write latency", 0L, sink.getReadWriteLatency().get(tableNames[i].getNameAsString())); + } + // One table's timeout is set for 0 ms and thus, should lead to an error. 
+ verify(mockAppender, times(1)).doAppend(argThat(new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return ((LoggingEvent) argument).getRenderedMessage().contains("exceeded the configured timeout."); + } + })); + verify(mockAppender, times(2)).doAppend(argThat(new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return ((LoggingEvent) argument).getRenderedMessage().contains("The configured timeout was"); + } + })); + } + //no table created, so there should be no regions @Test public void testRegionserverNoRegions() throws Exception { @@ -157,7 +200,7 @@ public class TestCanaryTool { table.put(p); } ExecutorService executor = new ScheduledThreadPoolExecutor(1); - Canary.RegionServerStdOutSink sink = spy(new Canary.RegionServerStdOutSink()); + Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink()); Canary canary = new Canary(executor, sink); String[] args = { "-t", "10000", name.getMethodName() }; org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(testingUtility.getConfiguration()); -- 2.11.0 (Apple Git-81)