From e899c7cf711d151acbcdefe8634aa9057ae094dd Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Thu, 19 Jan 2017 11:50:53 +0800
Subject: [PATCH] HBASE-17346 Add coprocessor service support

---
 .../client/ClientCoprocessorRpcController.java     |  74 ++++++++++++
 .../apache/hadoop/hbase/client/RawAsyncTable.java  |  41 +++++++
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |  98 +++++++++++++++
 .../client/RegionCoprocessorRpcChannelImpl.java    | 117 +++++++++++++++++++
 .../client/coprocessor/AggregationClient.java      |  88 ++------------
 .../client/coprocessor/AggregationHelper.java      | 109 ++++++++++++++++++
 .../client/coprocessor/AsyncAggregationClient.java | 117 +++++++++++++++++++
 .../hbase/coprocessor/AggregateImplementation.java |  17 +--
 .../hbase/client/TestAsyncAggregationClient.java   |  88 ++++++++++++++
 9 files changed, 662 insertions(+), 87 deletions(-)
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientCoprocessorRpcController.java
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
 create mode 100644 hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java
 create mode 100644 hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
 create mode 100644 hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientCoprocessorRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientCoprocessorRpcController.java
new file mode 100644
index 0000000..149e1d3
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientCoprocessorRpcController.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Client side rpc controller for coprocessor implementation. It is only used to pass errors.
+ */
+@InterfaceAudience.Private
+public class ClientCoprocessorRpcController implements RpcController {
+
+  private Throwable error;
+
+  @Override
+  public void reset() {
+  }
+
+  @Override
+  public boolean failed() {
+    return error != null;
+  }
+
+  @Override
+  public String errorText() {
+    return error != null ? error.getMessage() : null;
+  }
+
+  @Override
+  public void startCancel() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setFailed(String reason) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isCanceled() {
+    return false;
+  }
+
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> callback) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void setFailed(Throwable error) {
+    this.error = error;
+  }
+
+  public Throwable getFailed() {
+    return error;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
index 67099e8..14d9990 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
@@ -17,6 +17,14 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
@@ -60,4 +68,37 @@ public interface RawAsyncTable extends AsyncTableBase {
    * @param consumer the consumer used to receive results.
    */
   void scan(Scan scan, RawScanResultConsumer consumer);
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  @FunctionalInterface
+  interface CoprocessorCallable<S, R> {
+    void call(S stub, RpcController controller, RpcCallback<R> done);
+  }
+
+  <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] row);
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  interface CoprocessorCallback<R> {
+
+    void onRegionComplete(HRegionInfo region, R resp);
+
+    void onRegionError(HRegionInfo region, Throwable error);
+
+    void onComplete();
+
+    void onError(Throwable error);
+  }
+
+  default <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] startKey, byte[] endKey,
+      CoprocessorCallback<R> callback) {
+    coprocessorService(stubMaker, callable, startKey, true, endKey, false, callback);
+  }
+
+  <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] startKey, boolean startKeyInclusive,
+      byte[] endKey, boolean endKeyInclusive, CoprocessorCallback<R> callback);
 }
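
An illustrative usage sketch of the new single-region overload (hypothetical, not part of this patch): it assumes an open AsyncConnection named conn, a table named "t", and the AggregateService endpoint from hbase-endpoint; building the AggregateRequest is covered by AggregationHelper further down.

    import java.util.concurrent.CompletableFuture;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AsyncConnection;
    import org.apache.hadoop.hbase.client.RawAsyncTable;
    import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
    import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
    import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CoprocessorServiceSketch {
      // Invokes the AggregateService endpoint on the single region hosting "row-0".
      static CompletableFuture<AggregateResponse> getMaxOfOneRegion(AsyncConnection conn,
          AggregateRequest req) {
        RawAsyncTable table = conn.getRawTable(TableName.valueOf("t"));
        return table.coprocessorService(
          channel -> AggregateService.newStub(channel), // stubMaker: wrap the region rpc channel
          (stub, controller, done) -> stub.getMax(controller, req, done), // one endpoint call
          Bytes.toBytes("row-0")); // the row that selects the target region
      }
    }
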
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index d9d2d35..ab4e2d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -18,17 +18,27 @@
 package org.apache.hadoop.hbase.client;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+
+import com.google.protobuf.RpcChannel;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -388,6 +398,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
   public List<CompletableFuture<Void>> put(List<Put> puts) {
     return voidMutate(puts);
   }
+
   @Override
   public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
     return voidMutate(deletes);
@@ -434,4 +445,91 @@ class RawAsyncTableImpl implements RawAsyncTable {
   public long getScanTimeout(TimeUnit unit) {
     return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
   }
+
+  private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, HRegionInfo region, byte[] row) {
+    RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
+        region, row, rpcTimeoutNs, operationTimeoutNs);
+    S stub = stubMaker.apply(channel);
+    CompletableFuture<R> future = new CompletableFuture<>();
+    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
+    callable.call(stub, controller, resp -> {
+      if (controller.failed()) {
+        future.completeExceptionally(controller.getFailed());
+      } else {
+        future.complete(resp);
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] row) {
+    return coprocessorService(stubMaker, callable, null, row);
+  }
+
+  private boolean locateFinished(HRegionInfo region, byte[] endKey, boolean endKeyInclusive) {
+    if (isEmptyStopRow(endKey)) {
+      if (isEmptyStopRow(region.getEndKey())) {
+        return true;
+      }
+      return false;
+    } else {
+      if (isEmptyStopRow(region.getEndKey())) {
+        return true;
+      }
+      int c = Bytes.compareTo(endKey, region.getEndKey());
+      // locating is finished if either
+      // 1. the region contains endKey, or
+      // 2. endKey is equal to the region's endKey and we do not want to include endKey.
+      return c < 0 || (c == 0 && !endKeyInclusive);
+    }
+  }
+
+  private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback,
+      List<HRegionLocation> locs, byte[] endKey, boolean endKeyInclusive,
+      AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc,
+      Throwable error) {
+    if (error != null) {
+      callback.onError(error);
+      return;
+    }
+    unfinishedRequest.incrementAndGet();
+    HRegionInfo region = loc.getRegionInfo();
+    if (locateFinished(region, endKey, endKeyInclusive)) {
+      locateFinished.set(true);
+    } else {
+      conn.getLocator()
+          .getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
+            operationTimeoutNs)
+          .whenComplete((l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey,
+            endKeyInclusive, locateFinished, unfinishedRequest, l, e));
+    }
+    coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> {
+      if (e != null) {
+        callback.onRegionError(region, e);
+      } else {
+        callback.onRegionComplete(region, r);
+      }
+      if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
+        callback.onComplete();
+      }
+    });
+  }
+
+  @Override
+  public <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] startKey, boolean startKeyInclusive,
+      byte[] endKey, boolean endKeyInclusive, CoprocessorCallback<R> callback) {
+    byte[] nonNullStartKey = Optional.ofNullable(startKey).orElse(EMPTY_START_ROW);
+    byte[] nonNullEndKey = Optional.ofNullable(endKey).orElse(EMPTY_END_ROW);
+    List<HRegionLocation> locs = new ArrayList<>();
+    conn.getLocator()
+        .getRegionLocation(tableName, nonNullStartKey,
+          startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER,
+          operationTimeoutNs)
+        .whenComplete(
+          (loc, error) -> onLocateComplete(stubMaker, callable, callback, locs, nonNullEndKey,
+            endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
+  }
 }
\ No newline at end of file
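
The end-key bookkeeping in locateFinished is the subtle part of the fan-out: locating stops once the current region covers the requested end key. A standalone sketch of the same predicate (hypothetical, not part of this patch; isEmptyStopRow is inlined, region end keys are made up) with a few worked cases:

    import org.apache.hadoop.hbase.util.Bytes;

    public class LocateFinishedSketch {
      private static boolean isEmptyStopRow(byte[] key) {
        return key.length == 0; // HConstants.EMPTY_END_ROW is a zero-length array
      }

      static boolean locateFinished(byte[] regionEndKey, byte[] endKey, boolean endKeyInclusive) {
        if (isEmptyStopRow(regionEndKey)) {
          return true; // the last region of the table always finishes the locate
        }
        if (isEmptyStopRow(endKey)) {
          return false; // scanning to the end of the table, keep locating
        }
        int c = Bytes.compareTo(endKey, regionEndKey);
        return c < 0 || (c == 0 && !endKeyInclusive);
      }

      public static void main(String[] args) {
        byte[] regionEnd = Bytes.toBytes("bbb"); // a region covering [aaa, bbb)
        System.out.println(locateFinished(regionEnd, Bytes.toBytes("abc"), true));  // true: endKey inside region
        System.out.println(locateFinished(regionEnd, Bytes.toBytes("bbb"), false)); // true: exclusive end == region end
        System.out.println(locateFinished(regionEnd, Bytes.toBytes("bbb"), true));  // false: inclusive "bbb" lives in the next region
      }
    }
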
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
new file mode 100644
index 0000000..28a5564
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * The implementation of a region based coprocessor rpc channel.
+ */
+@InterfaceAudience.Private
+class RegionCoprocessorRpcChannelImpl implements RpcChannel {
+
+  private final AsyncConnectionImpl conn;
+
+  private final TableName tableName;
+
+  private final HRegionInfo region;
+
+  private final byte[] row;
+
+  private final long rpcTimeoutNs;
+
+  private final long operationTimeoutNs;
+
+  RegionCoprocessorRpcChannelImpl(AsyncConnectionImpl conn, TableName tableName,
+      HRegionInfo region, byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
+    this.conn = conn;
+    this.tableName = tableName;
+    this.region = region;
+    this.row = row;
+    this.rpcTimeoutNs = rpcTimeoutNs;
+    this.operationTimeoutNs = operationTimeoutNs;
+  }
+
+  private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
+      Message responsePrototype, HBaseRpcController controller, HRegionLocation loc,
+      ClientService.Interface stub) {
+    CompletableFuture<Message> future = new CompletableFuture<>();
+    if (region != null
+        && !Bytes.equals(loc.getRegionInfo().getRegionName(), region.getRegionName())) {
+      future.completeExceptionally(new DoNotRetryIOException(
+          "Region name is changed, expected " + region.getRegionNameAsString() + ", actual "
+              + loc.getRegionInfo().getRegionNameAsString()));
+      return future;
+    }
+    CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method,
+      request, row, loc.getRegionInfo().getRegionName());
+    stub.execService(controller, csr,
+      new org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
+
+        @Override
+        public void run(CoprocessorServiceResponse resp) {
+          if (controller.failed()) {
+            future.completeExceptionally(controller.getFailed());
+          } else {
+            try {
+              future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
+            } catch (IOException e) {
+              future.completeExceptionally(e);
+            }
+          }
+        }
+      });
+    return future;
+  }
+
+  @Override
+  public void callMethod(MethodDescriptor method, RpcController controller, Message request,
+      Message responsePrototype, RpcCallback<Message> done) {
+    conn.callerFactory.<Message> single().table(tableName).row(row)
+        .locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+        .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call()
+        .whenComplete((r, e) -> {
+          if (e != null) {
+            ((ClientCoprocessorRpcController) controller).setFailed(e);
+          }
+          done.run(r);
+        });
+  }
+
+}
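
How a request reaches callMethod may be non-obvious: the protobuf-generated stub is only a thin wrapper over the RpcChannel. A rough sketch of that indirection (the generated internals are paraphrased from the protobuf 2.5 service pattern, not copied from generated code):

    import com.google.protobuf.RpcChannel;
    import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;

    public class StubDispatchSketch {
      // The channel here is what RawAsyncTableImpl builds: a
      // RegionCoprocessorRpcChannelImpl bound to one region.
      static AggregateService.Stub wrap(RpcChannel channel) {
        return AggregateService.newStub(channel);
        // Every call on the returned stub, e.g. stub.getMax(controller, request, done),
        // boils down to one channel.callMethod(methodDescriptor, controller, request,
        // AggregateResponse.getDefaultInstance(), done) invocation, which is exactly the
        // information callMethod above needs to build the CoprocessorServiceRequest.
      }
    }
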
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
index d236342..f96377f 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
@@ -19,12 +19,16 @@
 
 package org.apache.hadoop.hbase.client.coprocessor;
 
+import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
+import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
 import java.io.Closeable;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -49,18 +53,12 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-
 /**
  * This client class is for invoking the aggregate functions deployed on the
  * Region Server side via the AggregateService. This class will implement the
@@ -227,23 +225,7 @@ public class AggregationClient implements Closeable {
     return aMaxCallBack.getMax();
   }
 
-  /*
-   * @param scan
-   * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
-   */
-  private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
-    if (scan == null
-        || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes.equals(
-          scan.getStartRow(), HConstants.EMPTY_START_ROW))
-        || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) && !Bytes.equals(
-          scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
-      throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow");
-    } else if (!canFamilyBeAbsent) {
-      if (scan.getFamilyMap().size() != 1) {
-        throw new IOException("There must be only one family.");
-      }
-    }
-  }
+
 
   /**
    * It gives the minimum value of a column for a given column family for the
@@ -846,22 +828,6 @@ public class AggregationClient implements Closeable {
     return null;
   }
 
-  <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
-  validateArgAndGetPB(Scan scan, ColumnInterpreter<R, S, P, Q, T> ci, boolean canFamilyBeAbsent)
-      throws IOException {
-    validateParameters(scan, canFamilyBeAbsent);
-    final AggregateRequest.Builder requestBuilder =
-        AggregateRequest.newBuilder();
-    requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
-    P columnInterpreterSpecificData = null;
-    if ((columnInterpreterSpecificData = ci.getRequestData())
-        != null) {
-      requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
-    }
-    requestBuilder.setScan(ProtobufUtil.toScan(scan));
-    return requestBuilder.build();
-  }
-
   byte[] getBytesFromResponse(ByteString response) {
     ByteBuffer bb = response.asReadOnlyByteBuffer();
     bb.rewind();
@@ -873,40 +839,4 @@ public class AggregationClient implements Closeable {
     }
     return bytes;
   }
-
-  /**
-   * Get an instance of the argument type declared in a class's signature. The
-   * argument type is assumed to be a PB Message subclass, and the instance is
-   * created using parseFrom method on the passed ByteString.
-   * @param runtimeClass the runtime type of the class
-   * @param position the position of the argument in the class declaration
-   * @param b the ByteString which should be parsed to get the instance created
-   * @return the instance
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  // Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO.
-  public static <T extends Message> T
-  getParsedGenericInstance(Class<?> runtimeClass, int position, ByteString b)
-      throws IOException {
-    Type type = runtimeClass.getGenericSuperclass();
-    Type argType = ((ParameterizedType)type).getActualTypeArguments()[position];
-    Class<?> classType = (Class<?>)argType;
-    T inst;
-    try {
-      Method m = classType.getMethod("parseFrom", ByteString.class);
-      inst = (T)m.invoke(null, b);
-      return inst;
-    } catch (SecurityException e) {
-      throw new IOException(e);
-    } catch (NoSuchMethodException e) {
-      throw new IOException(e);
-    } catch (IllegalArgumentException e) {
-      throw new IOException(e);
-    } catch (InvocationTargetException e) {
-      throw new IOException(e);
-    } catch (IllegalAccessException e) {
-      throw new IOException(e);
-    }
-  }
 }
\ No newline at end of file
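
The reflection trick that getParsedGenericInstance relies on (moved into AggregationHelper below) is worth making concrete. A sketch (hypothetical, not part of this patch) using LongColumnInterpreter, which really does extend ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg>:

    import java.lang.reflect.ParameterizedType;
    import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;

    public class GenericArgSketch {
      public static void main(String[] args) {
        // Position 3 of the generic superclass declaration is LongMsg, so
        // getParsedGenericInstance(ci.getClass(), 3, bytes) ends up calling the
        // static LongMsg.parseFrom(ByteString) to rebuild the message.
        Class<?> argType = (Class<?>) ((ParameterizedType) LongColumnInterpreter.class
            .getGenericSuperclass()).getActualTypeArguments()[3];
        System.out.println(argType.getName()); // expected: the generated LongMsg class
      }
    }
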
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java
new file mode 100644
index 0000000..b91128c
--- /dev/null
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Helper class for constructing aggregation requests and responses.
+ */
+@InterfaceAudience.Private
+public class AggregationHelper {
+
+  /**
+   * @param scan the scan to validate
+   * @param canFamilyBeAbsent whether the column family can be absent in the familyMap of scan
+   */
+  private static void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
+    if (scan == null
+        || (Bytes.equals(scan.getStartRow(), scan.getStopRow())
+            && !Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))
+        || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0)
+            && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
+      throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow");
+    } else if (!canFamilyBeAbsent) {
+      if (scan.getFamilyMap().size() != 1) {
+        throw new IOException("There must be only one family.");
+      }
+    }
+  }
+
+  static <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
+      validateArgAndGetPB(Scan scan, ColumnInterpreter<R, S, P, Q, T> ci,
+          boolean canFamilyBeAbsent) throws IOException {
+    validateParameters(scan, canFamilyBeAbsent);
+    final AggregateRequest.Builder requestBuilder = AggregateRequest.newBuilder();
+    requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
+    P columnInterpreterSpecificData = null;
+    if ((columnInterpreterSpecificData = ci.getRequestData()) != null) {
+      requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
+    }
+    requestBuilder.setScan(ProtobufUtil.toScan(scan));
+    return requestBuilder.build();
+  }
+
+  /**
+   * Get an instance of the argument type declared in a class's signature. The argument type is
+   * assumed to be a PB Message subclass, and the instance is created using the parseFrom method
+   * on the passed ByteString.
+   * @param runtimeClass the runtime type of the class
+   * @param position the position of the argument in the class declaration
+   * @param b the ByteString which should be parsed to get the instance created
+   * @return the instance
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  // Used server-side too by the Aggregation Coprocessor Endpoint. Undo this interdependence. TODO.
+  public static <T extends Message> T getParsedGenericInstance(Class<?> runtimeClass, int position,
+      ByteString b) throws IOException {
+    Type type = runtimeClass.getGenericSuperclass();
+    Type argType = ((ParameterizedType) type).getActualTypeArguments()[position];
+    Class<?> classType = (Class<?>) argType;
+    T inst;
+    try {
+      Method m = classType.getMethod("parseFrom", ByteString.class);
+      inst = (T) m.invoke(null, b);
+      return inst;
+    } catch (SecurityException e) {
+      throw new IOException(e);
+    } catch (NoSuchMethodException e) {
+      throw new IOException(e);
+    } catch (IllegalArgumentException e) {
+      throw new IOException(e);
+    } catch (InvocationTargetException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
+}
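
What validateArgAndGetPB produces for a well-formed scan is easy to state concretely. A sketch (hypothetical, not part of this patch) of the equivalent AggregateRequest for a single-family scan with LongColumnInterpreter, whose getRequestData returns null, so no interpreter-specific bytes are attached:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
    import org.apache.hadoop.hbase.util.Bytes;

    public class AggregateRequestSketch {
      static AggregateRequest buildMaxRequest() throws IOException {
        Scan scan = new Scan().addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CQ"));
        // Equivalent to validateArgAndGetPB(scan, new LongColumnInterpreter(), false):
        // exactly one column family, start row smaller than stop row.
        return AggregateRequest.newBuilder()
            .setInterpreterClassName(LongColumnInterpreter.class.getCanonicalName())
            .setScan(ProtobufUtil.toScan(scan))
            .build();
      }
    }
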
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
new file mode 100644
index 0000000..db1aa81
--- /dev/null
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
+import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
+
+import com.google.protobuf.Message;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.RawAsyncTable;
+import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallback;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
+
+/**
+ * This client class is for invoking the aggregate functions deployed on the Region Server side via
+ * the AggregateService. This class will implement the supporting functionality for
+ * summing/processing the individual results obtained from the AggregateService for each region.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class AsyncAggregationClient {
+
+  public static <R, S, P extends Message, Q extends Message, T extends Message>
+      CompletableFuture<R> max(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci,
+          Scan scan) {
+    CompletableFuture<R> future = new CompletableFuture<>();
+    AggregateRequest req;
+    try {
+      req = validateArgAndGetPB(scan, ci, false);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    CoprocessorCallback<AggregateResponse> callback =
+        new CoprocessorCallback<AggregateResponse>() {
+
+      private boolean finished = false;
+
+      private R max;
+
+      @Override
+      public void onRegionComplete(HRegionInfo region, AggregateResponse resp) {
+        if (resp.getFirstPartCount() > 0) {
+          Q q;
+          try {
+            q = getParsedGenericInstance(ci.getClass(), 3, resp.getFirstPart(0));
+          } catch (IOException e) {
+            synchronized (this) {
+              completeExceptionally(e);
+            }
+            return;
+          }
+          R result = ci.getCellValueFromProto(q);
+          synchronized (this) {
+            if (finished) {
+              return;
+            }
+            if (max == null || (result != null && ci.compare(max, result) < 0)) {
+              max = result;
+            }
+          }
+        }
+      }
+
+      private void completeExceptionally(Throwable error) {
+        if (finished) {
+          return;
+        }
+        finished = true;
+        future.completeExceptionally(error);
+      }
+
+      @Override
+      public synchronized void onRegionError(HRegionInfo region, Throwable error) {
+        completeExceptionally(error);
+      }
+
+      @Override
+      public synchronized void onComplete() {
+        finished = true;
+        future.complete(max);
+      }
+
+      @Override
+      public synchronized void onError(Throwable error) {
+        completeExceptionally(error);
+      }
+    };
+    table.coprocessorService(channel -> AggregateService.newStub(channel),
+      (stub, controller, done) -> stub.getMax(controller, req, done), scan.getStartRow(),
+      scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), callback);
+    return future;
+  }
+}
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
index 08b0562..bccb76a 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
@@ -18,6 +18,14 @@
  */
 package org.apache.hadoop.hbase.coprocessor;
 
+import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -31,7 +39,6 @@ import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -40,12 +47,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRespo
 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-
 /**
  * A concrete AggregateProtocol implementation. Its system level coprocessor
  * that computes the aggregate function at a region level.
@@ -485,7 +486,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
     ColumnInterpreter<T, S, P, Q, R> ci = (ColumnInterpreter<T, S, P, Q, R>) cls.newInstance();
     if (request.hasInterpreterSpecificBytes()) {
       ByteString b = request.getInterpreterSpecificBytes();
-      P initMsg = AggregationClient.getParsedGenericInstance(ci.getClass(), 2, b);
+      P initMsg = getParsedGenericInstance(ci.getClass(), 2, b);
       ci.initialize(initMsg);
     }
     return ci;
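
Before the test, an end-to-end usage sketch of the new async client (hypothetical, not part of this patch): it assumes the AggregateImplementation coprocessor is loaded on the region servers of a table named "t" with a long-encoded column CF:CQ, and that conn is an open AsyncConnection.

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AsyncConnection;
    import org.apache.hadoop.hbase.client.RawAsyncTable;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.coprocessor.AsyncAggregationClient;
    import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MaxSketch {
      static void printMax(AsyncConnection conn) {
        RawAsyncTable table = conn.getRawTable(TableName.valueOf("t"));
        AsyncAggregationClient
            .max(table, new LongColumnInterpreter(),
              new Scan().addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CQ")))
            .whenComplete((max, error) -> { // completes once every region has replied
              if (error != null) {
                error.printStackTrace();
              } else {
                System.out.println("max = " + max);
              }
            });
      }
    }
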
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
new file mode 100644
index 0000000..95e35e3
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.coprocessor.AsyncAggregationClient.max;
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
+import org.apache.hadoop.hbase.coprocessor.AggregateImplementation;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, CoprocessorTests.class })
+public class TestAsyncAggregationClient {
+
+  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("TestAsyncAggregationClient");
+
+  private static byte[] CF = Bytes.toBytes("CF");
+
+  private static byte[] CQ = Bytes.toBytes("CQ");
+
+  private static int COUNT = 1000;
+
+  private static AsyncConnection CONN;
+
+  private static RawAsyncTable TABLE;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+      AggregateImplementation.class.getName());
+    UTIL.startMiniCluster(3);
+    byte[][] splitKeys = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+    UTIL.createTable(TABLE_NAME, CF, splitKeys);
+    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration());
+    TABLE = CONN.getRawTable(TABLE_NAME);
+    TABLE.putAll(LongStream.range(0, COUNT)
+        .mapToObj(
+          l -> new Put(Bytes.toBytes(String.format("%03d", l))).addColumn(CF, CQ,
+            Bytes.toBytes(l)))
+        .collect(Collectors.toList())).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws InterruptedException, ExecutionException {
+    assertEquals(COUNT - 1,
+      max(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().longValue());
+  }
+}
-- 
2.7.4