From 76a2acc0423c1daec579ab7706968f216fcc2cb0 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Thu, 9 Apr 2015 23:24:27 -0400 Subject: [PATCH] HBASE-13078 Validate trace spans in test. Use the LocalFileSpanReceiver to store spans, move the test in with the rest of the tests in hbase-server, and use the default TestingUtil to start the hbase instance. --- .../trace/IntegrationTestSendTraceRequests.java | 368 ++++++++++++++++----- 1 file changed, 292 insertions(+), 76 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java index 515f96d..cf513d4 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java @@ -18,11 +18,16 @@ package org.apache.hadoop.hbase.trace; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import org.apache.commons.cli.CommandLine; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.IntegrationTestingUtility; -import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; @@ -30,36 +35,63 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.ToolRunner; +import org.cloudera.htrace.HTraceConfiguration; import org.cloudera.htrace.Sampler; +import org.cloudera.htrace.Span; +import org.cloudera.htrace.SpanReceiver; import org.cloudera.htrace.Trace; import org.cloudera.htrace.TraceScope; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mortbay.util.ajax.JSON; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +/** + * Test that spans are created as expected + */ @Category(IntegrationTests.class) public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { + private static final Log LOG = LogFactory.getLog(IntegrationTestSendTraceRequests.class); public static final String TABLE_ARG = "t"; public static final String CF_ARG = "f"; - public static final String TABLE_NAME_DEFAULT = "SendTracesTable"; public static final String COLUMN_FAMILY_DEFAULT = "D"; - private String tableName = TABLE_NAME_DEFAULT; + + private static final int NUM_PUTS = 5000; + private static final int NUM_SCANS = 100; + private static final int NUM_GETS = 100; + private static final int GETS_PER_TASK = 5; + private static final String SCANS = "scans"; + private static final String GETS = "gets"; + private static final String CREATE_TABLE = "createTable"; + private static final String DELETE_TABLE = "deleteTable"; + private static final String INSERT_DATA = "insertData"; + + private String tableName = "SendTracesTable"; private String familyName = COLUMN_FAMILY_DEFAULT; private IntegrationTestingUtility util; private Random random = new Random(); private HBaseAdmin admin; - private SpanReceiverHost receiverHost; public static void main(String[] args) throws Exception { Configuration configuration = HBaseConfiguration.create(); @@ -91,77 +123,159 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { @Test public void internalDoWork() throws Exception { - util = createUtil(); - admin = util.getHBaseAdmin(); - setupReceiver(); - - deleteTable(); - createTable(); - LinkedBlockingQueue rks = insertData(); - - ExecutorService service = Executors.newFixedThreadPool(20); - doScans(service, rks); - doGets(service, rks); - - service.shutdown(); - service.awaitTermination(100, TimeUnit.SECONDS); - Thread.sleep(90000); - receiverHost.closeReceivers(); - util.restoreCluster(); - util = null; - } + String hbaseItModuleDir = System.getProperty("user.dir"); + // Sanity check + assertNotNull(hbaseItModuleDir); + File target = new File(hbaseItModuleDir, "target"); + final File traceFile = new File(target, "TestSendTraceRequests_traces.txt"); + if (traceFile.exists()) { + // Clear out the file if it exists already + traceFile.delete(); + // Try to clean up the one that will be created + traceFile.deleteOnExit(); + } - private void doScans(ExecutorService service, final LinkedBlockingQueue rks) { + Configuration conf = new Configuration(false); + conf.set("hbase.local-file-span-receiver.path", traceFile.getAbsolutePath()); + // htrace-2.0.4 doesn't separate spans by newline so do it ourselves. + NewlineLocalFileSpanReceiver rcvr = new NewlineLocalFileSpanReceiver(); + rcvr.configure(new HBaseHTraceConfiguration(conf)); + Trace.addReceiver(rcvr); - for (int i = 0; i < 100; i++) { - Runnable runnable = new Runnable() { - private TraceScope innerScope = null; - private final LinkedBlockingQueue rowKeyQueue = rks; - @Override - public void run() { - ResultScanner rs = null; - try { - innerScope = Trace.startSpan("Scan", Sampler.ALWAYS); - HTable ht = new HTable(util.getConfiguration(), tableName); - Scan s = new Scan(); - s.setStartRow(Bytes.toBytes(rowKeyQueue.take())); - s.setBatch(7); - rs = ht.getScanner(s); - // Something to keep the jvm from removing the loop. - long accum = 0; - - for(int x = 0; x < 1000; x++) { - Result r = rs.next(); - accum |= Bytes.toLong(r.getRow()); - } + try { + util = createUtil(); + admin = util.getHBaseAdmin(); - innerScope.getSpan().addTimelineAnnotation("Accum result = " + accum); + boolean deletedTable = deleteTable(); + createTable(); + LinkedBlockingQueue rks = insertData(); - ht.close(); - ht = null; - } catch (IOException e) { - e.printStackTrace(); + ExecutorService service = Executors.newFixedThreadPool(20); + doScans(service, rks); + doGets(service, rks); - innerScope.getSpan().addKVAnnotation( - Bytes.toBytes("exception"), - Bytes.toBytes(e.getClass().getSimpleName())); + service.shutdown(); + while (!service.awaitTermination(5, TimeUnit.SECONDS)) { + // Still waiting for shutdown + LOG.debug("Still waiting for thread pool to stop"); + } + LOG.debug("Thread pool stopped"); - } catch (Exception e) { - } finally { - if (innerScope != null) innerScope.close(); - if (rs != null) rs.close(); + // Stop the span receiver + rcvr.close(); + + assertTrue("File containing spans does not exist: " + traceFile, traceFile.exists()); + Map traces = readFileSpans(traceFile); + verifyTraces(traces, deletedTable); + } finally { + Trace.removeReceiver(rcvr); + rcvr = null; + } + } + + /** + * Read spans serialized to a file by LocalFileSpanReceiver. + * @param traceFile File spans are written to + * @return Map of span description to frequency + */ + private Map readFileSpans(File traceFile) throws IOException { + Map traces = new HashMap(); + BufferedReader reader = new BufferedReader(new FileReader(traceFile)); + try { + String line = null; + while ((line = reader.readLine()) != null) { + @SuppressWarnings("unchecked") + Map json = (Map) JSON.parse(line); + String spanName = json.get("Description"); + assertNotNull(spanName); + AtomicInteger counter = traces.get(spanName); + if (null == counter) { + counter = new AtomicInteger(0); + traces.put(spanName, counter); + } + counter.incrementAndGet(); + } + } finally { + reader.close(); + } + return traces; + } + + /** + * Verify the expected number of spans were seen + * @param traces Map of Span description to frequency + * @param deletedTable was the table deleted + */ + private void verifyTraces(Map traces, boolean deletedTable) { + assertEquals("Found unexpected number of scan spans", NUM_SCANS, traces.get(SCANS).get()); + assertEquals("Found unexpected number of put spans", NUM_PUTS, traces.get(INSERT_DATA).get()); + assertEquals("Found unexpected number of get spans", NUM_GETS * GETS_PER_TASK, traces.get(GETS) + .get()); + assertEquals("Found unexpected number of create table spans", 1, traces.get(CREATE_TABLE).get()); + if (deletedTable) { + assertEquals("Found unexpected number of delete table spans", 1, traces.get(DELETE_TABLE) + .get()); + } + } + + /** + * Queue scans against the provided row keys + * @param service thread pool + * @param rks row keys + */ + private void doScans(ExecutorService service, final LinkedBlockingQueue rks) { + for (int i = 0; i < NUM_SCANS; i++) { + Runnable runnable = new Runnable() { + private TraceScope innerScope = null; + private final LinkedBlockingQueue rowKeyQueue = rks; + + @Override + public void run() { + ResultScanner rs = null; + try { + innerScope = Trace.startSpan(SCANS, Sampler.ALWAYS); + HTable ht = new HTable(util.getConfiguration(), tableName); + Scan s = new Scan(); + s.setStartRow(Bytes.toBytes(rowKeyQueue.take())); + s.setBatch(7); + rs = ht.getScanner(s); + // Something to keep the jvm from removing the loop. + long accum = 0; + + for (int x = 0; x < 1000; x++) { + Result r = rs.next(); + accum |= Bytes.toLong(r.getRow()); } + innerScope.getSpan().addTimelineAnnotation(" = " + accum); + + ht.close(); + ht = null; + } catch (IOException e) { + e.printStackTrace(); + + innerScope.getSpan().addKVAnnotation(Bytes.toBytes("exception"), + Bytes.toBytes(e.getClass().getSimpleName())); + + } catch (Exception e) {} finally { + if (innerScope != null) innerScope.close(); + if (rs != null) rs.close(); } - }; - service.submit(runnable); - } + } + }; + service.submit(runnable); + } } + /** + * Queue a number of GETS against the provided rowkeys + * @param service thread pool + * @param rowKeys row keys + */ private void doGets(ExecutorService service, final LinkedBlockingQueue rowKeys) throws IOException { - for (int i = 0; i < 100; i++) { + for (int i = 0; i < NUM_GETS; i++) { Runnable runnable = new Runnable() { private TraceScope innerScope = null; private final LinkedBlockingQueue rowKeyQueue = rowKeys; @@ -169,7 +283,6 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { @Override public void run() { - HTable ht = null; try { ht = new HTable(util.getConfiguration(), tableName); @@ -178,9 +291,9 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { } long accum = 0; - for (int x = 0; x < 5; x++) { + for (int x = 0; x < GETS_PER_TASK; x++) { try { - innerScope = Trace.startSpan("gets", Sampler.ALWAYS); + innerScope = Trace.startSpan(GETS, Sampler.ALWAYS); long rk = rowKeyQueue.take(); Result r1 = ht.get(new Get(Bytes.toBytes(rk))); if (r1 != null) { @@ -207,35 +320,48 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { } } + /** + * Create a table in a span + */ private void createTable() throws IOException { TraceScope createScope = null; try { - createScope = Trace.startSpan("createTable", Sampler.ALWAYS); + createScope = Trace.startSpan(CREATE_TABLE, Sampler.ALWAYS); util.createTable(tableName, familyName); } finally { if (createScope != null) createScope.close(); } } - private void deleteTable() throws IOException { + /** + * Delete a table in a span + * @return True if the table was deleted + */ + private boolean deleteTable() throws IOException { TraceScope deleteScope = null; try { if (admin.tableExists(tableName)) { - deleteScope = Trace.startSpan("deleteTable", Sampler.ALWAYS); + deleteScope = Trace.startSpan(DELETE_TABLE, Sampler.ALWAYS); util.deleteTable(tableName); + return true; } } finally { if (deleteScope != null) deleteScope.close(); } + return false; } + /** + * Write some data inside spans + * @return The rowkeys + */ private LinkedBlockingQueue insertData() throws IOException, InterruptedException { LinkedBlockingQueue rowKeys = new LinkedBlockingQueue(25000); HTable ht = new HTable(util.getConfiguration(), this.tableName); byte[] value = new byte[300]; - for (int x = 0; x < 5000; x++) { - TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS); + for (int x = 0; x < NUM_PUTS; x++) { + TraceScope traceScope = Trace.startSpan(INSERT_DATA, Sampler.ALWAYS); try { ht.setAutoFlush(false, true); for (int i = 0; i < 5; i++) { @@ -244,9 +370,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { Put p = new Put(Bytes.toBytes(rk)); for (int y = 0; y < 10; y++) { random.nextBytes(value); - p.add(Bytes.toBytes(familyName), - Bytes.toBytes(random.nextLong()), - value); + p.add(Bytes.toBytes(familyName), Bytes.toBytes(random.nextLong()), value); } ht.put(p); } @@ -257,6 +381,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { traceScope.close(); } } + ht.close(); admin.flush(Bytes.toBytes(tableName)); return rowKeys; } @@ -277,10 +402,101 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { return this.util; } - private void setupReceiver() { - Configuration conf = new Configuration(util.getConfiguration()); - conf.setBoolean("hbase.zipkin.is-in-client-mode", true); + // Copy from htrace but includes the newline so parsing isn't absurdly complicated. + private static class NewlineLocalFileSpanReceiver implements SpanReceiver { + // default capacity for the executors blocking queue + public static final int DEFAULT_CAPACITY = 5000; + // default timeout duration when calling executor.awaitTermination() + public static final long DEFAULT_EXECUTOR_TERMINATION_TIMEOUT_DURATION = 60; + // default time unit for the above duration when calling + // executor.awaitTermination() + public static final Log LOG = LogFactory.getLog(NewlineLocalFileSpanReceiver.class); + private String file; + private FileWriter fwriter; + private BufferedWriter bwriter; + private Map values; + private ExecutorService executor; + private long executorTerminationTimeoutDuration; + + public NewlineLocalFileSpanReceiver() { + } + + + @Override + public void configure(HTraceConfiguration conf) { + this.executorTerminationTimeoutDuration = DEFAULT_EXECUTOR_TERMINATION_TIMEOUT_DURATION; + int capacity = conf.getInt("local-file-span-receiver.capacity", DEFAULT_CAPACITY); + this.file = conf.get("local-file-span-receiver.path"); + if (file == null || file.isEmpty()) { + throw new IllegalArgumentException("must configure " + file); + } + this.executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, + new LinkedBlockingQueue(capacity)); + try { + this.fwriter = new FileWriter(this.file, true); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + this.bwriter = new BufferedWriter(fwriter); + this.values = new HashMap(); + } + + + private class WriteSpanRunnable implements Runnable { + public final Span span; + + public WriteSpanRunnable(Span span) { + this.span = span; + } + + @Override + public void run() { + try { + values.put("SpanID", span.getSpanId()); + values.put("TraceID", span.getTraceId()); + values.put("ParentID", span.getParentId()); + values.put("Start", span.getStartTimeMillis()); + values.put("Stop", span.getStopTimeMillis()); + values.put("Description", span.getDescription()); + values.put("Annotations", span.getKVAnnotations()); + bwriter.write(JSON.toString(values)); + bwriter.newLine(); + bwriter.flush(); + values.clear(); + } catch (IOException e) { + LOG.error("Error when writing to file: " + file, e); + } + } + } + + @Override + public void receiveSpan(Span span) { + executor.submit(new WriteSpanRunnable(span)); + } - this.receiverHost = SpanReceiverHost.getInstance(conf); + @Override + public void close() throws IOException { + executor.shutdown(); + try { + if (!executor.awaitTermination(this.executorTerminationTimeoutDuration, + TimeUnit.SECONDS)) { + LOG.warn("Was not able to process all remaining spans to write upon closing in: " + + this.executorTerminationTimeoutDuration + "s"); + } + } catch (InterruptedException e1) { + LOG.warn("Thread interrupted when terminating executor.", e1); + } + + try { + fwriter.close(); + } catch (IOException e) { + LOG.error("Error closing filewriter for file: " + file, e); + } + try { + bwriter.close(); + } catch (IOException e) { + LOG.error("Error closing bufferedwriter for file: " + file, e); + } + } } } -- 2.1.2