diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncGetCallback.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncGetCallback.java new file mode 100644 index 0000000..e6aab18 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncGetCallback.java @@ -0,0 +1,87 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Facility class for easily customizing a get callback + * @param + */ +public abstract class AsyncGetCallback implements AsyncRpcCallback { + + protected final String row; + private CellScanner cells; + + public AsyncGetCallback(byte[] row) { + this.row = Bytes.toString(row); + } + + @Override + public void run(GetResponse response) { + Result result; + try { + result = ProtobufUtil.toResult(response.getResult(), cells); + processResult(result); + } catch (IOException e) { + processError(e); + } + } + + @Override + public void onError(Throwable exception) { + processError(exception); + } + + abstract public void processResult(Result result); + + abstract public void processError(Throwable exception); + + @Override + public String toString() { + return "AsyncGetCallback_" + row; + } + + @Override + public int hashCode() { + return this.row.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof AsyncGetCallback)) { + return false; + } + if (obj == this) { + return true; + } + return ((AsyncGetCallback) obj).row.equals(this.row); + } + + @Override + public void setCellScanner(CellScanner cells) { + this.cells = cells; + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcCallback.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcCallback.java new file mode 100644 index 0000000..15762d4 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcCallback.java @@ -0,0 +1,21 @@ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.CellScanner; + +import com.google.protobuf.RpcCallback; + +public interface AsyncRpcCallback extends 
RpcCallback { + /** + * Called when the whole call is completed successfully + * @param result the return result of remote call + */ + @Override + public void run(T result); + + /** + * Called when the whole call is failed with given exception + */ + public void onError(Throwable exception); + + public void setCellScanner(CellScanner cells); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 5c70b77..5ef18d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -247,6 +247,15 @@ public interface ClusterConnection extends Connection { ClientService.BlockingInterface getClient(final ServerName serverName) throws IOException; /** + * Establishes a connection to the region server at the specified address, and returns an + * non-blocking region client protocol. + * @param serverName + * @return Non-blocking proxy for RegionServer + * @throws IOException if a remote or network exception occurs + */ + ClientService.Interface getAsyncClient(final ServerName serverName) throws IOException; + + /** * Find region location hosting passed row * @param tableName table name * @param row Row to find. 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 38178b4..9255aad 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -93,6 +93,7 @@ import org.apache.zookeeper.KeeperException; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.RpcChannel; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -1214,6 +1215,31 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return stub; } + @Override + public ClientProtos.ClientService.Interface getAsyncClient(final ServerName sn) + throws IOException { + if (isDeadServer(sn)) { + throw new RegionServerStoppedException(sn + " is dead."); + } + // The service name is different from blocking interface so there's no conflict + String key = + getStubKey(ClientProtos.ClientService.Interface.class.getName(), sn.getHostname(), + sn.getPort(), this.hostnamesCanChange); + this.connectionLock.putIfAbsent(key, key); + ClientProtos.ClientService.Interface asyncStub = null; + synchronized (this.connectionLock.get(key)) { + asyncStub = (ClientProtos.ClientService.Interface) this.stubs.get(key); + if (asyncStub == null) { + RpcChannel channel = this.rpcClient.createRpcChannel(sn, user, rpcTimeout); + asyncStub = ClientProtos.ClientService.newStub(channel); + // In old days, after getting stub/proxy, we'd make a call. We are not doing that here. + // Just fail on first actual call rather than in here on setup. 
+ this.stubs.put(key, asyncStub); + } + } + return asyncStub; + } + static String getStubKey(final String serviceName, final String rsHostname, int port, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 1d1db3a..984ff20 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -49,12 +49,14 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; @@ -70,6 +72,7 @@ import com.google.common.annotations.VisibleForTesting; // Internally, we use shaded protobuf. This below are part of our public API. import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; import com.google.protobuf.Service; import com.google.protobuf.ServiceException; // SEE ABOVE NOTE! 
@@ -407,10 +410,11 @@ public class HTable implements Table { */ @Override public Result get(final Get get) throws IOException { - return get(get, get.isCheckExistenceOnly()); + return get(get, get.isCheckExistenceOnly(), null); } - private Result get(Get get, final boolean checkExistenceOnly) throws IOException { + private Result get(Get get, final boolean checkExistenceOnly, + final AsyncRpcCallback callback) throws IOException { // if we are changing settings to the get, clone it. if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) { get = ReflectionUtils.newInstance(get.getClass(), get); @@ -429,7 +433,25 @@ public class HTable implements Table { protected Result rpcCall() throws Exception { ClientProtos.GetRequest request = RequestConverter.buildGetRequest( getLocation().getRegionInfo().getRegionName(), configuredGet); - ClientProtos.GetResponse response = getStub().get(getRpcController(), request); + ClientProtos.GetResponse response = null; + if (callback == null) { + response = getStub().get(getRpcController(), request); + } else { + final HBaseRpcController controller = (HBaseRpcController) getRpcController(); + getAsyncStub().get(getRpcController(), request, new RpcCallback() { + + @Override + public void run(GetResponse result) { + if (controller.failed()) { + IOException exception = controller.getFailed(); + callback.onError(exception); + } else { + callback.setCellScanner(controller.cellScanner()); + callback.run(result); + } + } + }); + } if (response == null) return null; return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); } @@ -849,7 +871,7 @@ public class HTable implements Table { */ @Override public boolean exists(final Get get) throws IOException { - Result r = get(get, true); + Result r = get(get, true, null); assert r.getExists() != null; return r.getExists(); } @@ -1259,4 +1281,9 @@ public class HTable implements Table { } return mutator; } + + @Override + public void asyncGet(Get 
get, AsyncRpcCallback callback) throws IOException { + get(get, get.isCheckExistenceOnly(), callback); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index 6a02e18..cc432d2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -47,6 +47,7 @@ import com.google.protobuf.RpcController; @InterfaceAudience.Private public abstract class RegionServerCallable extends AbstractRegionServerCallable { private ClientService.BlockingInterface stub; + private ClientService.Interface asyncStub; /* This is 99% of the time a PayloadCarryingRpcController but this RegionServerCallable is * also used doing Coprocessor Endpoints and in this case, it is a ServerRpcControllable which is @@ -74,6 +75,7 @@ public abstract class RegionServerCallable extends AbstractRegionServerCallab void setClientByServiceName(ServerName service) throws IOException { this.setStub(getConnection().getClient(service)); + this.setAsyncStub(getConnection().getAsyncClient(this.location.getServerName())); } /** @@ -92,6 +94,21 @@ public abstract class RegionServerCallable extends AbstractRegionServerCallab } /** + * @return Client Rpc protobuf non-blocking communication stub + */ + protected ClientService.Interface getAsyncStub() { + return this.asyncStub; + } + + /** + * Set the client protobuf non-blocking communication stub + * @param asyncStub The non-blocking stub to set + */ + void setAsyncStub(final ClientService.Interface asyncStub) { + this.asyncStub = asyncStub; + } + + /** * Override that changes call Exception from {@link Exception} to {@link IOException}. It also * does setup of an rpcController and calls through to the unimplemented * rpcCall() method. 
If rpcController is an instance of PayloadCarryingRpcController, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 4d93442..6af7591 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; @@ -637,4 +638,17 @@ public interface Table extends Closeable { * @param writeRpcTimeout */ void setWriteRpcTimeout(int writeRpcTimeout); + + /** + * Extracts certain cells from a given row. + *

+ * It is an asynchronous operation.

+ * You should define the action for result or error in given callback. + * @param get The object that specifies what data to fetch and from which row. + * @param callback define how to handle the remote response + * @throws IOException + */ + public void asyncGet(final Get get, final AsyncRpcCallback callback) + throws IOException; } diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index 5debf39..e216f3a 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.AsyncRpcCallback; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse; import org.apache.hadoop.hbase.rest.Constants; import org.apache.hadoop.hbase.rest.model.CellModel; import org.apache.hadoop.hbase.rest.model.CellSetModel; @@ -898,4 +900,9 @@ public class RemoteHTable implements Table { throw new IllegalStateException("URLEncoder doesn't support UTF-8", e); } } + + @Override + public void asyncGet(Get get, AsyncRpcCallback callback) throws IOException { + throw new UnsupportedOperationException("asyncGet call currently not supported in REST call"); + } } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java index 6a73261..270e2a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost.Environment; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse; import org.apache.hadoop.io.MultipleIOException; /** @@ -330,4 +331,9 @@ public final class HTableWrapper implements Table { @Override public int getReadRpcTimeout() { return table.getReadRpcTimeout(); } + + @Override + public void asyncGet(Get get, AsyncRpcCallback callback) throws IOException { + this.table.asyncGet(get, callback); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncHTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncHTable.java new file mode 100644 index 0000000..154846b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncHTable.java @@ -0,0 +1,248 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.NettyRpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.protobuf.Message; + +/** + * Class to test Async Call Method of HTable.Spins up the minicluster once at + * test start and then takes it down afterward. 
+ */ +@Category(LargeTests.class) +public class TestAsyncHTable { + final Log LOG = LogFactory.getLog(getClass()); + private static final byte[] COLUMN_FAMILY = Bytes.toBytes("cf"); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static Connection connection; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PAUSE, 100); + // TEST_UTIL.getConfiguration().setInt("hbase.ipc.server.max.callqueue.length", 12800); + TEST_UTIL.getConfiguration().set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, + NettyRpcClient.class.getName()); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10); + TEST_UTIL.startMiniCluster(3); + connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + connection.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + public static void resetConnection() throws Exception { + connection.close(); + connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); + } + + @Test(timeout = 300000) + public void testCallBack() throws Exception { + final TableName table = TableName.valueOf("testCallBack"); + final byte[] qualifier = Bytes.toBytes("q"); + Table htable = + TEST_UTIL.createTable(table, COLUMN_FAMILY, + new byte[][] { Bytes.toBytes("thread2"), Bytes.toBytes("thread4") }); + int threadsNum = 5; + final int operationPerThread = 100; + int expectedOperationCount = threadsNum * operationPerThread; + final AtomicInteger successCount = new AtomicInteger(0); + final AtomicInteger errorCount = new AtomicInteger(0); + Thread[] opThreads = new Thread[threadsNum]; + + // Do put in multi threads + for (int i = 0; i < threadsNum; i++) { + final String threadPrefix = "thread" + i + "-"; + opThreads[i] = new AsyncOperationThread(table, threadPrefix, operationPerThread) { + + @Override + 
public void doOperation(Table ht, byte[] row) { + Put put = new Put(row); + put.addColumn(COLUMN_FAMILY, qualifier, row); + // try { + // ht.asyncPut(put, new CustomAsyncPutCallback(row)); + // } catch (IOException e) { + // errorCount.incrementAndGet(); + // throw new RuntimeException(e); + // } + // reserved sync way of put + try { + ht.put(put); + successCount.incrementAndGet(); + } catch (IOException e) { + errorCount.incrementAndGet(); + throw new RuntimeException(e); + } + } + }; + opThreads[i].start(); + } + for (Thread opThread : opThreads) { + opThread.join(); + } + waitUntilEqual(successCount, errorCount, expectedOperationCount, 60 * 1000); + LOG.debug("Succeed put operation number: " + successCount.get()); + LOG.debug("Failed put operation number: " + errorCount.get()); + assertEquals(successCount.get(), TEST_UTIL.countRows(htable)); + + // Reset counters + final AtomicInteger emptyResultCount = new AtomicInteger(0); + final AtomicInteger normalResultCount = new AtomicInteger(0); + int expectedEmptyResultCnt = errorCount.get(); + int expectedNormalResultCnt = successCount.get(); + successCount.set(0); + errorCount.set(0); + + // An example of customized call back for get + final class CustomAsyncGetCallback extends AsyncGetCallback { + public CustomAsyncGetCallback(byte[] row) { + super(row); + } + + @Override + public void processResult(Result result) { + LOG.debug("Receive one get result: " + result); + if (result.isEmpty()) { + int emptyCnt = emptyResultCount.incrementAndGet(); + LOG.warn("Got empty result, empty result count: " + emptyCnt); + } else { + int normalCnt = normalResultCount.incrementAndGet(); + LOG.warn("Got normal result, normal result count: " + normalCnt); + } + int opCnt = successCount.incrementAndGet(); + LOG.debug("Current finished get op count: " + opCnt); + } + + @Override + public void processError(Throwable exception) { + LOG.debug("Receive one get failure", exception); + int opCnt = errorCount.incrementAndGet(); + 
LOG.debug("Current failed get op count: " + opCnt); + } + + @Override + public String toString() { + return "CustomAsyncGetCallback_" + row; + } + } + + // Do get in multi threads + for (int i = 0; i < threadsNum; i++) { + final String threadPrefix = "thread" + i + "-"; + opThreads[i] = new AsyncOperationThread(table, threadPrefix, operationPerThread) { + + @Override + public void doOperation(Table ht, byte[] row) { + Get get = new Get(row); + CustomAsyncGetCallback getCallback = new CustomAsyncGetCallback(row); + try { + ht.asyncGet(get, getCallback); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + opThreads[i].start(); + } + for (Thread opThread : opThreads) { + opThread.join(); + } + + waitUntilEqual(successCount, errorCount, expectedOperationCount, 60 * 1000); + if (errorCount.get() == 0) { + Assert.assertEquals("Expected " + expectedEmptyResultCnt + " empty results but actually " + + emptyResultCount.get(), expectedEmptyResultCnt, emptyResultCount.get()); + Assert.assertEquals("Expected " + normalResultCount + " normal results but actually " + + normalResultCount.get(), expectedNormalResultCnt, normalResultCount.get()); + } + LOG.debug("Succeed get operation number: " + successCount.get()); + LOG.debug("Failed get operation number: " + errorCount.get()); + + htable.close(); + } + + private void waitUntilEqual(AtomicInteger successCount, AtomicInteger failureCount, + int expectedCount, int timeout) throws IOException { + long start = System.currentTimeMillis(); + while (successCount.get() + failureCount.get() != expectedCount) { + Threads.sleep(10); + if (System.currentTimeMillis() - start > timeout) { + throw new IOException("Timeout waiting to " + expectedCount + ",now successCnt:" + + successCount.get() + "; failureCnt: " + failureCount); + } + } + } + + static abstract class AsyncOperationThread extends Thread { + private final TableName table; + private final String threadPrefix; + private final int opPerThread; + + public 
AsyncOperationThread(TableName table, String threadPrefix, int opPerThread) { + this.table = table; + this.threadPrefix = threadPrefix; + this.opPerThread = opPerThread; + } + + public abstract void doOperation(Table ht, byte[] row); + + public void run() { + Table ht = null; + try { + ht = connection.getTable(table); + for (int i = 0; i < opPerThread; i++) { + doOperation(ht, Bytes.toBytes(threadPrefix + i)); + } + } catch (IllegalArgumentException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } finally { + try { + if (ht != null) ht.close(); + } catch (IOException e) { + } + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java index d2e78b7..f51a0ca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.AsyncRpcCallback; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; @@ -355,4 +357,11 @@ public class RegionAsTable implements Table { 
@Override public int getReadRpcTimeout() { throw new UnsupportedOperationException(); } + + @Override + public void asyncGet(Get get, AsyncRpcCallback callback) throws IOException { + throw new UnsupportedOperationException("asyncGet call is not supported in " + + this.getClass().getName()); + + } } \ No newline at end of file