From 811467abd126990cbb4356c3349b13cdba000220 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Thu, 9 Apr 2015 23:24:27 -0400 Subject: [PATCH] HBASE-13078 Validate trace spans in test. Use the LocalFileSpanReceiver to store spans, move the test in with the rest of the tests in hbase-server, and use the default TestingUtil to start the hbase instance. --- .../trace/IntegrationTestSendTraceRequests.java | 278 +++++++++++++++------ 1 file changed, 196 insertions(+), 82 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java index 3fa8a9c..836b354 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java @@ -18,11 +18,16 @@ package org.apache.hadoop.hbase.trace; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import org.apache.commons.cli.CommandLine; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.IntegrationTestingUtility; -import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.BufferedMutator; @@ -32,36 +37,52 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.htrace.Sampler; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; +import org.apache.htrace.impl.LocalFileSpanReceiver; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +/** + * Test that spans are created as expected + */ @Category(IntegrationTests.class) public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { + private static final Log LOG = LogFactory.getLog(IntegrationTestSendTraceRequests.class); public static final String TABLE_ARG = "t"; public static final String CF_ARG = "f"; - public static final String TABLE_NAME_DEFAULT = "SendTracesTable"; public static final String COLUMN_FAMILY_DEFAULT = "D"; - private TableName tableName = TableName.valueOf(TABLE_NAME_DEFAULT); - private byte[] familyName = Bytes.toBytes(COLUMN_FAMILY_DEFAULT); - private IntegrationTestingUtility util; - private Random random = new Random(); - private Admin admin; - private SpanReceiverHost receiverHost; + + private static final int NUM_PUTS = 5000; + private static final int NUM_SCANS = 100; + private static final int NUM_GETS = 100; + private static final int GETS_PER_TASK = 5; + private static final String SCANS = "scans"; + private static final String GETS = "gets"; + private static final String CREATE_TABLE = "createTable"; + private static final String DELETE_TABLE = "deleteTable"; + private static final String INSERT_DATA = "insertData"; public static void main(String[] args) throws Exception { Configuration configuration = HBaseConfiguration.create(); @@ -91,79 +112,167 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { return 0; } + private IntegrationTestingUtility util; + private TableName tableName = TableName.valueOf(TABLE_NAME_DEFAULT); + private byte[] familyName = Bytes.toBytes(COLUMN_FAMILY_DEFAULT); + private Random random = new Random(); + private Admin admin; + @Test public void internalDoWork() throws Exception { + String hbaseItModuleDir = System.getProperty("user.dir"); + // Sanity check + assertNotNull(hbaseItModuleDir); + File target = new File(hbaseItModuleDir, "target"); + final File traceFile = new File(target, "TestSendTraceRequests_traces.txt"); + if (traceFile.exists()) { + // Clear out the file if it exists already + traceFile.delete(); + // Try to clean up the one that will be created + traceFile.deleteOnExit(); + } + + Configuration conf = new Configuration(false); + conf.set("hbase.local-file-span-receiver.path", traceFile.getAbsolutePath()); + LocalFileSpanReceiver rcvr = new LocalFileSpanReceiver(new HBaseHTraceConfiguration(conf)); + Trace.addReceiver(rcvr); + util = createUtil(); - admin = util.getHBaseAdmin(); - setupReceiver(); - - deleteTable(); - createTable(); - LinkedBlockingQueue rks = insertData(); - - ExecutorService service = Executors.newFixedThreadPool(20); - doScans(service, rks); - doGets(service, rks); - - service.shutdown(); - service.awaitTermination(100, TimeUnit.SECONDS); - Thread.sleep(90000); - receiverHost.closeReceivers(); - util.restoreCluster(); - util = null; - } - private void doScans(ExecutorService service, final LinkedBlockingQueue rks) { + try { + admin = util.getHBaseAdmin(); - for (int i = 0; i < 100; i++) { - Runnable runnable = new Runnable() { - private TraceScope innerScope = null; - private final LinkedBlockingQueue rowKeyQueue = rks; - @Override - public void run() { - ResultScanner rs = null; - try { - innerScope = Trace.startSpan("Scan", Sampler.ALWAYS); - Table ht = util.getConnection().getTable(tableName); - Scan s = new Scan(); - s.setStartRow(Bytes.toBytes(rowKeyQueue.take())); - s.setBatch(7); - rs = ht.getScanner(s); - // Something to keep the jvm from removing the loop. - long accum = 0; - - for(int x = 0; x < 1000; x++) { - Result r = rs.next(); - accum |= Bytes.toLong(r.getRow()); - } + boolean tableDeleted = deleteTable(); + createTable(); + LinkedBlockingQueue rks = insertData(); - innerScope.getSpan().addTimelineAnnotation("Accum result = " + accum); + ExecutorService service = Executors.newFixedThreadPool(20); + doScans(service, rks); + doGets(service, rks); - ht.close(); - ht = null; - } catch (IOException e) { - e.printStackTrace(); + service.shutdown(); + while (!service.awaitTermination(5, TimeUnit.SECONDS)) { + // Still waiting for shutdown + LOG.debug("Still waiting for thread pool to stop"); + } + LOG.debug("Thread pool stopped"); - innerScope.getSpan().addKVAnnotation( - Bytes.toBytes("exception"), - Bytes.toBytes(e.getClass().getSimpleName())); + // Stop the span receiver + rcvr.close(); - } catch (Exception e) { - } finally { - if (innerScope != null) innerScope.close(); - if (rs != null) rs.close(); + assertTrue("File containing spans does not exist: " + traceFile, traceFile.exists()); + Map traces = readFileSpans(traceFile); + verifyTraces(traces, tableDeleted); + } finally { + Trace.removeReceiver(rcvr); + rcvr = null; + } + } + + /** + * Read spans serialized to a file by LocalFileSpanReceiver. + * @param traceFile File spans are written to + * @return Map of span description to frequency + */ + private Map readFileSpans(File traceFile) throws IOException { + Map traces = new HashMap<>(); + try (BufferedReader reader = new BufferedReader(new FileReader(traceFile))) { + String line = null; + while ((line = reader.readLine()) != null) { + String[] parts = StringUtils.split(line, ','); + if (parts.length < 5) { + // unknown line + continue; + } + String spanNameComponent = parts[4]; + parts = StringUtils.split(spanNameComponent,':'); + assertTrue("Unable to parse into two pieces: " + spanNameComponent, parts.length == 2); + String spanName = parts[1]; + spanName = spanName.substring(1, spanName.length() - 1); + AtomicInteger counter = traces.get(spanName); + if (null == counter) { + counter = new AtomicInteger(0); + traces.put(spanName, counter); + } + counter.incrementAndGet(); + } + } + return traces; + } + + /** + * Verify the expected number of spans were seen + * @param traces Map of Span description to frequency + */ + private void verifyTraces(Map traces, boolean tableDeleted) { + assertEquals("Found unexpected number of scan spans", NUM_SCANS, traces.get(SCANS).get()); + assertEquals("Found unexpected number of put spans", NUM_PUTS, traces.get(INSERT_DATA).get()); + assertEquals("Found unexpected number of get spans", NUM_GETS * GETS_PER_TASK, traces.get(GETS) + .get()); + assertEquals("Found unexpected number of create table spans", 1, traces.get(CREATE_TABLE).get()); + if (tableDeleted) { + assertEquals("Found unexpected number of delete table spans", 1, traces.get(DELETE_TABLE).get()); + } + } + + /** + * Queue scans against the provided row keys + * @param service thread pool + * @param rks row keys + */ + private void doScans(ExecutorService service, final LinkedBlockingQueue rks) { + for (int i = 0; i < NUM_SCANS; i++) { + Runnable runnable = new Runnable() { + private TraceScope innerScope = null; + private final LinkedBlockingQueue rowKeyQueue = rks; + + @Override + public void run() { + ResultScanner rs = null; + try { + innerScope = Trace.startSpan(SCANS, Sampler.ALWAYS); + Table ht = util.getConnection().getTable(tableName); + Scan s = new Scan(); + s.setStartRow(Bytes.toBytes(rowKeyQueue.take())); + s.setBatch(7); + rs = ht.getScanner(s); + // Something to keep the jvm from removing the loop. + long accum = 0; + + for (int x = 0; x < 1000; x++) { + Result r = rs.next(); + accum |= Bytes.toLong(r.getRow()); } + innerScope.getSpan().addTimelineAnnotation(" = " + accum); + + ht.close(); + ht = null; + } catch (IOException e) { + e.printStackTrace(); + + innerScope.getSpan().addKVAnnotation(Bytes.toBytes("exception"), + Bytes.toBytes(e.getClass().getSimpleName())); + + } catch (Exception e) {} finally { + if (innerScope != null) innerScope.close(); + if (rs != null) rs.close(); } - }; - service.submit(runnable); - } + } + }; + service.submit(runnable); + } } + /** + * Queue a number of GETS against the provided rowkeys + * @param service thread pool + * @param rowKeys row keys + */ private void doGets(ExecutorService service, final LinkedBlockingQueue rowKeys) throws IOException { - for (int i = 0; i < 100; i++) { + for (int i = 0; i < NUM_GETS; i++) { Runnable runnable = new Runnable() { private TraceScope innerScope = null; private final LinkedBlockingQueue rowKeyQueue = rowKeys; @@ -171,7 +280,6 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { @Override public void run() { - Table ht = null; try { ht = util.getConnection().getTable(tableName); @@ -180,9 +288,9 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { } long accum = 0; - for (int x = 0; x < 5; x++) { + for (int x = 0; x < GETS_PER_TASK; x++) { try { - innerScope = Trace.startSpan("gets", Sampler.ALWAYS); + innerScope = Trace.startSpan(GETS, Sampler.ALWAYS); long rk = rowKeyQueue.take(); Result r1 = ht.get(new Get(Bytes.toBytes(rk))); if (r1 != null) { @@ -199,7 +307,8 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { } catch (InterruptedException ie) { // IGNORED } finally { - if (innerScope != null) innerScope.close(); + if (innerScope != null) + innerScope.close(); } } @@ -209,35 +318,47 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { } } + /** + * Create a table in a span + */ private void createTable() throws IOException { TraceScope createScope = null; try { - createScope = Trace.startSpan("createTable", Sampler.ALWAYS); + createScope = Trace.startSpan(CREATE_TABLE, Sampler.ALWAYS); util.createTable(tableName, familyName); } finally { if (createScope != null) createScope.close(); } } - private void deleteTable() throws IOException { + /** + * Delete a table in a span + * @return True if the table was deleted + */ + private boolean deleteTable() throws IOException { TraceScope deleteScope = null; - try { if (admin.tableExists(tableName)) { - deleteScope = Trace.startSpan("deleteTable", Sampler.ALWAYS); + deleteScope = Trace.startSpan(DELETE_TABLE, Sampler.ALWAYS); util.deleteTable(tableName); + return true; } } finally { if (deleteScope != null) deleteScope.close(); } + return false; } + /** + * Write some data inside spans + * @return The rowkeys + */ private LinkedBlockingQueue insertData() throws IOException, InterruptedException { LinkedBlockingQueue rowKeys = new LinkedBlockingQueue(25000); BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName); byte[] value = new byte[300]; - for (int x = 0; x < 5000; x++) { - TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS); + for (int x = 0; x < NUM_PUTS; x++) { + TraceScope traceScope = Trace.startSpan(INSERT_DATA, Sampler.ALWAYS); try { for (int i = 0; i < 5; i++) { long rk = random.nextLong(); @@ -245,7 +366,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { Put p = new Put(Bytes.toBytes(rk)); for (int y = 0; y < 10; y++) { random.nextBytes(value); - p.add(familyName, Bytes.toBytes(random.nextLong()), value); + p.addColumn(familyName, Bytes.toBytes(random.nextLong()), value); } ht.mutate(p); } @@ -275,11 +396,4 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { } return this.util; } - - private void setupReceiver() { - Configuration conf = new Configuration(util.getConfiguration()); - conf.setBoolean("hbase.zipkin.is-in-client-mode", true); - - this.receiverHost = SpanReceiverHost.getInstance(conf); - } } -- 2.1.2