Index: src/main/java/org/apache/hadoop/hbase/client/HRegionInterfaceProxyImpl.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HRegionInterfaceProxyImpl.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/HRegionInterfaceProxyImpl.java (working copy) @@ -0,0 +1,483 @@ +package org.apache.hadoop.hbase.client; + +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; + +import javax.net.SocketFactory; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.hbase.ipc.HBaseClient; +import org.apache.hadoop.hbase.ipc.HBaseRPC; +import org.apache.hadoop.hbase.ipc.HBaseRPCOptions; +import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.master.AssignmentPlan; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC.VersionIncompatible; +import org.apache.hadoop.security.UserGroupInformation; + +public class HRegionInterfaceProxyImpl implements HRegionInterface, Writable { + static final Log LOG = LogFactory.getLog(HRegionInterfaceProxyImpl.class); + private InetSocketAddress address; + private UserGroupInformation ticket; + private HBaseClient client; + final private int rpcTimeout; + public HBaseRPCOptions options; + ByteArrayOutputStream baos; + DataOutput out; + + public HRegionInterfaceProxyImpl(InetSocketAddress address, UserGroupInformation ticket, + Configuration conf, SocketFactory factory, + int rpcTimeout, HBaseRPCOptions options) { + this.address = address; + this.ticket = ticket; + this.client = HBaseRPC.CLIENTS.getClient(conf, factory); + this.rpcTimeout = rpcTimeout; + this.options = options; + baos = new ByteArrayOutputStream(); + out = new DataOutputStream(baos); + } + + @Override + public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public long getProtocolVersion(String arg0, long arg1) + throws VersionIncompatible, IOException { + LOG.error("getProtocolVersion called"); + // serialize the method call params + // 1. write the method name + out.writeUTF("getProtocolVersion"); + // 2. number of args = 2 + out.writeInt(2); + // 3. write class code and value of args + out.writeByte(HbaseObjectWritable.CLASS_TO_CODE.get(String.class)); + Text.writeString(out, arg0); + out.writeByte(HbaseObjectWritable.CLASS_TO_CODE.get(Long.class)); + out.writeLong(arg1); + // make the remote call + HbaseObjectWritable value = client.call(this, + address, ticket, rpcTimeout, options); + // return the expected result + LOG.error("getProtocolVersion returning " + (Long)(value.get())); + return (Long)(value.get()); + } + + @Override + public void stopForRestart() { + // TODO Auto-generated method stub + + } + + @Override + public HRegionInfo getRegionInfo(byte[] regionName) + throws NotServingRegionException { +// System.err.println("KAR: getRegionInfo() called for " + new String(regionName)); + // serialize the method call params + // 1. write the method name + try { + out.writeUTF("getRegionInfo"); + // 2. number of args = 1 + out.writeInt(1); + // 3. write class code and value of args + out.writeByte(HbaseObjectWritable.CLASS_TO_CODE.get(byte [].class)); + Bytes.writeByteArray(out, regionName); + // make the remote call + HbaseObjectWritable value = client.call(this, + address, ticket, rpcTimeout, options); + // return the expected result +// System.err.println("KAR: getRegionInfo() returning " + ((HRegionInfo)(value.get())).getEncodedName() + " region info"); + return (HRegionInfo)(value.get()); + } catch (IOException e) { + // TODO Auto-generated catch block + LOG.error("Could not serialize params for getRegionInfo", e); + throw new RuntimeException(e); + } + } + + @Override + public Result getClosestRowBefore(byte[] regionName, byte[] row, byte[] family) + throws IOException { + try { +// System.err.println("KAR: getClosestRowBefore() called for " + new String(regionName)); + out.writeUTF("getClosestRowBefore"); + // 2. number of args = 3 + out.writeInt(3); + // 3. write class code and value of args + out.writeByte(HbaseObjectWritable.CLASS_TO_CODE.get(byte [].class)); + Bytes.writeByteArray(out, regionName); + out.writeByte(HbaseObjectWritable.CLASS_TO_CODE.get(byte [].class)); + Bytes.writeByteArray(out, row); + out.writeByte(HbaseObjectWritable.CLASS_TO_CODE.get(byte [].class)); + Bytes.writeByteArray(out, family); + // make the remote call + HbaseObjectWritable value = client.call(this, + address, ticket, rpcTimeout, options); +// System.err.println("KAR: getClosestRowBefore() returned " + ((Result)(value.get())).size() + " kvs"); + // return the expected result + return (Result)(value.get()); + } catch (IOException e) { + LOG.error("Error in calling getClosestRowBefore()", e); + throw e; + } + } + + @Override + public HRegion[] getOnlineRegionsAsArray() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void flushRegion(byte[] regionName) throws IllegalArgumentException, + IOException { + // TODO Auto-generated method stub + + } + + @Override + public void flushRegion(byte[] regionName, long ifOlderThanTS) + throws IllegalArgumentException, IOException { + // TODO Auto-generated method stub + + } + + @Override + public long getLastFlushTime(byte[] regionName) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public MapWritable getLastFlushTimes() { + // TODO Auto-generated method stub + return null; + } + + @Override + public long getCurrentTimeMillis() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long getStartCode() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public List getStoreFileList(byte[] regionName, byte[] columnFamily) + throws IllegalArgumentException { + // TODO Auto-generated method stub + return null; + } + + @Override + public List getStoreFileList(byte[] regionName, + byte[][] columnFamilies) throws IllegalArgumentException { + // TODO Auto-generated method stub + return null; + } + + @Override + public List getStoreFileList(byte[] regionName) + throws IllegalArgumentException { + // TODO Auto-generated method stub + return null; + } + + @Override + public List getHLogsList(boolean rollCurrentHLog) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Result get(byte[] regionName, Get get) throws IOException { + try { + out.writeUTF("get"); + // 2. number of args + out.writeInt(2); + // 3. write class code and value of args + out.writeByte(HbaseObjectWritable.CLASS_TO_CODE.get(byte [].class)); + Bytes.writeByteArray(out, regionName); + out.writeByte(HbaseObjectWritable.CLASS_TO_CODE.get(Get.class)); + out.writeByte(HbaseObjectWritable.CLASS_TO_CODE.get(Get.class)); + get.write(out); + // make the remote call + HbaseObjectWritable value = client.call(this, + address, ticket, rpcTimeout, options); + // return the expected result + return (Result)(value.get()); + } catch (IOException e) { + LOG.error("Error in doing get from region " + new String(regionName), e); + throw e; + } + } + + @Override + public Result[] get(byte[] regionName, List gets) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean exists(byte[] regionName, Get get) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void put(byte[] regionName, Put put) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public int put(byte[] regionName, List puts) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void delete(byte[] regionName, Delete delete) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public int delete(byte[] regionName, List deletes) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public boolean checkAndPut(byte[] regionName, byte[] row, byte[] family, + byte[] qualifier, byte[] value, Put put) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean checkAndDelete(byte[] regionName, byte[] row, byte[] family, + byte[] qualifier, byte[] value, Delete delete) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public long incrementColumnValue(byte[] regionName, byte[] row, + byte[] family, byte[] qualifier, long amount, boolean writeToWAL) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long openScanner(byte[] regionName, Scan scan) throws IOException { + try { +// System.err.println("KAR: openScanner() called for " + new String(regionName)); + out.writeUTF("openScanner"); + // 2. number of args + out.writeInt(2); + // 3. write class code and value of args + out.writeByte(HbaseObjectWritable.CLASS_TO_CODE.get(byte [].class)); + Bytes.writeByteArray(out, regionName); + out.writeByte(HbaseObjectWritable.CLASS_TO_CODE.get(Scan.class)); + out.writeByte(HbaseObjectWritable.CLASS_TO_CODE.get(Scan.class)); + scan.write(out); + // make the remote call + HbaseObjectWritable value = client.call(this, + address, ticket, rpcTimeout, options); +// System.err.println("KAR: openScanner() returned " + (Long)(value.get())); + // return the expected result + return (Long)(value.get()); + } catch (IOException e) { + LOG.error("Error in calling openScanner", e); + throw e; + } + } + + @Override + public void mutateRow(byte[] regionName, RowMutations arm) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void mutateRow(byte[] regionName, List armList) + throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public Result next(long scannerId) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Result[] next(long scannerId, int numberOfRows) throws IOException { + try { +// System.err.println("KAR: next() called for scannerId " + scannerId + ", rows = " + numberOfRows); + out.writeUTF("next"); + // 2. number of args + out.writeInt(2); + // 3. write class code and value of args + out.writeByte(HbaseObjectWritable.CLASS_TO_CODE.get(Long.TYPE)); + out.writeLong(scannerId); + out.writeByte(HbaseObjectWritable.CLASS_TO_CODE.get(Integer.TYPE)); + out.writeInt(numberOfRows); + // make the remote call + HbaseObjectWritable value = client.call(this, + address, ticket, rpcTimeout, options); +// System.err.println("KAR: next() returned " + ((Result[])(value.get())).length + " rows"); + // return the expected result + return (Result[])(value.get()); + } catch (IOException e) { + LOG.error("Error calling next on scannerId" + scannerId + + " to fetch " + numberOfRows + " rows", e); + throw e; + } + } + + @Override + public void close(long scannerId) throws IOException { + try { +// System.err.println("KAR: close() called for scannerId " + scannerId); + out.writeUTF("close"); + // 2. number of args + out.writeInt(1); + // 3. write class code and value of args + out.writeByte(HbaseObjectWritable.CLASS_TO_CODE.get(Long.TYPE)); + out.writeLong(scannerId); + // make the remote call + HbaseObjectWritable value = client.call(this, + address, ticket, rpcTimeout, options); +// System.err.println("KAR: close() returned for scannerId " + scannerId); + } catch (IOException e) { + LOG.error("Error closing scannerId " + scannerId, e); + throw e; + } + } + + @Override + public long lockRow(byte[] regionName, byte[] row) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void unlockRow(byte[] regionName, long lockId) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public HRegionInfo[] getRegionsAssignment() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public HServerInfo getHServerInfo() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public MultiResponse multiAction(MultiAction multi) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public MultiPutResponse multiPut(MultiPut puts) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void bulkLoadHFile(String hfilePath, byte[] regionName, + byte[] familyName) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void bulkLoadHFile(String hfilePath, byte[] regionName, + byte[] familyName, boolean assignSeqNum) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void closeRegion(HRegionInfo hri, boolean reportWhenCompleted) + throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public int updateFavoredNodes(AssignmentPlan plan) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void updateConfiguration() throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public boolean isStopped() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void stop(String why) { + // TODO Auto-generated method stub + + } + + @Override + public String getStopReason() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException("Illegal method call"); + } + + @Override + public void write(DataOutput out) throws IOException { + baos.flush(); + out.write(baos.toByteArray()); + baos.reset(); + } +} Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 31243) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -52,6 +52,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import javax.net.SocketFactory; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -1285,18 +1287,22 @@ return server; } - try { +// try { // establish an RPC for this RS // set hbase.ipc.client.connect.max.retries to retry connection // attempts - server = (HRegionInterface) HBaseRPC.getProxy( - serverInterfaceClass, HBaseRPCProtocolVersion.versionID, - regionServer.getInetSocketAddress(), this.conf, - this.rpcTimeout, options); - } catch (RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } +// server = (HRegionInterface) HBaseRPC.getProxy( +// serverInterfaceClass, HBaseRPCProtocolVersion.versionID, +// regionServer.getInetSocketAddress(), this.conf, +// this.rpcTimeout, options); +// } catch (RemoteException e) { +// throw RemoteExceptionHandler.decodeRemoteException(e); +// } + server = new HRegionInterfaceProxyImpl( + regionServer.getInetSocketAddress(), null, conf, + SocketFactory.getDefault(), rpcTimeout, options); + return server; } Index: src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (revision 31243) +++ src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (working copy) @@ -106,13 +106,14 @@ // Here we maintain two static maps of classes to code and vice versa. // Add new classes+codes as wanted or figure way to auto-generate these // maps from the HMasterInterface. - static final Map> CODE_TO_CLASS = + public static final Map> CODE_TO_CLASS = new HashMap>(); - static final Map, Byte> CLASS_TO_CODE = + public static final Map, Byte> CLASS_TO_CODE = new HashMap, Byte>(); // Special code that means 'not-encoded'; in this case we do old school // sending of the class name using reflection, etc. private static final byte NOT_ENCODED = 0; + public static boolean verbose = false; static { // Add new objects to the end of the list to preserve @@ -330,6 +331,7 @@ throw new UnsupportedOperationException("No code for unexpected " + c); } out.writeByte(code); + LOG.trace("[HBase Client -> RS] Classcode = " + code); } @@ -386,6 +388,7 @@ } else { int length = Array.getLength(instanceObj); out.writeInt(length); + LOG.trace("[HBase Client -> RS] Array, length = " + length); for (int i = 0; i < length; i++) { writeObject(out, Array.get(instanceObj, i), declClass.getComponentType(), conf); @@ -395,6 +398,7 @@ List list = (List)instanceObj; int length = list.size(); out.writeInt(length); + LOG.trace("[HBase Client -> RS] List, length = " + length); for (int i = 0; i < length; i++) { writeObject(out, list.get(i), list.get(i).getClass(), conf); @@ -405,41 +409,55 @@ writeObject(out, pair.getSecond(), pair.getSecond().getClass(), conf); } else if (declClass == String.class) { // String Text.writeString(out, (String)instanceObj); + LOG.trace("[HBase Client -> RS] String, data = " + (String)instanceObj); } else if (declClass.isPrimitive()) { // primitive type if (declClass == Boolean.TYPE) { // boolean out.writeBoolean(((Boolean)instanceObj).booleanValue()); + LOG.trace("[HBase Client -> RS] Boolean, data = " + (((Boolean)instanceObj).booleanValue()?"t":"f")); } else if (declClass == Character.TYPE) { // char out.writeChar(((Character)instanceObj).charValue()); + LOG.trace("[HBase Client -> RS] String, data = " + ((Character)instanceObj).charValue()); } else if (declClass == Byte.TYPE) { // byte out.writeByte(((Byte)instanceObj).byteValue()); + LOG.trace("[HBase Client -> RS] Byte, data = " + ((Byte)instanceObj).byteValue()); } else if (declClass == Short.TYPE) { // short out.writeShort(((Short)instanceObj).shortValue()); + LOG.trace("[HBase Client -> RS] Short, data = " + ((Short)instanceObj).shortValue()); } else if (declClass == Integer.TYPE) { // int out.writeInt(((Integer)instanceObj).intValue()); + LOG.trace("[HBase Client -> RS] Integer, data = " + ((Integer)instanceObj).intValue()); } else if (declClass == Long.TYPE) { // long out.writeLong(((Long)instanceObj).longValue()); + LOG.trace("[HBase Client -> RS] Long, data = " + ((Long)instanceObj).longValue()); } else if (declClass == Float.TYPE) { // float out.writeFloat(((Float)instanceObj).floatValue()); + LOG.trace("[HBase Client -> RS] Float, data = " + ((Float)instanceObj).floatValue()); } else if (declClass == Double.TYPE) { // double out.writeDouble(((Double)instanceObj).doubleValue()); + LOG.trace("[HBase Client -> RS] Double, data = " + ((Double)instanceObj).doubleValue()); } else if (declClass == Void.TYPE) { // void } else { throw new IllegalArgumentException("Not a primitive: "+declClass); } } else if (declClass == Integer.class) { // Integer out.writeInt(((Integer) instanceObj).intValue()); + LOG.trace("[HBase Client -> RS] Integer, data = " + ((Integer)instanceObj).intValue()); } else if (declClass.isEnum()) { // enum Text.writeString(out, ((Enum)instanceObj).name()); + LOG.trace("[HBase Client -> RS] Enum, data = " + ((Enum)instanceObj).name()); } else if (Writable.class.isAssignableFrom(declClass)) { // Writable Class c = instanceObj.getClass(); Byte code = CLASS_TO_CODE.get(c); if (code == null) { out.writeByte(NOT_ENCODED); Text.writeString(out, c.getName()); + LOG.trace("[HBase Client -> RS] Byte, (NOT_ENCODED) data = " + NOT_ENCODED); + LOG.trace("[HBase Client -> RS] String, data = " + c.getName()); } else { writeClassCode(out, c); } ((Writable)instanceObj).write(out); + LOG.trace("[HBase Client -> RS] Writable serialized: " + instanceObj.getClass().getName()); } else { throw new IOException("Can't write: "+instanceObj+" as "+declClass); } Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (revision 31243) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (working copy) @@ -150,12 +150,16 @@ } public void write(DataOutput out) throws IOException { + StringBuilder debugInfo = new StringBuilder(); + LOG.debug("[HBase Client -> RS] " + "methodName: " + methodName + + ", parameterClasses.length: " + parameterClasses.length); out.writeUTF(this.methodName); out.writeInt(parameterClasses.length); for (int i = 0; i < parameterClasses.length; i++) { HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i], conf); } + LOG.debug("[HBase Client -> RS] -------------"); } @Override @@ -182,7 +186,7 @@ } /* Cache a client using its socket factory as the hash key */ - static private class ClientCache { + static public class ClientCache { private Map clients = new HashMap(); @@ -196,7 +200,7 @@ * @param factory socket factory * @return an IPC client */ - protected synchronized HBaseClient getClient(Configuration conf, + public synchronized HBaseClient getClient(Configuration conf, SocketFactory factory) { // Construct & cache client. The configuration is only used for timeout, // and Clients have connection pools. So we can either (a) lose some @@ -241,7 +245,7 @@ } } - protected final static ClientCache CLIENTS = new ClientCache(); + public final static ClientCache CLIENTS = new ClientCache(); private static class Invoker implements InvocationHandler { private InetSocketAddress address;