From cd4afe7a2c6dc31e452610a0f46ed0cc013c33fa Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Tue, 3 Nov 2015 13:01:10 -0800 Subject: [PATCH] HBASE-14752 Add example of using the HBase client in a multi-threaded environment --- .../client/example/MultiThreadedClientExample.java | 231 +++++++++++++++++++++ 1 file changed, 231 insertions(+) create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/MultiThreadedClientExample.java diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/MultiThreadedClientExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/MultiThreadedClientExample.java new file mode 100644 index 0000000..d3ce0c9 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/MultiThreadedClientExample.java @@ -0,0 +1,231 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.example; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + + +/** + * Example on how to use HBase's {@link Connection} in a multi-threaded environment. + * + * Spins up a pretty reasonable amount of threads. + * Does 50% writes and 50% reads. (Approximately) + */ +public class MultiThreadedClientExample extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(MultiThreadedClientExample.class); + public static final int NUM_OPERATIONS = 500000; + // d for default. + private static byte[] FAMILY = Bytes.toBytes("d"); + private static byte[] QUAL = Bytes.toBytes("test"); + + private final ExecutorService internalPool; + + private final int threads; + + public MultiThreadedClientExample() throws IOException { + this.threads = Runtime.getRuntime().availableProcessors() * 4; + this.internalPool = Executors.newFixedThreadPool(threads, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("internal-pol-%d").build()); + } + + @Override + public int run(String[] args) throws Exception { + final TableName tableName = TableName.valueOf(args[0]); + + // Threads for the client only. + // + // We don't want to mix hbase and business logic. + // + ExecutorService service = new ForkJoinPool(threads * 2); + + // Create two different connections showing how it's possible to + // separate different types of requests onto different connections + final Connection writeConnection = ConnectionFactory.createConnection(getConf(), service); + final Connection readConnection = ConnectionFactory.createConnection(getConf(), service); + + // At this point the entire cache for the region locations is full. + warmUpConnectionCache(readConnection, tableName); + warmUpConnectionCache(writeConnection, tableName); + + List> futures = new ArrayList<>(NUM_OPERATIONS); + for (int i = 0; i < NUM_OPERATIONS; i++) { + double r = ThreadLocalRandom.current().nextDouble(); + Future f = null; + if (r < .30) { + f = internalPool.submit(new WriteExampleCallable(writeConnection, tableName)); + } else if (r < .50) { + f = internalPool.submit(new SingleWriteExampleCallable(writeConnection, tableName)); + } else { + f = internalPool.submit(new ReadExampleCallable(writeConnection, tableName)); + } + if (f != null) { + futures.add(f); + } + } + + // Wait a long time for all the reads/writes to complete + for (Future f : futures) { + f.get(10, TimeUnit.MINUTES); + } + + // Clean up after our selves for cleanliness + //internalPool.shutdownNow(); + //service.shutdownNow(); + return 0; + } + + private void warmUpConnectionCache(Connection connection, TableName tn) throws IOException { + try (RegionLocator locator = connection.getRegionLocator(tn)) { + LOG.info( + "Warmed up region location cache for " + tn + + " got " + locator.getAllRegionLocations().size()); + } + } + + /** + * Class that will show how to send batches of puts at the same time. + */ + public static class WriteExampleCallable implements Callable { + private final Connection connection; + private final TableName tableName; + + public WriteExampleCallable(Connection connection, TableName tableName) { + this.connection = connection; + this.tableName = tableName; + } + + @Override + public Boolean call() throws Exception { + try (Table t = connection.getTable(tableName)) { + byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble())); + int rows = 30; + + // Array to put the batch + ArrayList puts = new ArrayList<>(rows); + for (int i = 0; i < 30; i++) { + byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong()); + Put p = new Put(rk); + p.addImmutable(FAMILY, QUAL, value); + puts.add(p); + } + + // now that we've assembled the batch it's time to push it to hbase. + t.put(puts); + } + return true; + } + } + + public static class SingleWriteExampleCallable implements Callable { + private final Connection connection; + private final TableName tableName; + + public SingleWriteExampleCallable(Connection connection, TableName tableName) { + this.connection = connection; + this.tableName = tableName; + } + + @Override + public Boolean call() throws Exception { + try (Table t = connection.getTable(tableName)) { + + byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble())); + byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong()); + Put p = new Put(rk); + p.addImmutable(FAMILY, QUAL, value); + t.put(p); + } + return true; + } + } + + + public static class ReadExampleCallable implements Callable { + private final Connection connection; + private final TableName tableName; + + public ReadExampleCallable(Connection connection, TableName tableName) { + this.connection = connection; + this.tableName = tableName; + } + + @Override + public Boolean call() throws Exception { + int result = 0; + int toRead = 100; + try (Table t = connection.getTable(tableName)) { + byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong()); + Scan s = new Scan(rk); + + // Don't go back to the server for every single row. + // We know these rows are small. So ask for 20 at a time. + s.setCaching(20); + + // Don't use the cache. While this is a silly test program it's still good to be + // explicit that scans normally don't use the block cache. + s.setCacheBlocks(false); + + // Open up the scanner and close it automatically when done. + try (ResultScanner rs = t.getScanner(s)) { + + // Now go through rows. + for (Result r : rs) { + // Keep track of things size to simulate doing some real work. + result += r.getRow().length; + toRead -= 1; + if (toRead <= 0) { + break; + } + } + } + } + return result > 0; + } + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new MultiThreadedClientExample(), args); + } +} -- 2.6.1