From f593fc0a8538e3c8ac4550f0a32665ffd20f8d3a Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Tue, 3 Nov 2015 13:01:10 -0800 Subject: [PATCH] HBASE-14752 Add example of using the HBase client in a multi-threaded environment --- .../client/example/MultiThreadedClientExample.java | 176 +++++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/MultiThreadedClientExample.java diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/MultiThreadedClientExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/MultiThreadedClientExample.java new file mode 100644 index 0000000..c0aa9f9 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/MultiThreadedClientExample.java @@ -0,0 +1,176 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.example; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + + +public class MultiThreadedClientExample extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(MultiThreadedClientExample.class); + public static final int NUM_OPERATIONS = 500000; + // d for default. + private static byte[] FAMILY = Bytes.toBytes("d"); + private static byte[] QUAL = Bytes.toBytes("test"); + + private final ExecutorService internalPool; + private final Connection readConnection; + private final Connection writeConnection; + private final int threads; + + public MultiThreadedClientExample() throws IOException { + this.threads = Runtime.getRuntime().availableProcessors() * 4; + this.internalPool = Executors.newFixedThreadPool(threads, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("internal-pol-%d").build()); + + // Threads for the client only. + // + // We don't want to mix hbase and business logic. + ExecutorService service = new ForkJoinPool(threads); + + + + // Create two different connections showing how it's possible to + // separate different types of requests onto different connections + writeConnection = ConnectionFactory.createConnection(HBaseConfiguration.create(), service); + readConnection = ConnectionFactory.createConnection(HBaseConfiguration.create(), service); + } + + @Override + public int run(String[] args) throws Exception { + final TableName tableName = TableName.valueOf(args[0]); + + // At this point the entire cache for the region locations is full. + warmUpConnectionCache(readConnection, tableName); + warmUpConnectionCache(writeConnection, tableName); + + List> futures = new ArrayList<>(NUM_OPERATIONS); + for (int i = 0; i < NUM_OPERATIONS; i++) { + if (i % 3 == 0) { + futures.add( + internalPool.submit(new ReadExampleCallable(readConnection, tableName)) + ); + } else { + futures.add( + internalPool.submit(new WriteExampleCallable(writeConnection, tableName)) + ); + } + } + + for (Future f : futures) { + f.get(5, TimeUnit.MINUTES); + } + return 0; + } + + private void warmUpConnectionCache(Connection connection, TableName tn) throws IOException { + try (RegionLocator locator = connection.getRegionLocator(tn)) { + LOG.info( + "Warmed up region location cache for " + tn + + " got " + locator.getAllRegionLocations().size()); + } + } + + public static class WriteExampleCallable implements Callable { + private final Connection connection; + private final TableName tableName; + + public WriteExampleCallable(Connection connection, TableName tableName) { + this.connection = connection; + this.tableName = tableName; + } + + @Override + public Boolean call() throws Exception { + try (Table t = connection.getTable(tableName)) { + byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong()); + Put p = new Put(rk); + p.addImmutable(FAMILY, QUAL, Bytes.toBytes(ThreadLocalRandom.current().nextBoolean())); + t.put(p); + } + + return false; + } + } + + public static class ReadExampleCallable implements Callable { + private final Connection connection; + private final TableName tableName; + + public ReadExampleCallable(Connection connection, TableName tableName) { + this.connection = connection; + this.tableName = tableName; + } + + @Override + public Boolean call() throws Exception { + int result = 0; + int toRead = 100; + try (Table t = connection.getTable(tableName)) { + byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong()); + Scan s = new Scan(rk); + s.setCaching(20); + s.setCacheBlocks(false); + + try (ResultScanner rs = t.getScanner(s)) { + for (Result r : rs) { + result += r.getRow().length; + toRead -= 1; + if (toRead <= 0) { + break; + } + } + } + } + return result > 0; + } + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new MultiThreadedClientExample(), args); + } + + +} -- 2.6.1