commit e07ce68d34713f4de3cbf0119afb6652267d3aad Author: Karthick Sankarachary Date: Thu Aug 26 18:07:08 2010 -0700 HBASE-2937 Facilitate Timeouts In HBase Client diff --git a/src/main/java/org/apache/hadoop/hbase/Timeout.java b/src/main/java/org/apache/hadoop/hbase/Timeout.java new file mode 100644 index 0000000..fa5a122 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/Timeout.java @@ -0,0 +1,63 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.util.concurrent.TimeUnit; + +/** + * The Timeout class denotes a specific duration + * of time at a given unit of granularity. + * + * @author Karthick Sankarachary + */ +public class Timeout { + private long duration; + private TimeUnit timeUnit; + + public Timeout() { + this(0, TimeUnit.MILLISECONDS); + } + + public Timeout(long duration, TimeUnit timeUnit) { + setDuration(duration); + setTimeUnit(timeUnit); + } + + public long getDuration() { + return duration; + } + + public void setDuration(long duration) { + this.duration = duration; + } + + public TimeUnit getTimeUnit() { + return timeUnit; + } + + public void setTimeUnit(TimeUnit timeUnit) { + this.timeUnit = timeUnit; + } + + public long getDurationInMillis() { + return TimeUnit.MILLISECONDS.convert(duration, timeUnit); + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/client/ClientCallable.java b/src/main/java/org/apache/hadoop/hbase/client/ClientCallable.java new file mode 100644 index 0000000..38d0762 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/client/ClientCallable.java @@ -0,0 +1,48 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Timeout; + +/** + * The ClientCallable builds on the {@link ServerCallable}, + * and allows a timeout to be set on the underlying socket before the + * call is actually invoked. + * + * @author Karthick Sankarachary + */ +public abstract class ClientCallable extends ServerCallable { + private Timeout timeout; + + public ClientCallable(HConnection connection, byte[] tableName, byte[] row, + Timeout timeout) { + super(connection, tableName, row); + this.timeout = timeout; + } + + @Override + public void instantiateServer(boolean reload) throws IOException { + super.instantiateServer(reload); + super.connection.setRegionTimeout(server, timeout); + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnection.java b/src/main/java/org/apache/hadoop/hbase/client/HConnection.java index 8b3b6a4..b17b87e 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnection.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnection.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.Timeout; import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; @@ -246,4 +247,6 @@ public interface HConnection { */ public void prewarmRegionCache(final byte[] tableName, final Map regions); + + public void setRegionTimeout(HRegionInterface regionInterface, Timeout timeout); } diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index ac2b99c..3b0aebf 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.Timeout; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; @@ -1043,6 +1044,10 @@ public class HConnectionManager { return getHRegionConnection(regionServer, false); } + public void setRegionTimeout(HRegionInterface regionInterface, Timeout timeout) { + HBaseRPC.setTimeoutForProxy(regionInterface, timeout); + } + public synchronized ZooKeeperWrapper getZooKeeperWrapper() throws IOException { return HConnectionManager.getClientZooKeeperWatcher(conf) diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/src/main/java/org/apache/hadoop/hbase/client/HTable.java index cd60eeb..e57a04a 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.Timeout; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.util.Bytes; @@ -78,6 +79,8 @@ public class HTable implements HTableInterface { private int maxKeyValueSize; private long maxScannerResultSize; + + private Timeout timeout; /** * Creates an object to access a HBase table. @@ -163,6 +166,14 @@ public class HTable implements HTableInterface { return configuration; } + public Timeout getTimeout() { + return timeout; + } + + public void setTimeout(Timeout timeout) { + this.timeout = timeout; + } + /** * TODO Might want to change this to public, would be nice if the number * of threads would automatically change when servers were added and removed @@ -472,7 +483,7 @@ public class HTable implements HTableInterface { public Result getRowOrBefore(final byte[] row, final byte[] family) throws IOException { return connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { + new ClientCallable(connection, tableName, row, timeout) { public Result call() throws IOException { return server.getClosestRowBefore(location.getRegionInfo().getRegionName(), row, family); @@ -501,7 +512,7 @@ public class HTable implements HTableInterface { public Result get(final Get get) throws IOException { return connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, get.getRow()) { + new ClientCallable(connection, tableName, get.getRow(), timeout) { public Result call() throws IOException { return server.get(location.getRegionInfo().getRegionName(), get); } @@ -512,7 +523,7 @@ public class HTable implements HTableInterface { public void delete(final Delete delete) throws IOException { connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, delete.getRow()) { + new ClientCallable(connection, tableName, delete.getRow(), timeout) { public Boolean call() throws IOException { server.delete(location.getRegionInfo().getRegionName(), delete); return null; // FindBugs NP_BOOLEAN_RETURN_NULL @@ -571,7 +582,7 @@ public class HTable implements HTableInterface { "Invalid arguments to incrementColumnValue", npe); } return connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { + new ClientCallable(connection, tableName, row, timeout) { public Long call() throws IOException { return server.incrementColumnValue( location.getRegionInfo().getRegionName(), row, family, @@ -599,7 +610,7 @@ public class HTable implements HTableInterface { final Put put) throws IOException { return connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { + new ClientCallable(connection, tableName, row, timeout) { public Boolean call() throws IOException { return server.checkAndPut(location.getRegionInfo().getRegionName(), row, family, qualifier, value, put) ? Boolean.TRUE : Boolean.FALSE; @@ -626,7 +637,7 @@ public class HTable implements HTableInterface { final Delete delete) throws IOException { return connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { + new ClientCallable(connection, tableName, row, timeout) { public Boolean call() throws IOException { return server.checkAndDelete( location.getRegionInfo().getRegionName(), @@ -650,7 +661,7 @@ public class HTable implements HTableInterface { */ public boolean exists(final Get get) throws IOException { return connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, get.getRow()) { + new ClientCallable(connection, tableName, get.getRow(), timeout) { public Boolean call() throws IOException { return server. exists(location.getRegionInfo().getRegionName(), get); @@ -695,7 +706,7 @@ public class HTable implements HTableInterface { public RowLock lockRow(final byte [] row) throws IOException { return connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { + new ClientCallable(connection, tableName, row, timeout) { public RowLock call() throws IOException { long lockId = server.lockRow(location.getRegionInfo().getRegionName(), row); @@ -708,7 +719,7 @@ public class HTable implements HTableInterface { public void unlockRow(final RowLock rl) throws IOException { connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, rl.getRow()) { + new ClientCallable(connection, tableName, rl.getRow(), timeout) { public Boolean call() throws IOException { server.unlockRow(location.getRegionInfo().getRegionName(), rl.getLockId()); diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index 2b5eeb6..b4135ac 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -20,20 +20,6 @@ package org.apache.hadoop.hbase.ipc; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.ObjectWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.ReflectionUtils; - -import javax.net.SocketFactory; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.DataInputStream; @@ -44,6 +30,7 @@ import java.io.InputStream; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.util.Hashtable; @@ -52,6 +39,22 @@ import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import javax.net.SocketFactory; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Timeout; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; + /** A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on * a port and is defined by a parameter class and a value class. @@ -79,12 +82,15 @@ public class HBaseClient { protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpKeepAlive; // if T then use keepalives protected final int pingInterval; // how often sends ping to the server in msecs + protected final int socketTimeout; // the default socket timeout in msecs protected final SocketFactory socketFactory; // how to create sockets private int refCount = 1; final private static String PING_INTERVAL_NAME = "ipc.ping.interval"; + final private static String SOCKET_TIMEOUT_NAME = "ipc.socket.timeout"; final static int DEFAULT_PING_INTERVAL = 60000; // 1 min + final static int DEFAULT_SOCKET_TIMEOUT = 0; // interpreted as infinite timeout final static int PING_CALL_ID = -1; /** @@ -110,6 +116,28 @@ public class HBaseClient { } /** + * set the ping interval value in configuration + * + * @param conf Configuration + * @param pingInterval the ping interval + */ + @SuppressWarnings({"UnusedDeclaration"}) + public static void setSocketTimeout(Configuration conf, int socketTimeout) { + conf.setInt(SOCKET_TIMEOUT_NAME, socketTimeout); + } + + /** + * Get the ping interval from configuration; + * If not set in the configuration, return the default value. + * + * @param conf Configuration + * @return the ping interval + */ + static int getSocketTimeout(Configuration conf) { + return conf.getInt(SOCKET_TIMEOUT_NAME, DEFAULT_SOCKET_TIMEOUT); + } + + /** * Increment this client's reference count * */ @@ -191,6 +219,7 @@ public class HBaseClient { private final AtomicLong lastActivity = new AtomicLong();// last I/O activity time protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed private IOException closeException; // close reason + private Timeout clientTimeout; public Connection(InetSocketAddress address) throws IOException { this(new ConnectionId(address, null)); @@ -244,7 +273,8 @@ public class HBaseClient { * otherwise, throw the timeout exception. */ private void handleTimeout(SocketTimeoutException e) throws IOException { - if (shouldCloseConnection.get() || !running.get()) { + if (shouldCloseConnection.get() || !running.get() + || clientTimeout != null) { throw e; } sendPing(); @@ -307,7 +337,7 @@ public class HBaseClient { this.socket.setKeepAlive(tcpKeepAlive); // connection time out is 20s NetUtils.connect(this.socket, remoteId.getAddress(), 20000); - this.socket.setSoTimeout(pingInterval); + this.socket.setSoTimeout(socketTimeout); break; } catch (SocketTimeoutException toe) { handleConnectionFailure(timeoutFailures++, maxRetries, toe); @@ -333,6 +363,13 @@ public class HBaseClient { throw e; } } + + public void setTimeout(Timeout timeout) throws SocketException { + if (timeout != null) { + this.clientTimeout = timeout; + this.socket.setSoTimeout((int) timeout.getDurationInMillis()); + } + } /* Handle connection failures * @@ -654,6 +691,7 @@ public class HBaseClient { this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", false); this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); this.pingInterval = getPingInterval(conf); + this.socketTimeout = getSocketTimeout(conf); if (LOG.isDebugEnabled()) { LOG.debug("The ping interval is" + this.pingInterval + "ms."); } @@ -719,10 +757,17 @@ public class HBaseClient { } public Writable call(Writable param, InetSocketAddress addr, - UserGroupInformation ticket) + UserGroupInformation ticket) + throws IOException { + return call(param, addr, ticket, null); + } + + public Writable call(Writable param, InetSocketAddress addr, + UserGroupInformation ticket, Timeout timeout) throws IOException { Call call = new Call(param); Connection connection = getConnection(addr, ticket, call); + connection.setTimeout(timeout); connection.sendParam(call); // send the parameter boolean interrupted = false; //noinspection SynchronizationOnLocalVariableOrMethodParameter diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java index 9873172..98f28f4 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java @@ -20,18 +20,6 @@ package org.apache.hadoop.hbase.ipc; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.RetriesExhaustedException; -import org.apache.hadoop.hbase.io.HbaseObjectWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.ipc.VersionedProtocol; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; - -import javax.net.SocketFactory; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -46,6 +34,20 @@ import java.net.SocketTimeoutException; import java.util.HashMap; import java.util.Map; +import javax.net.SocketFactory; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Timeout; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; + /** A simple RPC mechanism. * * This is a local hbase copy of the hadoop RPC so we can do things like @@ -228,6 +230,7 @@ public class HBaseRPC { private UserGroupInformation ticket; private HBaseClient client; private boolean isClosed = false; + private Timeout timeout; /** * @param address address for invoker @@ -250,7 +253,8 @@ public class HBaseRPC { startTime = System.currentTimeMillis(); } HbaseObjectWritable value = (HbaseObjectWritable) - client.call(new Invocation(method, args), address, ticket); + client.call(new Invocation(method, args), address, ticket, + timeout); if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); @@ -265,6 +269,10 @@ public class HBaseRPC { CLIENTS.stopClient(client); } } + + public void setTimeout(Timeout timeout) { + this.timeout = timeout; + } } /** @@ -365,6 +373,20 @@ public class HBaseRPC { } } } + + public static void setTimeoutForProxy(VersionedProtocol proxy, Timeout timeout) { + try { + if (proxy != null) { + ((Invoker) Proxy.getInvocationHandler(proxy)).setTimeout(timeout); + } + } catch (IllegalArgumentException iae) { + LOG.warn("Problem setting timeout on " + proxy + " as it doesn't appear to be a proxy"); + // SWALLOW + } catch (ClassCastException cce) { + LOG.warn("Problem setting timeout of " + proxy + " as it's handler is not of the expected type"); + // SWALLOW + } + } /** * Construct a client-side proxy object that implements the named protocol,