From d76f2bdd98d13b69b73e4a028f0f77e3ccc07dbe Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Thu, 9 Apr 2015 23:24:27 -0400 Subject: [PATCH] HBASE-13078 Validate trace spans in test. Use the LocalFileSpanReceiver to store spans, move the test in with the rest of the tests in hbase-server, and use the default TestingUtil to start the hbase instance. --- .../trace/IntegrationTestSendTraceRequests.java | 285 ----------------- .../hadoop/hbase/trace/TestSendTraceRequests.java | 339 +++++++++++++++++++++ 2 files changed, 339 insertions(+), 285 deletions(-) delete mode 100644 hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TestSendTraceRequests.java diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java deleted file mode 100644 index 3fa8a9c..0000000 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java +++ /dev/null @@ -1,285 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.trace; - -import org.apache.commons.cli.CommandLine; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.IntegrationTestingUtility; -import org.apache.hadoop.hbase.testclassification.IntegrationTests; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.BufferedMutator; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.AbstractHBaseTool; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.ToolRunner; -import org.apache.htrace.Sampler; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.io.IOException; -import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -@Category(IntegrationTests.class) -public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { - - public static final String TABLE_ARG = "t"; - public static final String CF_ARG = "f"; - - public static final String TABLE_NAME_DEFAULT = "SendTracesTable"; - public static final String COLUMN_FAMILY_DEFAULT = "D"; - private TableName tableName = TableName.valueOf(TABLE_NAME_DEFAULT); - private byte[] familyName = Bytes.toBytes(COLUMN_FAMILY_DEFAULT); - private IntegrationTestingUtility util; - private Random random = new Random(); - private Admin admin; - private SpanReceiverHost receiverHost; - - public static void main(String[] args) throws Exception { - Configuration configuration = HBaseConfiguration.create(); - IntegrationTestingUtility.setUseDistributedCluster(configuration); - IntegrationTestSendTraceRequests tool = new IntegrationTestSendTraceRequests(); - ToolRunner.run(configuration, tool, args); - } - - @Override - protected void addOptions() { - addOptWithArg(TABLE_ARG, "The table name to target. Will be created if not there already."); - addOptWithArg(CF_ARG, "The family to target"); - } - - @Override - public void processOptions(CommandLine cmd) { - String tableNameString = cmd.getOptionValue(TABLE_ARG, TABLE_NAME_DEFAULT); - String familyString = cmd.getOptionValue(CF_ARG, COLUMN_FAMILY_DEFAULT); - - this.tableName = TableName.valueOf(tableNameString); - this.familyName = Bytes.toBytes(familyString); - } - - @Override - public int doWork() throws Exception { - internalDoWork(); - return 0; - } - - @Test - public void internalDoWork() throws Exception { - util = createUtil(); - admin = util.getHBaseAdmin(); - setupReceiver(); - - deleteTable(); - createTable(); - LinkedBlockingQueue rks = insertData(); - - ExecutorService service = Executors.newFixedThreadPool(20); - doScans(service, rks); - doGets(service, rks); - - service.shutdown(); - service.awaitTermination(100, TimeUnit.SECONDS); - Thread.sleep(90000); - receiverHost.closeReceivers(); - util.restoreCluster(); - util = null; - } - - private void doScans(ExecutorService service, final LinkedBlockingQueue rks) { - - for (int i = 0; i < 100; i++) { - Runnable runnable = new Runnable() { - private TraceScope innerScope = null; - private final LinkedBlockingQueue rowKeyQueue = rks; - @Override - public void run() { - ResultScanner rs = null; - try { - innerScope = Trace.startSpan("Scan", Sampler.ALWAYS); - Table ht = util.getConnection().getTable(tableName); - Scan s = new Scan(); - s.setStartRow(Bytes.toBytes(rowKeyQueue.take())); - s.setBatch(7); - rs = ht.getScanner(s); - // Something to keep the jvm from removing the loop. - long accum = 0; - - for(int x = 0; x < 1000; x++) { - Result r = rs.next(); - accum |= Bytes.toLong(r.getRow()); - } - - innerScope.getSpan().addTimelineAnnotation("Accum result = " + accum); - - ht.close(); - ht = null; - } catch (IOException e) { - e.printStackTrace(); - - innerScope.getSpan().addKVAnnotation( - Bytes.toBytes("exception"), - Bytes.toBytes(e.getClass().getSimpleName())); - - } catch (Exception e) { - } finally { - if (innerScope != null) innerScope.close(); - if (rs != null) rs.close(); - } - - } - }; - service.submit(runnable); - } - - } - - private void doGets(ExecutorService service, final LinkedBlockingQueue rowKeys) - throws IOException { - for (int i = 0; i < 100; i++) { - Runnable runnable = new Runnable() { - private TraceScope innerScope = null; - private final LinkedBlockingQueue rowKeyQueue = rowKeys; - - @Override - public void run() { - - - Table ht = null; - try { - ht = util.getConnection().getTable(tableName); - } catch (IOException e) { - e.printStackTrace(); - } - - long accum = 0; - for (int x = 0; x < 5; x++) { - try { - innerScope = Trace.startSpan("gets", Sampler.ALWAYS); - long rk = rowKeyQueue.take(); - Result r1 = ht.get(new Get(Bytes.toBytes(rk))); - if (r1 != null) { - accum |= Bytes.toLong(r1.getRow()); - } - Result r2 = ht.get(new Get(Bytes.toBytes(rk))); - if (r2 != null) { - accum |= Bytes.toLong(r2.getRow()); - } - innerScope.getSpan().addTimelineAnnotation("Accum = " + accum); - - } catch (IOException e) { - // IGNORED - } catch (InterruptedException ie) { - // IGNORED - } finally { - if (innerScope != null) innerScope.close(); - } - } - - } - }; - service.submit(runnable); - } - } - - private void createTable() throws IOException { - TraceScope createScope = null; - try { - createScope = Trace.startSpan("createTable", Sampler.ALWAYS); - util.createTable(tableName, familyName); - } finally { - if (createScope != null) createScope.close(); - } - } - - private void deleteTable() throws IOException { - TraceScope deleteScope = null; - - try { - if (admin.tableExists(tableName)) { - deleteScope = Trace.startSpan("deleteTable", Sampler.ALWAYS); - util.deleteTable(tableName); - } - } finally { - if (deleteScope != null) deleteScope.close(); - } - } - - private LinkedBlockingQueue insertData() throws IOException, InterruptedException { - LinkedBlockingQueue rowKeys = new LinkedBlockingQueue(25000); - BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName); - byte[] value = new byte[300]; - for (int x = 0; x < 5000; x++) { - TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS); - try { - for (int i = 0; i < 5; i++) { - long rk = random.nextLong(); - rowKeys.add(rk); - Put p = new Put(Bytes.toBytes(rk)); - for (int y = 0; y < 10; y++) { - random.nextBytes(value); - p.add(familyName, Bytes.toBytes(random.nextLong()), value); - } - ht.mutate(p); - } - if ((x % 1000) == 0) { - admin.flush(tableName); - } - } finally { - traceScope.close(); - } - } - admin.flush(tableName); - return rowKeys; - } - - private IntegrationTestingUtility createUtil() throws Exception { - Configuration conf = getConf(); - if (this.util == null) { - IntegrationTestingUtility u; - if (conf == null) { - u = new IntegrationTestingUtility(); - } else { - u = new IntegrationTestingUtility(conf); - } - util = u; - util.initializeCluster(1); - - } - return this.util; - } - - private void setupReceiver() { - Configuration conf = new Configuration(util.getConfiguration()); - conf.setBoolean("hbase.zipkin.is-in-client-mode", true); - - this.receiverHost = SpanReceiverHost.getInstance(conf); - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TestSendTraceRequests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TestSendTraceRequests.java new file mode 100644 index 0000000..61eaccf --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TestSendTraceRequests.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.trace; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.StringUtils; +import org.apache.htrace.Sampler; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; +import org.apache.htrace.impl.LocalFileSpanReceiver; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test that spans are created as expected + */ +@Category(MediumTests.class) +public class TestSendTraceRequests { + private static final Log LOG = LogFactory.getLog(TestSendTraceRequests.class); + + public static final String TABLE_ARG = "t"; + public static final String CF_ARG = "f"; + public static final String COLUMN_FAMILY_DEFAULT = "D"; + + private static final int NUM_PUTS = 5000; + private static final int NUM_SCANS = 100; + private static final int NUM_GETS = 100; + private static final int GETS_PER_TASK = 5; + private static final String SCANS = "scans"; + private static final String GETS = "gets"; + private static final String CREATE_TABLE = "createTable"; + private static final String INSERT_DATA = "insertData"; + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void before() throws Exception { + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void after() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + private TableName tableName = TableName.valueOf("SendTracesTable"); + private byte[] familyName = Bytes.toBytes(COLUMN_FAMILY_DEFAULT); + private Random random = new Random(); + private Admin admin; + + @Test + public void test() throws Exception { + String hbaseItModuleDir = System.getProperty("user.dir"); + // Sanity check + assertNotNull(hbaseItModuleDir); + File target = new File(hbaseItModuleDir, "target"); + final File traceFile = new File(target, "TestSendTraceRequests_traces.txt"); + if (traceFile.exists()) { + // Clear out the file if it exists already + traceFile.delete(); + // Try to clean up the one that will be created + traceFile.deleteOnExit(); + } + + Configuration conf = new Configuration(false); + conf.set("hbase.local-file-span-receiver.path", traceFile.getAbsolutePath()); + LocalFileSpanReceiver rcvr = new LocalFileSpanReceiver(new HBaseHTraceConfiguration(conf)); + Trace.addReceiver(rcvr); + + try { + admin = TEST_UTIL.getHBaseAdmin(); + + createTable(); + LinkedBlockingQueue rks = insertData(); + + ExecutorService service = Executors.newFixedThreadPool(20); + doScans(service, rks); + doGets(service, rks); + + service.shutdown(); + while (!service.awaitTermination(5, TimeUnit.SECONDS)) { + // Still waiting for shutdown + LOG.debug("Still waiting for thread pool to stop"); + } + LOG.debug("Thread pool stopped"); + + // Stop the span receiver + rcvr.close(); + + assertTrue("File containing spans does not exist: " + traceFile, traceFile.exists()); + Map traces = readFileSpans(traceFile); + verifyTraces(traces); + } finally { + Trace.removeReceiver(rcvr); + rcvr = null; + } + } + + /** + * Read spans serialized to a file by LocalFileSpanReceiver. + * @param traceFile File spans are written to + * @return Map of span description to frequency + */ + private Map readFileSpans(File traceFile) throws IOException { + Map traces = new HashMap<>(); + try (BufferedReader reader = new BufferedReader(new FileReader(traceFile))) { + String line = null; + while ((line = reader.readLine()) != null) { + String[] parts = StringUtils.split(line, ','); + if (parts.length < 5) { + // unknown line + continue; + } + String spanNameComponent = parts[4]; + parts = StringUtils.split(spanNameComponent,':'); + assertTrue("Unable to parse into two pieces: " + spanNameComponent, parts.length == 2); + String spanName = parts[1]; + spanName = spanName.substring(1, spanName.length() - 1); + AtomicInteger counter = traces.get(spanName); + if (null == counter) { + counter = new AtomicInteger(0); + traces.put(spanName, counter); + } + counter.incrementAndGet(); + } + } + return traces; + } + + /** + * Verify the expected number of spans were seen + * @param traces Map of Span description to frequency + */ + private void verifyTraces(Map traces) { + assertEquals("Found unexpected number of scan spans", NUM_SCANS, traces.get(SCANS).get()); + assertEquals("Found unexpected number of put spans", NUM_PUTS, traces.get(INSERT_DATA).get()); + assertEquals("Found unexpected number of get spans", NUM_GETS * GETS_PER_TASK, traces.get(GETS) + .get()); + assertEquals("Found unexpected number of create table spans", 1, traces.get(CREATE_TABLE).get()); + } + + /** + * Queue scans against the provided row keys + * @param service thread pool + * @param rks row keys + */ + private void doScans(ExecutorService service, final LinkedBlockingQueue rks) { + for (int i = 0; i < NUM_SCANS; i++) { + Runnable runnable = new Runnable() { + private TraceScope innerScope = null; + private final LinkedBlockingQueue rowKeyQueue = rks; + + @Override + public void run() { + ResultScanner rs = null; + try { + innerScope = Trace.startSpan(SCANS, Sampler.ALWAYS); + Table ht = TEST_UTIL.getConnection().getTable(tableName); + Scan s = new Scan(); + s.setStartRow(Bytes.toBytes(rowKeyQueue.take())); + s.setBatch(7); + rs = ht.getScanner(s); + // Something to keep the jvm from removing the loop. + long accum = 0; + + for (int x = 0; x < 1000; x++) { + Result r = rs.next(); + accum |= Bytes.toLong(r.getRow()); + } + + innerScope.getSpan().addTimelineAnnotation(" = " + accum); + + ht.close(); + ht = null; + } catch (IOException e) { + e.printStackTrace(); + + innerScope.getSpan().addKVAnnotation(Bytes.toBytes("exception"), + Bytes.toBytes(e.getClass().getSimpleName())); + + } catch (Exception e) {} finally { + if (innerScope != null) + innerScope.close(); + if (rs != null) + rs.close(); + } + + } + }; + service.submit(runnable); + } + } + + /** + * Queue a number of GETS against the provided rowkeys + * @param service thread pool + * @param rowKeys row keys + */ + private void doGets(ExecutorService service, final LinkedBlockingQueue rowKeys) + throws IOException { + for (int i = 0; i < NUM_GETS; i++) { + Runnable runnable = new Runnable() { + private TraceScope innerScope = null; + private final LinkedBlockingQueue rowKeyQueue = rowKeys; + + @Override + public void run() { + + Table ht = null; + try { + ht = TEST_UTIL.getConnection().getTable(tableName); + } catch (IOException e) { + e.printStackTrace(); + } + + long accum = 0; + for (int x = 0; x < GETS_PER_TASK; x++) { + try { + innerScope = Trace.startSpan(GETS, Sampler.ALWAYS); + long rk = rowKeyQueue.take(); + Result r1 = ht.get(new Get(Bytes.toBytes(rk))); + if (r1 != null) { + accum |= Bytes.toLong(r1.getRow()); + } + Result r2 = ht.get(new Get(Bytes.toBytes(rk))); + if (r2 != null) { + accum |= Bytes.toLong(r2.getRow()); + } + innerScope.getSpan().addTimelineAnnotation("Accum = " + accum); + + } catch (IOException e) { + // IGNORED + } catch (InterruptedException ie) { + // IGNORED + } finally { + if (innerScope != null) + innerScope.close(); + } + } + + } + }; + service.submit(runnable); + } + } + + /** + * Create a table in a span + */ + private void createTable() throws IOException { + TraceScope createScope = null; + try { + createScope = Trace.startSpan(CREATE_TABLE, Sampler.ALWAYS); + TEST_UTIL.createTable(tableName, familyName); + } finally { + if (createScope != null) + createScope.close(); + } + } + + /** + * Write some data inside spans + * @return The rowkeys + */ + private LinkedBlockingQueue insertData() throws IOException, InterruptedException { + LinkedBlockingQueue rowKeys = new LinkedBlockingQueue(25000); + BufferedMutator ht = TEST_UTIL.getConnection().getBufferedMutator(this.tableName); + byte[] value = new byte[300]; + for (int x = 0; x < NUM_PUTS; x++) { + TraceScope traceScope = Trace.startSpan(INSERT_DATA, Sampler.ALWAYS); + try { + for (int i = 0; i < 5; i++) { + long rk = random.nextLong(); + rowKeys.add(rk); + Put p = new Put(Bytes.toBytes(rk)); + for (int y = 0; y < 10; y++) { + random.nextBytes(value); + p.addColumn(familyName, Bytes.toBytes(random.nextLong()), value); + } + ht.mutate(p); + } + if ((x % 1000) == 0) { + admin.flush(tableName); + } + } finally { + traceScope.close(); + } + } + admin.flush(tableName); + return rowKeys; + } +} -- 2.1.2