diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 081e171..bc66868 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -185,6 +185,27 @@ class IPCUtil { } return codec.getDecoder(is); } + + + /** + * Encapsulate the ugly casting and RuntimeException conversion in private + * method. + * + * @param key - the configuration key for codec for server/client + * hbase.server.rpc.codec - for server + * hbase.client.rpc.codec - for client + * @param clazzName - the default class name + * @param conf + * @return Codec on the RPC client/server interaction. + */ + public static Codec getCodec(final String key, Class clazzName, final Configuration conf) { + String className = conf.get(key, clazzName.getCanonicalName()); + try { + return (Codec) Class.forName(className).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Failed getting codec " + className, e); + } + } /** * Write out header, param, and cell block if there to a {@link ByteBufferOutputStream} sized diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 83a916e..0998bc0 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -122,7 +122,9 @@ public class RpcClient { protected final boolean tcpKeepAlive; // if T then use keepalives protected int pingInterval; // how often sends ping to the server in msecs protected FailedServers failedServers; - private final Codec codec; + private final Codec localCodec; + //codec used by the server. Currently would be used from configuration + private final Codec remoteCodec; private final CompressionCodec compressor; private final IPCUtil ipcUtil; @@ -360,8 +362,8 @@ public class RpcClient { private Token token; private HBaseSaslRpcClient saslRpcClient; private int reloginMaxBackoff; // max pause before relogin on sasl failure - private final Codec codec; private final CompressionCodec compressor; + private Codec codec; // currently active calls protected final ConcurrentSkipListMap calls = @@ -378,8 +380,8 @@ public class RpcClient { throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName()); } this.server = remoteId.getAddress(); - this.codec = codec; this.compressor = compressor; + this.codec = codec; UserGroupInformation ticket = remoteId.getTicket().getUGI(); SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName()); @@ -1088,7 +1090,8 @@ public class RpcClient { int size = responseHeader.getCellBlockMeta().getLength(); byte [] cellBlock = new byte[size]; IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length); - cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock); + // Instantiate from the configuration but this time the codec configured on the server + cellBlockScanner = ipcUtil.createCellScanner(remoteCodec, this.compressor, cellBlock); } // it's possible that this call may have been cleaned up due to a RPC // timeout, so check if it still exists before setting the value. @@ -1239,7 +1242,9 @@ public class RpcClient { this.pingInterval = getPingInterval(conf); this.ipcUtil = new IPCUtil(conf); this.conf = conf; - this.codec = getCodec(); + this.localCodec = getCodec(); + this.remoteCodec = IPCUtil.getCodec(HConstants.HBASE_SERVER_RPC_CODEC, KeyValueCodec.class, + conf); this.compressor = getCompressor(conf); this.socketFactory = factory; this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT; @@ -1249,7 +1254,7 @@ public class RpcClient { IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); this.localAddr = localAddr; if (LOG.isDebugEnabled()) { - LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + + LOG.debug("Codec=" + this.localCodec + ", compressor=" + this.compressor + ", tcpKeepAlive=" + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", maxIdleTime=" + this.maxIdleTime + @@ -1403,7 +1408,7 @@ public class RpcClient { throws InterruptedException, IOException { Call call = new Call(md, param, cells, returnType); Connection connection = - getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor); + getConnection(ticket, call, addr, rpcTimeout, this.localCodec, this.compressor); connection.writeRequest(call); // send the parameter boolean interrupted = false; //noinspection SynchronizationOnLocalVariableOrMethodParameter @@ -1500,7 +1505,7 @@ public class RpcClient { synchronized (connections) { connection = connections.get(remoteId); if (connection == null) { - connection = createConnection(remoteId, this.codec, this.compressor); + connection = createConnection(remoteId, this.localCodec, this.compressor); connections.put(remoteId, connection); } } diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index ee09116..2a45ccc 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -896,6 +896,12 @@ public final class HConstants { */ public static final String STATUS_MULTICAST_PORT = "hbase.status.multicast.port"; public static final int DEFAULT_STATUS_MULTICAST_PORT = 60100; + + /** + * RPC codec server/client constants + */ + public static final String HBASE_CLIENT_RPC_CODEC = "hbase.client.rpc.codec"; + public static final String HBASE_SERVER_RPC_CODEC = "hbase.server.rpc.codec"; private HConstants() { // Can't be instantiated with this ctor. diff --git hbase-common/src/main/resources/hbase-default.xml hbase-common/src/main/resources/hbase-default.xml index b3242ae..940d39d 100644 --- hbase-common/src/main/resources/hbase-default.xml +++ hbase-common/src/main/resources/hbase-default.xml @@ -983,7 +983,15 @@ possible configurations would overwhelm and obscure the important. are NULL, CRC32, CRC32C. - + + hbase.server.rpc.codec + org.apache.hadoop.hbase.codec.KeyValueCodec + The codec that the RPC server would use in communicating + with the client. + The codec can be updated if the server has to secure the tags from + reaching the client + + hbase.status.published false diff --git hbase-common/src/test/java/org/apache/hadoop/hbase/codec/CustomizedKVCodec.java hbase-common/src/test/java/org/apache/hadoop/hbase/codec/CustomizedKVCodec.java new file mode 100644 index 0000000..5920d2f --- /dev/null +++ hbase-common/src/test/java/org/apache/hadoop/hbase/codec/CustomizedKVCodec.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.codec; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.hbase.Cell; + +/** + * A dummy implementation of KeyValueCodec + * + */ +public class CustomizedKVCodec extends KeyValueCodec{ + public static class CustomizedKeyValueEncoder extends KeyValueEncoder { + + public CustomizedKeyValueEncoder(OutputStream out) { + super(out); + } + + @Override + public void write(Cell cell) throws IOException { + this.out.write(1); + super.write(cell); + } + + } + public static class CustomizedKeyValueDecoder extends KeyValueDecoder { + public CustomizedKeyValueDecoder(InputStream in) { + super(in); + } + + @Override + protected Cell parseCell() throws IOException { + in.read(); + return super.parseCell(); + } + } + @Override + public Encoder getEncoder(OutputStream os) { + return new CustomizedKeyValueEncoder(os); + } + + @Override + public Decoder getDecoder(InputStream is) { + return new CustomizedKeyValueDecoder(is); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index bd7c545..a62eb3c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; @@ -369,7 +370,7 @@ public class RpcServer implements RpcServerInterface { headerBuilder.setException(exceptionBuilder.build()); } ByteBuffer cellBlock = - ipcUtil.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells); + ipcUtil.buildCellBlock(this.connection.localCodec, this.connection.compressionCodec, cells); if (cellBlock != null) { CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it. @@ -462,7 +463,7 @@ public class RpcServer implements RpcServerInterface { @Override public boolean isClientCellBlockSupport() { - return this.connection != null && this.connection.codec != null; + return this.connection != null && this.connection.remoteCodec != null; } @Override @@ -1124,7 +1125,11 @@ public class RpcServer implements RpcServerInterface { /** * Codec the client asked use. */ - private Codec codec; + private Codec remoteCodec; + /** + * Codec the server is configured to use + */ + private Codec localCodec; /** * Compression codec the client asked us use. */ @@ -1565,10 +1570,11 @@ public class RpcServer implements RpcServerInterface { String className = header.getCellBlockCodecClass(); if (className == null || className.length() == 0) return; try { - this.codec = (Codec)Class.forName(className).newInstance(); + this.remoteCodec = (Codec)Class.forName(className).newInstance(); } catch (Exception e) { throw new UnsupportedCellCodecException(className, e); } + this.localCodec = IPCUtil.getCodec(HConstants.HBASE_SERVER_RPC_CODEC, KeyValueCodec.class, conf); if (!header.hasCellBlockCompressorClass()) return; className = header.getCellBlockCompressorClass(); try { @@ -1682,7 +1688,7 @@ public class RpcServer implements RpcServerInterface { offset += paramSize; } if (header.hasCellBlockMeta()) { - cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, + cellScanner = ipcUtil.createCellScanner(this.remoteCodec, this.compressionCodec, buf, offset, buf.length); } } catch (Throwable t) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 3d74993..6151cb4 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -340,7 +340,7 @@ public class HStore implements Store { } } - /** + /** * @return how many bytes to write between status checks */ public static int getCloseCheckInterval() { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRPCServerCodec.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRPCServerCodec.java new file mode 100644 index 0000000..0e656ee --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRPCServerCodec.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestRPCServerCodec { + private static final Log LOG = LogFactory.getLog(TestRPCServerCodec.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final int NUM_RS = 1; + private static final String STRING_TABLE_NAME = "TestRPCServerCodec"; + private static final byte[] TEST_FAM = Bytes.toBytes("fam"); + private static final byte[] TEST_QUAL = Bytes.toBytes("qual"); + private static final TableName TABLE_NAME = + TableName.valueOf(STRING_TABLE_NAME); + + /** + * Setup the config for the cluster + * @throws Exception on failure + */ + @BeforeClass + public static void setupCluster() throws Exception { + UTIL.getConfiguration().set(HConstants.HBASE_SERVER_RPC_CODEC, "org.apache.hadoop.hbase.codec.CustomizedKVCodec"); + UTIL.startMiniCluster(NUM_RS); + } + + @Test + public void testClientToServerInteractionWithDifferentServerCodec() throws Exception { + HTable ht = UTIL.createTable(TABLE_NAME.getName(), TEST_FAM); + ht.setAutoFlush(false, true); + + List puts = new ArrayList(); + puts.add( createPut("row1") ); + puts.add( createPut("row2") ); + puts.add( createPut("row3") ); + puts.add( createPut("row4") ); + + HTableUtil.bucketRsPut( ht, puts ); + + Scan scan = new Scan(); + scan.addColumn(TEST_FAM, TEST_QUAL); + int count = 0; + for(Result result : ht.getScanner(scan)) { + count++; + } + LOG.info("bucket put count=" + count); + assertEquals(count, puts.size()); + ht.close(); + } + + private Put createPut(String row) { + Put put = new Put( Bytes.toBytes(row)); + put.add(TEST_FAM, TEST_QUAL, Bytes.toBytes("val1")); + return put; + } + +}