Multiple types of coprocessors are provided to give sufficient flexibility for potential use cases. Right now there are coprocessors for region lifecycle management (Coprocessor), for observing and mediating client operations on a region (RegionObserver), and for dynamic RPC endpoints (CommandTarget). Every coprocessor is required to implement the Coprocessor interface so that the coprocessor framework can manage it internally.
Another design goal of this interface is to provide simple features that make coprocessors useful, while exposing no more of the region server's internal state or control actions than necessary, and never exposing them directly.
Over the lifecycle of a region, the methods of this interface are invoked when the corresponding events happen. The master transitions regions through the following states:
unassigned -> pendingOpen -> open -> pendingClose -> closed.
Coprocessors have the opportunity to intercept and handle events in the pendingOpen, open, and pendingClose states.
In the pendingOpen state, the region server is opening a region to bring it online. Coprocessors can piggyback on this process or fail it.
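As a rough illustration only: the lifecycle hook names below (preOpen, postOpen, preClose) are assumed for the sake of the sketch and may differ from the actual Coprocessor interface in your HBase build; only the hooks relevant to the states above are shown.

// Sketch of a coprocessor reacting to region lifecycle events.
// Hook names and signatures are assumptions for illustration; consult the
// Coprocessor interface for the real ones.
public class LifecycleLoggingCoprocessor implements Coprocessor {
  public void preOpen(CoprocessorEnvironment e) {
    // region is in pendingOpen; throwing here would fail the open process
  }
  public void postOpen(CoprocessorEnvironment e) {
    // region is now online (open state)
  }
  public void preClose(CoprocessorEnvironment e) {
    // region is in pendingClose; release any per-region resources here
  }
}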
If a coprocessor also implements the RegionObserver interface, it can
observe and mediate client actions on the region.
For convenience, the BaseRegionObserver class is provided, which
implements both Coprocessor and RegionObserver.
In addition, it overrides all methods with default (no-op) behaviors so you don't
have to override all of them.
Here's an example of what a simple RegionObserver might look like. This
example shows how to implement role-based access control for HBase. The
coprocessor checks user information for a given client request, e.g., a
Get/Put/Delete/Scan, by injecting code at certain
RegionObserver
preXXX hooks. If the user is not allowed to access the resource, a
CoprocessorException is thrown, and the client request is denied by
receiving this exception.
package org.apache.hadoop.hbase.coprocessor;

import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;

// Sample role-based access control coprocessor. It utilizes RegionObserver
// and intercepts preXXX() methods to check user privileges for the given table
// and column family.
public class RBACCoprocessor extends BaseRegionObserver {
  @Override
  public List<KeyValue> preGet(CoprocessorEnvironment e, Get get,
      List<KeyValue> results) throws CoprocessorException {
    // check permissions; access_not_allowed is a placeholder for the actual
    // permission check against the request's user information
    if (access_not_allowed) {
      throw new AccessDeniedException("User is not allowed to access.");
    }
    return results;
  }

  // override prePut(), preDelete(), etc.
}
Coprocessor and RegionObserver provide hooks for injecting user code that runs
in each region. This code is triggered by existing HTable and HBaseAdmin
operations at the corresponding hook points.
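For example, once the RBACCoprocessor above is loaded for a table, an ordinary read goes through its preGet() hook without any client-side changes. A minimal sketch (table, family, and row names are made up for illustration; imports are omitted as in the other snippets):

// Plain client code; the region server invokes preGet() before serving the read.
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "mytable");
Get get = new Get(Bytes.toBytes("row1"));
get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("col"));
try {
  Result result = table.get(get);   // permitted request: returns normally
} catch (IOException ioe) {
  // a denied request surfaces here as the exception thrown by the coprocessor
}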
Through CommandTarget and the dynamic RPC protocol, you can define your own
interface for communication between the client and the region server,
i.e., you can specify new parameter and return types for a method.
The new CommandTarget methods can then be triggered from the client side
through the dynamic RPC calls, such as HTable.exec(...).
To implement a CommandTarget, you need to:

- Extend CoprocessorProtocol: this interface defines the communication protocol for the new CommandTarget and serves as the RPC protocol between the client and the region server.
- Extend the BaseCommandTarget abstract class and implement the CoprocessorProtocol interface extended above: this is the actual implementation class that runs at the region server.

Here's an example of performing column aggregation at the region server:
// A sample protocol for performing aggregation at regions.
public static interface ColumnAggregationProtocol
    extends CoprocessorProtocol {
  // Perform aggregation for a given column at the region. The aggregation
  // will include all the rows inside the region. It can be extended to
  // allow passing start and end rows for a fine-grained aggregation.
  public int sum(byte[] family, byte[] qualifier) throws IOException;
}
// Aggregation implementation at a region.
public static class ColumnAggregationCommandTarget extends BaseCommandTarget
    implements ColumnAggregationProtocol {
  @Override
  public int sum(byte[] family, byte[] qualifier) throws IOException {
    // aggregate at each region
    Scan scan = new Scan();
    scan.addColumn(family, qualifier);
    int sumResult = 0;
    InternalScanner scanner = getEnvironment().getRegion().getScanner(scan);
    try {
      List<KeyValue> curVals = new ArrayList<KeyValue>();
      boolean hasMore = false;
      do {
        curVals.clear();
        hasMore = scanner.next(curVals);
        // guard against an empty batch on the last iteration
        if (!curVals.isEmpty()) {
          sumResult += Bytes.toInt(curVals.get(0).getValue());
        }
      } while (hasMore);
    } finally {
      scanner.close();
    }
    return sumResult;
  }
}
Client-side invocations are performed through HTable,
which has the following methods added to support the dynamic RPC protocol:
public <T extends CoprocessorProtocol> T proxy(Class<T> protocol, Row row);

public <T extends CoprocessorProtocol, R> void exec(
    Class<T> protocol, List<? extends Row> rows,
    BatchCall<T,R> callable, BatchCallback<R> callback);

public <T extends CoprocessorProtocol, R> void exec(
    Class<T> protocol, RowRange range,
    BatchCall<T,R> callable, BatchCallback<R> callback);
Here is a client-side example of calling
ColumnAggregationCommandTarget:
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
Scan scan;
Map<byte[], Integer> results;

// scan: for all regions
scan = new Scan();
results = table.exec(ColumnAggregationProtocol.class, scan,
    new BatchCall<ColumnAggregationProtocol, Integer>() {
      public Integer call(ColumnAggregationProtocol instance) throws IOException {
        return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
      }
    });

// sum the per-region partial results returned for each region
int sumResult = 0;
for (Map.Entry<byte[], Integer> e : results.entrySet()) {
  sumResult += e.getValue();
}
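For a single-row call, the proxy(...) method listed earlier avoids the batch machinery. A minimal sketch, assuming ROW is a placeholder for a Row identifying the region of interest:

// Obtain a dynamic proxy bound to the region containing ROW, then invoke the
// CommandTarget method directly (throws IOException like any RPC).
ColumnAggregationProtocol aggregator =
    table.proxy(ColumnAggregationProtocol.class, ROW);
int regionSum = aggregator.sum(TEST_FAMILY, TEST_QUALIFIER);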
A customized coprocessor can be loaded in two ways: from the configuration, or from the HTableDescriptor of a newly created table.
(Currently we don't really have an on-demand coprocessor loading mechanism for regions that are already open.)
Load from configuration: whenever a region is opened, the region server reads coprocessor class names from
hbase.coprocessor.default.classes in the Configuration.
The coprocessor framework will automatically load the configured classes as
default coprocessors. The classes must already be included in the classpath.
<property>
  <name>hbase.coprocessor.default.classes</name>
  <value>org.apache.hadoop.hbase.coprocessor.RBACCoprocessor, org.apache.hadoop.hbase.coprocessor.ColumnAggregationCommandTarget</value>
  <description>A comma-separated list of coprocessors that are loaded by
  default. For any overridden coprocessor method from RegionObserver or
  Coprocessor, these classes' implementations will be called in order.
  After implementing your own coprocessor, just put it in HBase's classpath
  and add the fully qualified class name here.
  </description>
</property>
The first defined coprocessor will be assigned
Coprocessor.Priority.SYSTEM as its priority, and each following
coprocessor's priority is incremented by one. Coprocessors are executed
in order according to the natural ordering of this int.
Load from table attribute: alternatively, a coprocessor can be loaded by setting it in the
HTableDescriptor as a table attribute. The
attribute key must start with "Coprocessor" and the value must be of the form
<path>:<class>:<priority>, e.g.

'Coprocessor$1' => 'hdfs://localhost:8020/hbase/coprocessors/test.jar:Test:1000'
'Coprocessor$2' => '/hbase/coprocessors/test2.jar:AnotherTest:1001'

<path> must point to a jar and can be on any filesystem supported by the Hadoop FileSystem object.
<class> is the coprocessor implementation class. A jar can contain more than one coprocessor implementation, but only one can be specified at a time in each table attribute.
<priority> is an integer. Coprocessors are executed in order according to the natural ordering of the int. Coprocessors can optionally abort actions, so typically one would want to put authoritative coprocessors (security policy implementations, perhaps) ahead of observers.
// Assumes fs is a FileSystem handle and classFullName is the fully
// qualified name of the coprocessor class contained in the jar.
Path path = new Path(fs.getUri() + Path.SEPARATOR + "TestClassloading.jar");

// create a table that references the jar
HTableDescriptor htd = new HTableDescriptor(getClass().getName());
htd.addFamily(new HColumnDescriptor("test"));
htd.setValue("Coprocessor$1",
    path.toString() + ":" + classFullName + ":" + Coprocessor.Priority.USER);

// create the table through HBaseAdmin
HBaseAdmin admin = new HBaseAdmin(this.conf);
admin.createTable(htd);