i : servers.values()) {
+ for (VersionedProtocol server: i.values()) {
+ HBaseRPC.stopProxy(server);
+ }
}
}
closeZooKeeperWatcher();
diff --git src/main/java/org/apache/hadoop/hbase/client/HTable.java src/main/java/org/apache/hadoop/hbase/client/HTable.java
index aa7652f..2c87d50 100644
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -53,13 +53,27 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.CompareType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Writables;
+import com.google.protobuf.ServiceException;
+
/**
* Used to communicate with a single HBase table.
*
@@ -648,8 +662,15 @@ public class HTable implements HTableInterface {
throws IOException {
return new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
public Result call() throws IOException {
- return server.getClosestRowBefore(location.getRegionInfo().getRegionName(),
- row, family);
+ try {
+ GetRequest request = RequestConverter.buildGetRequest(
+ location.getRegionInfo().getRegionName(), row, family, true);
+ GetResponse response = server.get(null, request);
+ if (!response.hasResult()) return null;
+ return ProtobufUtil.toResult(response.getResult());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -694,7 +715,14 @@ public class HTable implements HTableInterface {
public Result get(final Get get) throws IOException {
return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
public Result call() throws IOException {
- return server.get(location.getRegionInfo().getRegionName(), get);
+ try {
+ GetRequest request = RequestConverter.buildGetRequest(
+ location.getRegionInfo().getRegionName(), get);
+ GetResponse response = server.get(null, request);
+ return ProtobufUtil.toResult(response.getResult());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -746,13 +774,18 @@ public class HTable implements HTableInterface {
@Override
public void delete(final Delete delete)
throws IOException {
- new ServerCallable<Void>(connection, tableName, delete.getRow(),
- operationTimeout) {
- public Void call() throws IOException {
- server.delete(location.getRegionInfo().getRegionName(), delete);
- return null;
- }
- }.withRetries();
+ new ServerCallable<Boolean>(connection, tableName, delete.getRow(), operationTimeout) {
+ public Boolean call() throws IOException {
+ try {
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ location.getRegionInfo().getRegionName(), delete);
+ MutateResponse response = server.mutate(null, request);
+ return Boolean.valueOf(response.getProcessed());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+ }.withRetries();
}
/**
@@ -821,7 +854,13 @@ public class HTable implements HTableInterface {
new ServerCallable<Void>(connection, tableName, rm.getRow(),
operationTimeout) {
public Void call() throws IOException {
- server.mutateRow(location.getRegionInfo().getRegionName(), rm);
+ try {
+ MultiRequest request = RequestConverter.buildMultiRequest(
+ location.getRegionInfo().getRegionName(), rm);
+ server.multi(null, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
return null;
}
}.withRetries();
@@ -838,8 +877,15 @@ public class HTable implements HTableInterface {
}
return new ServerCallable<Result>(connection, tableName, append.getRow(), operationTimeout) {
public Result call() throws IOException {
- return server.append(
+ try {
+ MutateRequest request = RequestConverter.buildMutateRequest(
location.getRegionInfo().getRegionName(), append);
+ MutateResponse response = server.mutate(null, request);
+ if (!response.hasResult()) return null;
+ return ProtobufUtil.toResult(response.getResult());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -855,8 +901,14 @@ public class HTable implements HTableInterface {
}
return new ServerCallable<Result>(connection, tableName, increment.getRow(), operationTimeout) {
public Result call() throws IOException {
- return server.increment(
+ try {
+ MutateRequest request = RequestConverter.buildMutateRequest(
location.getRegionInfo().getRegionName(), increment);
+ MutateResponse response = server.mutate(null, request);
+ return ProtobufUtil.toResult(response.getResult());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -890,9 +942,16 @@ public class HTable implements HTableInterface {
}
return new ServerCallable<Long>(connection, tableName, row, operationTimeout) {
public Long call() throws IOException {
- return server.incrementColumnValue(
+ try {
+ MutateRequest request = RequestConverter.buildMutateRequest(
location.getRegionInfo().getRegionName(), row, family,
qualifier, amount, writeToWAL);
+ MutateResponse response = server.mutate(null, request);
+ Result result = ProtobufUtil.toResult(response.getResult());
+ return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -907,8 +966,15 @@ public class HTable implements HTableInterface {
throws IOException {
return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
public Boolean call() throws IOException {
- return server.checkAndPut(location.getRegionInfo().getRegionName(),
- row, family, qualifier, value, put) ? Boolean.TRUE : Boolean.FALSE;
+ try {
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ location.getRegionInfo().getRegionName(), row, family, qualifier,
+ new BinaryComparator(value), CompareType.EQUAL, put);
+ MutateResponse response = server.mutate(null, request);
+ return Boolean.valueOf(response.getProcessed());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -924,10 +990,15 @@ public class HTable implements HTableInterface {
throws IOException {
return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
public Boolean call() throws IOException {
- return server.checkAndDelete(
- location.getRegionInfo().getRegionName(),
- row, family, qualifier, value, delete)
- ? Boolean.TRUE : Boolean.FALSE;
+ try {
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ location.getRegionInfo().getRegionName(), row, family, qualifier,
+ new BinaryComparator(value), CompareType.EQUAL, delete);
+ MutateResponse response = server.mutate(null, request);
+ return Boolean.valueOf(response.getProcessed());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -939,8 +1010,14 @@ public class HTable implements HTableInterface {
public boolean exists(final Get get) throws IOException {
return new ServerCallable<Boolean>(connection, tableName, get.getRow(), operationTimeout) {
public Boolean call() throws IOException {
- return server.
- exists(location.getRegionInfo().getRegionName(), get);
+ try {
+ GetRequest request = RequestConverter.buildGetRequest(
+ location.getRegionInfo().getRegionName(), get, true);
+ GetResponse response = server.get(null, request);
+ return response.getExists();
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -1026,9 +1103,14 @@ public class HTable implements HTableInterface {
throws IOException {
return new ServerCallable<RowLock>(connection, tableName, row, operationTimeout) {
public RowLock call() throws IOException {
- long lockId =
- server.lockRow(location.getRegionInfo().getRegionName(), row);
- return new RowLock(row,lockId);
+ try {
+ LockRowRequest request = RequestConverter.buildLockRowRequest(
+ location.getRegionInfo().getRegionName(), row);
+ LockRowResponse response = server.lockRow(null, request);
+ return new RowLock(row, response.getLockId());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -1039,14 +1121,18 @@ public class HTable implements HTableInterface {
@Override
public void unlockRow(final RowLock rl)
throws IOException {
- new ServerCallable<Void>(connection, tableName, rl.getRow(),
- operationTimeout) {
- public Void call() throws IOException {
- server.unlockRow(location.getRegionInfo().getRegionName(), rl
- .getLockId());
- return null;
- }
- }.withRetries();
+ new ServerCallable<Boolean>(connection, tableName, rl.getRow(), operationTimeout) {
+ public Boolean call() throws IOException {
+ try {
+ UnlockRowRequest request = RequestConverter.buildUnlockRowRequest(
+ location.getRegionInfo().getRegionName(), rl.getLockId());
+ server.unlockRow(null, request);
+ return Boolean.TRUE;
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+ }.withRetries();
}
/**
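None of the HTable hunks above change the public client API; each ServerCallable body swaps a direct HRegionInterface call for RequestConverter/ProtobufUtil plumbing. A minimal usage sketch (table, row, and column names hypothetical) exercising the converted get() path:

    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "testtable");          // hypothetical table
    Get get = new Get(Bytes.toBytes("row1"));              // hypothetical row
    get.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"));
    // Internally this now builds a ClientProtos.GetRequest via RequestConverter,
    // invokes ClientProtocol.get(null, request), and converts the GetResponse
    // back to a Result with ProtobufUtil.toResult().
    Result result = table.get(get);
    byte[] value = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("q"));
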
diff --git src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 9903df3..fe80fcf 100644
--- src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -25,15 +25,22 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;
+import com.google.protobuf.ServiceException;
+
/**
* Retries scanner operations such as create, next, etc.
* Used by {@link ResultScanner}s made by {@link HTable}.
@@ -107,41 +114,58 @@ public class ScannerCallable extends ServerCallable<Result[]> {
* @see java.util.concurrent.Callable#call()
*/
public Result [] call() throws IOException {
- if (scannerId != -1L && closed) {
- close();
- } else if (scannerId == -1L && !closed) {
- this.scannerId = openScanner();
+ if (closed) {
+ if (scannerId != -1) {
+ close();
+ }
} else {
- Result [] rrs = null;
- try {
- incRPCcallsMetrics();
- rrs = server.next(scannerId, caching);
- updateResultsMetrics(rrs);
- } catch (IOException e) {
- IOException ioe = null;
- if (e instanceof RemoteException) {
- ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
- }
- if (ioe == null) throw new IOException(e);
- if (ioe instanceof NotServingRegionException) {
- // Throw a DNRE so that we break out of cycle of calling NSRE
- // when what we need is to open scanner against new location.
- // Attach NSRE to signal client that it needs to resetup scanner.
- if (this.scanMetrics != null) {
- this.scanMetrics.countOfNSRE.inc();
+ if (scannerId == -1L) {
+ this.scannerId = openScanner();
+ } else {
+ Result [] rrs = null;
+ try {
+ incRPCcallsMetrics();
+ ScanRequest request =
+ RequestConverter.buildScanRequest(scannerId, caching, false);
+ try {
+ ScanResponse response = server.scan(null, request);
+ rrs = ResponseConverter.getResults(response);
+ if (response.hasMoreResults()
+ && !response.getMoreResults()) {
+ scannerId = -1L;
+ closed = true;
+ return null;
+ }
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ updateResultsMetrics(rrs);
+ } catch (IOException e) {
+ IOException ioe = null;
+ if (e instanceof RemoteException) {
+ ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
+ }
+ if (ioe == null) throw new IOException(e);
+ if (ioe instanceof NotServingRegionException) {
+ // Throw a DNRE so that we break out of cycle of calling NSRE
+ // when what we need is to open scanner against new location.
+ // Attach NSRE to signal client that it needs to resetup scanner.
+ if (this.scanMetrics != null) {
+ this.scanMetrics.countOfNSRE.inc();
+ }
+ throw new DoNotRetryIOException("Reset scanner", ioe);
+ } else if (ioe instanceof RegionServerStoppedException) {
+ // Throw a DNRE so that we break out of cycle of calling RSSE
+ // when what we need is to open scanner against new location.
+ // Attach RSSE to signal client that it needs to resetup scanner.
+ throw new DoNotRetryIOException("Reset scanner", ioe);
+ } else {
+ // The outer layers will retry
+ throw ioe;
}
- throw new DoNotRetryIOException("Reset scanner", ioe);
- } else if (ioe instanceof RegionServerStoppedException) {
- // Throw a DNRE so that we break out of cycle of calling RSSE
- // when what we need is to open scanner against new location.
- // Attach RSSE to signal client that it needs to resetup scanner.
- throw new DoNotRetryIOException("Reset scanner", ioe);
- } else {
- // The outer layers will retry
- throw ioe;
}
+ return rrs;
}
- return rrs;
}
return null;
}
@@ -161,10 +185,12 @@ public class ScannerCallable extends ServerCallable {
return;
}
for (Result rr : rrs) {
- this.scanMetrics.countOfBytesInResults.inc(rr.getBytes().getLength());
- if (isRegionServerRemote) {
- this.scanMetrics.countOfBytesInRemoteResults.inc(
- rr.getBytes().getLength());
+ if (rr.getBytes() != null) {
+ this.scanMetrics.countOfBytesInResults.inc(rr.getBytes().getLength());
+ if (isRegionServerRemote) {
+ this.scanMetrics.countOfBytesInRemoteResults.inc(
+ rr.getBytes().getLength());
+ }
}
}
}
@@ -175,7 +201,13 @@ public class ScannerCallable extends ServerCallable {
}
try {
incRPCcallsMetrics();
- this.server.close(this.scannerId);
+ ScanRequest request =
+ RequestConverter.buildScanRequest(this.scannerId, 0, true);
+ try {
+ server.scan(null, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
} catch (IOException e) {
LOG.warn("Ignore, probably already closed", e);
}
@@ -184,8 +216,16 @@ public class ScannerCallable extends ServerCallable {
protected long openScanner() throws IOException {
incRPCcallsMetrics();
- return this.server.openScanner(this.location.getRegionInfo().getRegionName(),
- this.scan);
+ ScanRequest request =
+ RequestConverter.buildScanRequest(
+ this.location.getRegionInfo().getRegionName(),
+ this.scan, 0, false);
+ try {
+ ScanResponse response = server.scan(null, request);
+ return response.getScannerId();
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
protected Scan getScan() {
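All three scanner operations (open, next, close) now funnel through the single ClientProtocol.scan() RPC, distinguished only by how the ScanRequest is built. A sketch of the full lifecycle using the buildScanRequest overloads shown above (ServiceException handling elided; see the call sites):

    // open: region name + Scan, numberOfRows 0, closeScanner false
    ScanRequest open = RequestConverter.buildScanRequest(
      location.getRegionInfo().getRegionName(), scan, 0, false);
    long id = server.scan(null, open).getScannerId();
    // next: scanner id + caching, closeScanner false
    ScanRequest next = RequestConverter.buildScanRequest(id, caching, false);
    Result[] rrs = ResponseConverter.getResults(server.scan(null, next));
    // close: scanner id, numberOfRows 0, closeScanner true
    server.scan(null, RequestConverter.buildScanRequest(id, 0, true));
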
diff --git src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
index ddcf9ad..2a9d86e 100644
--- src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
+++ src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
@@ -57,7 +57,7 @@ public abstract class ServerCallable<T> implements Callable<T> {
protected final byte [] tableName;
protected final byte [] row;
protected HRegionLocation location;
- protected HRegionInterface server;
+ protected ClientProtocol server;
protected int callTimeout;
protected long startTime, endTime;
@@ -84,8 +84,8 @@ public abstract class ServerCallable implements Callable {
*/
public void connect(final boolean reload) throws IOException {
this.location = connection.getRegionLocation(tableName, row, reload);
- this.server = connection.getHRegionConnection(location.getHostname(),
- location.getPort());
+ this.server = connection.getClient(location.getHostname(),
+ location.getPort());
}
/** @return the server name
@@ -224,7 +224,7 @@ public abstract class ServerCallable implements Callable {
}
}
- private static Throwable translateException(Throwable t) throws IOException {
+ protected static Throwable translateException(Throwable t) throws IOException {
if (t instanceof UndeclaredThrowableException) {
t = t.getCause();
}
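With server retyped to ClientProtocol, every anonymous subclass in this patch follows the same shape; the recurring pattern, sketched (buildAndSendRequest is a hypothetical stand-in for the per-operation RequestConverter/ProtobufUtil plumbing):

    Result r = new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
      public Result call() throws IOException {
        try {
          // 'server' is the ClientProtocol stub bound by connect(reload)
          return buildAndSendRequest(server, location.getRegionInfo().getRegionName());
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);
        }
      }
    }.withRetries();
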
diff --git src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java
index 1acbdab..a179bf3 100644
--- src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java
+++ src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java
@@ -19,13 +19,10 @@
*/
package org.apache.hadoop.hbase.filter;
+import java.nio.ByteBuffer;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.util.Bytes;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import org.apache.hadoop.hbase.filter.*;
/**
* ParseConstants holds a bunch of constants related to parsing Filter Strings
diff --git src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
index cbfa489..35b2c8b 100644
--- src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
+++ src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
@@ -100,6 +100,7 @@ import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableUtils;
import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
/**
* This is a customized version of the polymorphic hadoop
@@ -268,6 +269,8 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur
GENERIC_ARRAY_CODE = code++;
addToMap(Array.class, GENERIC_ARRAY_CODE);
+ addToMap(RpcController.class, code++);
+
// make sure that this is the last statement in this static block
NEXT_CLASS_CODE = code;
}
@@ -357,7 +360,7 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur
}
}
- static Integer getClassCode(final Class<?> c)
+ public static Integer getClassCode(final Class<?> c)
throws IOException {
Integer code = CLASS_TO_CODE.get(c);
if (code == null ) {
@@ -726,7 +729,7 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur
* @return the instantiated Message instance
* @throws IOException if an IO problem occurs
*/
- private static Message tryInstantiateProtobuf(
+ public static Message tryInstantiateProtobuf(
Class<?> protoClass,
DataInput dataIn) throws IOException {
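The new RpcController class code matters because Invocation serializes the declared parameter classes of each RPC method, and every generated blocking-stub method takes an RpcController as its first argument (passed as null throughout this patch). The shape being serialized, for reference:

    // Every ClientService.BlockingInterface method looks like this, so both
    // RpcController.class and the request class need HbaseObjectWritable codes:
    GetResponse get(RpcController controller, GetRequest request)
      throws ServiceException;
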
diff --git src/main/java/org/apache/hadoop/hbase/io/TimeRange.java src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
index d135393..5189c8c 100644
--- src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
+++ src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
@@ -110,6 +110,14 @@ public class TimeRange implements Writable {
}
/**
+ * Check if this TimeRange covers all time
+ * @return true if this TimeRange spans all possible timestamps
+ */
+ public boolean isAllTime() {
+ return allTime;
+ }
+
+ /**
* Check if the specified timestamp is within this TimeRange.
*
* Returns true if within interval [minStamp, maxStamp), false
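isAllTime() is a plain accessor for the existing allTime flag; presumably it lets the request converters skip encoding the default time range. A hypothetical check of that sort:

    if (!scan.getTimeRange().isAllTime()) {
      // only a non-default range needs to be carried in the protobuf Get/Scan
      setTimeRangeOnRequest(scan.getTimeRange());   // hypothetical helper
    }
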
diff --git src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
index 05ae717..d71e97e 100644
--- src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
+++ src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
@@ -19,18 +19,23 @@
*/
package org.apache.hadoop.hbase.ipc;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
import org.apache.hadoop.hbase.util.Bytes;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-
/**
* Backs a {@link CoprocessorProtocol} subclass proxy and forwards method
* invocations for server execution. Note that internally this will issue a
@@ -74,8 +79,13 @@ public class ExecRPCInvoker implements InvocationHandler {
ServerCallable<ExecResult> callable =
new ServerCallable<ExecResult>(connection, table, row) {
public ExecResult call() throws Exception {
- return server.execCoprocessor(location.getRegionInfo().getRegionName(),
- exec);
+ byte[] regionName = location.getRegionInfo().getRegionName();
+ ExecCoprocessorRequest request =
+ RequestConverter.buildExecCoprocessorRequest(regionName, exec);
+ ExecCoprocessorResponse response =
+ server.execCoprocessor(null, request);
+ Object value = ProtobufUtil.toObject(response.getValue());
+ return new ExecResult(regionName, value);
}
};
ExecResult result = callable.withRetries();
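For context, this invoker backs the dynamic proxies handed out for coprocessor endpoints; a usage sketch (RowCountProtocol is a hypothetical CoprocessorProtocol subinterface):

    RowCountProtocol counter =
      table.coprocessorProxy(RowCountProtocol.class, row);
    // Each method call lands in invoke() above and is shipped as an
    // ExecCoprocessorRequest, with the result decoded via ProtobufUtil.toObject.
    long rows = counter.getRowCount();
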
diff --git src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
index f1f06b0..b7afa58 100644
--- src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
+++ src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
@@ -19,19 +19,23 @@
*/
package org.apache.hadoop.hbase.ipc;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.io.VersionedWritable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-
/** A method invocation, including the method name and its parameters.*/
@InterfaceAudience.Private
public class Invocation extends VersionedWritable implements Configurable {
@@ -43,6 +47,17 @@ public class Invocation extends VersionedWritable implements Configurable {
private long clientVersion;
private int clientMethodsHash;
+
+ // For generated protocol classes which don't have VERSION field,
+ // such as protobuf interfaces.
+ private static final Map<Class<?>, Long>
+ PROTOCOL_VERSION = new HashMap<Class<?>, Long>();
+
+ static {
+ PROTOCOL_VERSION.put(ClientService.BlockingInterface.class,
+ Long.valueOf(ClientProtocol.VERSION));
+ }
+
private static byte RPC_VERSION = 1;
public Invocation() {}
@@ -51,22 +66,28 @@ public class Invocation extends VersionedWritable implements Configurable {
this.methodName = method.getName();
this.parameterClasses = method.getParameterTypes();
this.parameters = parameters;
- if (method.getDeclaringClass().equals(VersionedProtocol.class)) {
+ Class<?> declaringClass = method.getDeclaringClass();
+ if (declaringClass.equals(VersionedProtocol.class)) {
//VersionedProtocol is exempted from version check.
clientVersion = 0;
clientMethodsHash = 0;
} else {
try {
- Field versionField = method.getDeclaringClass().getField("VERSION");
- versionField.setAccessible(true);
- this.clientVersion = versionField.getLong(method.getDeclaringClass());
+ Long version = PROTOCOL_VERSION.get(declaringClass);
+ if (version != null) {
+ this.clientVersion = version.longValue();
+ } else {
+ Field versionField = declaringClass.getField("VERSION");
+ versionField.setAccessible(true);
+ this.clientVersion = versionField.getLong(declaringClass);
+ }
} catch (NoSuchFieldException ex) {
- throw new RuntimeException("The " + method.getDeclaringClass(), ex);
+ throw new RuntimeException("The " + declaringClass, ex);
} catch (IllegalAccessException ex) {
throw new RuntimeException(ex);
}
- this.clientMethodsHash = ProtocolSignature.getFingerprint(method
- .getDeclaringClass().getMethods());
+ this.clientMethodsHash = ProtocolSignature.getFingerprint(
+ declaringClass.getMethods());
}
}
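Other generated interfaces without a VERSION field would be registered the same way; for instance (hypothetical here, since only ClientService is wired up in this hunk):

    static {
      PROTOCOL_VERSION.put(ClientService.BlockingInterface.class,
        Long.valueOf(ClientProtocol.VERSION));
      // e.g. the admin protocol added elsewhere in this patch:
      PROTOCOL_VERSION.put(AdminService.BlockingInterface.class,
        Long.valueOf(AdminProtocol.VERSION));
    }
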
diff --git src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
index 0573c68..9f159f2 100644
--- src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
+++ src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Objects;
@@ -52,6 +53,8 @@ import org.apache.hadoop.conf.*;
import org.codehaus.jackson.map.ObjectMapper;
+import com.google.protobuf.ServiceException;
+
/** An RpcEngine implementation for Writable data. */
@InterfaceAudience.Private
class WritableRpcEngine implements RpcEngine {
@@ -407,6 +410,9 @@ class WritableRpcEngine implements RpcEngine {
if (target instanceof IOException) {
throw (IOException)target;
}
+ if (target instanceof ServiceException) {
+ throw ProtobufUtil.getRemoteException((ServiceException)target);
+ }
IOException ioe = new IOException(target.toString());
ioe.setStackTrace(target.getStackTrace());
throw ioe;
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index b71ae66..d0570b9 100644
--- src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -68,10 +68,13 @@ import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
-import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
@@ -486,7 +489,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
LOG.debug("Going to connect to server " + location + " for row "
+ Bytes.toStringBinary(row));
byte[] regionName = location.getRegionInfo().getRegionName();
- return server.bulkLoadHFiles(famPaths, regionName);
+ BulkLoadHFileRequest request =
+ RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName);
+ BulkLoadHFileResponse response =
+ server.bulkLoadHFile(null, request);
+ return response.getLoaded();
}
};
diff --git src/main/java/org/apache/hadoop/hbase/protobuf/AdminProtocol.java src/main/java/org/apache/hadoop/hbase/protobuf/AdminProtocol.java
new file mode 100644
index 0000000..422e865
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/protobuf/AdminProtocol.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.protobuf;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.VersionedProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.security.TokenInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol that an HBase client uses to communicate with a region server (administrative operations).
+ */
+@KerberosInfo(
+ serverPrincipal = "hbase.regionserver.kerberos.principal")
+@TokenInfo("HBASE_AUTH_TOKEN")
+@InterfaceAudience.Private
+public interface AdminProtocol extends
+ AdminService.BlockingInterface, VersionedProtocol {
+ public static final long VERSION = 1L;
+}
diff --git src/main/java/org/apache/hadoop/hbase/protobuf/ClientProtocol.java src/main/java/org/apache/hadoop/hbase/protobuf/ClientProtocol.java
new file mode 100644
index 0000000..3d6a23a
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/protobuf/ClientProtocol.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.protobuf;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ipc.VersionedProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.security.TokenInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol that an HBase client uses to communicate with a region server.
+ */
+@KerberosInfo(
+ serverPrincipal = "hbase.regionserver.kerberos.principal")
+@TokenInfo("HBASE_AUTH_TOKEN")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ClientProtocol extends
+ ClientService.BlockingInterface, VersionedProtocol {
+ public static final long VERSION = 1L;
+}
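Callers obtain this interface from the connection, as the ServerCallable.connect() change above shows; a minimal sketch, assuming HConnection.getClient takes (hostname, port) as used there:

    ClientProtocol client = connection.getClient(
      location.getHostname(), location.getPort());
    GetRequest request = RequestConverter.buildGetRequest(regionName, get);
    try {
      GetResponse response = client.get(null, request);
      Result result = ProtobufUtil.toResult(response.getResult());
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
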
diff --git src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 2eb57de..b056830 100644
--- src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -17,12 +17,89 @@
*/
package org.apache.hadoop.hbase.protobuf;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Exec;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UUID;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALEdit.FamilyScope;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALKey;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue.QualifierValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.DeleteType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
+
/**
* Protobufs utility.
*/
-public class ProtobufUtil {
+public final class ProtobufUtil {
+
+ private ProtobufUtil() {
+ }
+
+ /**
+ * Primitive type to class mapping.
+ */
+ private final static Map<String, Class<?>>
+ PRIMITIVES = new HashMap<String, Class<?>>();
+
+ static {
+ PRIMITIVES.put(Boolean.TYPE.getName(), Boolean.TYPE);
+ PRIMITIVES.put(Byte.TYPE.getName(), Byte.TYPE);
+ PRIMITIVES.put(Character.TYPE.getName(), Character.TYPE);
+ PRIMITIVES.put(Short.TYPE.getName(), Short.TYPE);
+ PRIMITIVES.put(Integer.TYPE.getName(), Integer.TYPE);
+ PRIMITIVES.put(Long.TYPE.getName(), Long.TYPE);
+ PRIMITIVES.put(Float.TYPE.getName(), Float.TYPE);
+ PRIMITIVES.put(Double.TYPE.getName(), Double.TYPE);
+ PRIMITIVES.put(Void.TYPE.getName(), Void.TYPE);
+ }
+
/**
* Magic we put ahead of a serialized protobuf message.
* For example, all znode content is protobuf messages with the below magic
@@ -56,4 +133,592 @@ public class ProtobufUtil {
public static int lengthOfPBMagic() {
return PB_MAGIC.length;
}
+
+ /**
+ * Return the IOException thrown by the remote server, which the
+ * ServiceException carries as its cause.
+ *
+ * @param se the ServiceException that wraps the IOException thrown by the server
+ * @return the IOException carried as the ServiceException's cause, or a new
+ * IOException wrapping the ServiceException when no IOException cause is present.
+ */
+ public static IOException getRemoteException(ServiceException se) {
+ Throwable e = se.getCause();
+ if (e == null) {
+ return new IOException(se);
+ }
+ return e instanceof IOException ? (IOException) e : new IOException(se);
+ }
+
+ /**
+ * Convert a protocol buffer Exec to a client Exec
+ *
+ * @param proto the protocol buffer Exec to convert
+ * @return the converted client Exec
+ */
+ @SuppressWarnings("unchecked")
+ public static Exec toExec(
+ final ClientProtos.Exec proto) throws IOException {
+ byte[] row = proto.getRow().toByteArray();
+ String protocolName = proto.getProtocolName();
+ String methodName = proto.getMethodName();
+ List