diff --git src/main/java/org/apache/hadoop/hbase/MasterNotRunningException.java src/main/java/org/apache/hadoop/hbase/MasterNotRunningException.java index 6cf564c..45e5a45 100644 --- src/main/java/org/apache/hadoop/hbase/MasterNotRunningException.java +++ src/main/java/org/apache/hadoop/hbase/MasterNotRunningException.java @@ -46,4 +46,8 @@ public class MasterNotRunningException extends IOException { public MasterNotRunningException(Exception e) { super(e); } + + public MasterNotRunningException(String s, Exception e) { + super(s, e); + } } diff --git src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 88c381f..44eee55 100644 --- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -19,50 +19,27 @@ */ package org.apache.hadoop.hbase.client; -import java.io.Closeable; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.SocketTimeoutException; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Pattern; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.NotServingRegionException; -import org.apache.hadoop.hbase.RegionException; -import org.apache.hadoop.hbase.RemoteExceptionHandler; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.UnknownRegionException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; -import org.apache.hadoop.hbase.util.Addressing; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.util.*; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; +import java.io.*; +import java.net.SocketTimeoutException; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; + /** * Provides an interface to manage HBase database table metadata + general * administrative functions. Use HBaseAdmin to create, drop, list, enable and @@ -84,7 +61,7 @@ public class HBaseAdmin implements Abortable, Closeable { // want to wait a long time. 
private final int retryLongerMultiplier; private boolean aborted; - + /** * Constructor * @@ -100,48 +77,16 @@ public class HBaseAdmin implements Abortable, Closeable { this.numRetries = this.conf.getInt("hbase.client.retries.number", 10); this.retryLongerMultiplier = this.conf.getInt( "hbase.client.retries.longer.multiplier", 10); - - int tries = 0; - while ( true ){ - try { - - this.connection.getMaster(); - return; - - } catch (MasterNotRunningException mnre) { - HConnectionManager.deleteStaleConnection(this.connection); - this.connection = HConnectionManager.getConnection(this.conf); - } - - tries++; - if (tries >= numRetries) { - // we should delete connection between client and zookeeper - HConnectionManager.deleteStaleConnection(this.connection); - throw new MasterNotRunningException("Retried " + numRetries + " times"); - } - - try { - Thread.sleep(getPauseTime(tries)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // we should delete connection between client and zookeeper - HConnectionManager.deleteStaleConnection(this.connection); - throw new MasterNotRunningException( - "Interrupted after "+tries+" tries"); - } - } } /** - * Constructor for externally managed HConnections. - * This constructor fails fast if the HMaster is not running. - * The HConnection can be re-used again in another attempt. - * This constructor fails fast. - * - * @param connection The HConnection instance to use - * @throws MasterNotRunningException if the master is not running - * @throws ZooKeeperConnectionException if unable to connect to zookeeper - */ + * Constructor for externally managed HConnections. + * The connection to master will be created when required by admin functions. + * + * @param connection The HConnection instance to use + * @throws MasterNotRunningException if the master is not running + * @throws ZooKeeperConnectionException if unable to connect to zookeeper + */ public HBaseAdmin(HConnection connection) throws MasterNotRunningException, ZooKeeperConnectionException { this.conf = connection.getConfiguration(); @@ -151,8 +96,6 @@ public class HBaseAdmin implements Abortable, Closeable { this.numRetries = this.conf.getInt("hbase.client.retries.number", 10); this.retryLongerMultiplier = this.conf.getInt( "hbase.client.retries.longer.multiplier", 10); - - this.connection.getMaster(); } /** @@ -202,18 +145,30 @@ public class HBaseAdmin implements Abortable, Closeable { * @return proxy connection to master server for this instance * @throws MasterNotRunningException if the master is not running * @throws ZooKeeperConnectionException if unable to connect to zookeeper + * @deprecated Master is an implementation detail for HBaseAdmin. If you need + * to use the master interface get a shared connection from + * {@link org.apache.hadoop.hbase.client.HConnection#getSharedMaster()}. + * Deprecated in february 2012, targeted release: 0.94 */ + @Deprecated public HMasterInterface getMaster() throws MasterNotRunningException, ZooKeeperConnectionException { - return this.connection.getMaster(); + // We take a shared master, but we will never release it, + // so we will have the same behavior as before. + return this.connection.getSharedMaster(); } /** @return - true if the master server is running * @throws ZooKeeperConnectionException - * @throws MasterNotRunningException */ - public boolean isMasterRunning() + * @throws MasterNotRunningException + * @deprecated see {@link #getMaster()}. Replaced by: + * {@link HConnection#isMasterRunning()}. 
+ * Deprecated in february 2012, targeted release: 0.94 + */ + @Deprecated + public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException { - return this.connection.isMasterRunning(); + return connection.isMasterRunning(); } /** @@ -447,7 +402,9 @@ public class HBaseAdmin implements Abortable, Closeable { /** * Creates a new table but does not block and wait for it to come online. - * Asynchronous operation. + * Asynchronous operation. To check if the table exists, use + * {@link: #isTableAvailable} -- it is not safe to create an HTable + * instance to this table before it is available. * * @param desc table descriptor for table * @@ -458,7 +415,8 @@ public class HBaseAdmin implements Abortable, Closeable { * and attempt-at-creation). * @throws IOException */ - public void createTableAsync(HTableDescriptor desc, byte [][] splitKeys) + public void createTableAsync( + final HTableDescriptor desc, final byte [][] splitKeys) throws IOException { HTableDescriptor.isLegalTableName(desc.getName()); if(splitKeys != null && splitKeys.length > 1) { @@ -474,11 +432,14 @@ public class HBaseAdmin implements Abortable, Closeable { lastKey = splitKey; } } - try { - getMaster().createTable(desc, splitKeys); - } catch (RemoteException e) { - throw e.unwrapRemoteException(); - } + + execute(new MasterCallable() { + @Override + public Void call() throws IOException { + master.createTable(desc, splitKeys); + return null; + } + }); } /** @@ -500,14 +461,17 @@ public class HBaseAdmin implements Abortable, Closeable { * @throws IOException if a remote or network exception occurs */ public void deleteTable(final byte [] tableName) throws IOException { - isMasterRunning(); HTableDescriptor.isLegalTableName(tableName); HRegionLocation firstMetaServer = getFirstMetaServerForTable(tableName); - try { - getMaster().deleteTable(tableName); - } catch (RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } + + execute(new MasterCallable() { + @Override + public Void call() throws IOException { + master.deleteTable(tableName); + return null; + } + }); + // Wait until all regions deleted HRegionInterface server = connection.getHRegionConnection(firstMetaServer.getHostname(), firstMetaServer.getPort()); @@ -527,7 +491,13 @@ public class HBaseAdmin implements Abortable, Closeable { // HMaster removes the table from its HTableDescriptors if (values == null) { boolean tableExists = false; - HTableDescriptor[] htds = getMaster().getHTableDescriptors(); + HTableDescriptor[] htds; + CloseableMasterConnection master = connection.getSharedMaster(); + try { + htds = master.getHTableDescriptors(); + } finally { + master.close(); + } if (htds != null && htds.length > 0) { for (HTableDescriptor htd: htds) { if (Bytes.equals(tableName, htd.getName())) { @@ -543,15 +513,16 @@ public class HBaseAdmin implements Abortable, Closeable { } catch (IOException ex) { if(tries == numRetries - 1) { // no more tries left if (ex instanceof RemoteException) { - ex = RemoteExceptionHandler.decodeRemoteException((RemoteException) ex); + throw ((RemoteException) ex).unwrapRemoteException(); + }else { + throw ex; } - throw ex; } } finally { if (scannerId != -1L) { try { server.close(scannerId); - } catch (Exception ex) { + } catch (IOException ex) { LOG.warn(ex); } } @@ -676,12 +647,13 @@ public class HBaseAdmin implements Abortable, Closeable { */ public void enableTableAsync(final byte [] tableName) throws IOException { - isMasterRunning(); - try { - getMaster().enableTable(tableName); - } 
catch (RemoteException e) { - throw e.unwrapRemoteException(); - } + execute(new MasterCallable() { + @Override + public Void call() throws IOException { + master.enableTable(tableName); + return null; + } + }); LOG.info("Started enable of " + Bytes.toString(tableName)); } @@ -744,13 +716,14 @@ public class HBaseAdmin implements Abortable, Closeable { * @since 0.90.0 */ public void disableTableAsync(final byte [] tableName) throws IOException { - isMasterRunning(); - try { - getMaster().disableTable(tableName); - } catch (RemoteException e) { - throw e.unwrapRemoteException(); - } - LOG.info("Started disable of " + Bytes.toString(tableName)); + execute(new MasterCallable() { + @Override + public Void call() throws IOException { + LOG.info("Started disable of " + Bytes.toString(tableName)); + master.disableTable(tableName); + return null; + } + }); } public void disableTable(final String tableName) @@ -913,11 +886,12 @@ public class HBaseAdmin implements Abortable, Closeable { public Pair getAlterStatus(final byte[] tableName) throws IOException { HTableDescriptor.isLegalTableName(tableName); - try { - return getMaster().getAlterStatus(tableName); - } catch (RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } + return execute(new MasterCallable>() { + @Override + public Pair call() throws IOException { + return master.getAlterStatus(tableName); + } + }); } /** @@ -941,14 +915,15 @@ public class HBaseAdmin implements Abortable, Closeable { * @param column column descriptor of column to be added * @throws IOException if a remote or network exception occurs */ - public void addColumn(final byte [] tableName, HColumnDescriptor column) + public void addColumn(final byte [] tableName, final HColumnDescriptor column) throws IOException { - HTableDescriptor.isLegalTableName(tableName); - try { - getMaster().addColumn(tableName, column); - } catch (RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } + execute(new MasterCallable() { + @Override + public Void call() throws IOException { + master.addColumn(tableName, column); + return null; + } + }); } /** @@ -974,11 +949,13 @@ public class HBaseAdmin implements Abortable, Closeable { */ public void deleteColumn(final byte [] tableName, final byte [] columnName) throws IOException { - try { - getMaster().deleteColumn(tableName, columnName); - } catch (RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } + execute(new MasterCallable() { + @Override + public Void call() throws IOException { + master.deleteColumn(tableName, columnName); + return null; + } + }); } /** @@ -994,6 +971,8 @@ public class HBaseAdmin implements Abortable, Closeable { modifyColumn(Bytes.toBytes(tableName), descriptor); } + + /** * Modify an existing column family on a table. * Asynchronous operation. @@ -1002,16 +981,15 @@ public class HBaseAdmin implements Abortable, Closeable { * @param descriptor new column descriptor to use * @throws IOException if a remote or network exception occurs */ - public void modifyColumn(final byte [] tableName, HColumnDescriptor descriptor) + public void modifyColumn(final byte [] tableName, final HColumnDescriptor descriptor) throws IOException { - try { - getMaster().modifyColumn(tableName, descriptor); - } catch (RemoteException re) { - // Convert RE exceptions in here; client shouldn't have to deal with them, - // at least w/ the type of exceptions that come out of this method: - // TableNotFoundException, etc. 
- throw RemoteExceptionHandler.decodeRemoteException(re); - } + execute(new MasterCallable() { + @Override + public Void call() throws IOException { + master.modifyColumn(tableName, descriptor); + return null; + } + }); } /** @@ -1305,7 +1283,12 @@ public class HBaseAdmin implements Abortable, Closeable { */ public void move(final byte [] encodedRegionName, final byte [] destServerName) throws UnknownRegionException, MasterNotRunningException, ZooKeeperConnectionException { - getMaster().move(encodedRegionName, destServerName); + CloseableMasterConnection master = connection.getSharedMaster(); + try { + master.move(encodedRegionName, destServerName); + } finally { + master.close(); + } } /** @@ -1317,7 +1300,13 @@ public class HBaseAdmin implements Abortable, Closeable { */ public void assign(final byte[] regionName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { - getMaster().assign(regionName); + execute(new MasterCallable() { + @Override + public Void call() throws IOException { + master.assign(regionName); + return null; + } + }); } /** @@ -1336,7 +1325,13 @@ public class HBaseAdmin implements Abortable, Closeable { */ public void unassign(final byte [] regionName, final boolean force) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { - getMaster().unassign(regionName, force); + execute(new MasterCallable() { + @Override + public Void call() throws IOException { + master.unassign(regionName, force); + return null; + } + }); } /** @@ -1346,7 +1341,12 @@ public class HBaseAdmin implements Abortable, Closeable { */ public boolean balanceSwitch(final boolean b) throws MasterNotRunningException, ZooKeeperConnectionException { - return getMaster().balanceSwitch(b); + CloseableMasterConnection master = connection.getSharedMaster(); + try { + return master.balanceSwitch(b); + } finally { + master.close(); + } } /** @@ -1357,7 +1357,12 @@ public class HBaseAdmin implements Abortable, Closeable { */ public boolean balancer() throws MasterNotRunningException, ZooKeeperConnectionException { - return getMaster().balance(); + CloseableMasterConnection master = connection.getSharedMaster(); + try { + return master.balance(); + } finally { + master.close(); + } } /** @@ -1452,24 +1457,23 @@ public class HBaseAdmin implements Abortable, Closeable { * @param htd modified description of the table * @throws IOException if a remote or network exception occurs */ - public void modifyTable(final byte [] tableName, HTableDescriptor htd) + public void modifyTable(final byte [] tableName, final HTableDescriptor htd) throws IOException { - try { - getMaster().modifyTable(tableName, htd); - } catch (RemoteException re) { - // Convert RE exceptions in here; client shouldn't have to deal with them, - // at least w/ the type of exceptions that come out of this method: - // TableNotFoundException, etc. - throw RemoteExceptionHandler.decodeRemoteException(re); - } + execute(new MasterCallable() { + @Override + public Void call() throws IOException { + master.modifyTable(tableName, htd); + return null; + } + }); } /** * @param tableNameOrRegionName Name of a table or name of a region. - * @param ct A {@link #CatalogTracker} instance (caller of this method usually has one). + * @param ct A {@link CatalogTracker} instance (caller of this method usually has one). * @return True if tableNameOrRegionName is a verified region - * name (we call {@link #MetaReader.getRegion(CatalogTracker catalogTracker, - * byte [] regionName)};) else false. 
+ *   name (we call {@link MetaReader#getRegion(CatalogTracker, byte[])})
+ *   else false.
  * Throw an exception if tableNameOrRegionName is null.
  * @throws IOException
  */
@@ -1486,7 +1490,7 @@ public class HBaseAdmin implements Abortable, Closeable {
  * Convert the table name byte array into a table name string and check if table
  * exists or not.
  * @param tableNameBytes Name of a table.
- * @param ct A {@link #CatalogTracker} instance (caller of this method usually has one).
+ * @param ct A {@link CatalogTracker} instance (caller of this method usually has one).
  * @return tableName in string form.
  * @throws IOException if a remote or network exception occurs.
  * @throws TableNotFoundException if table does not exist.
@@ -1505,12 +1509,13 @@ public class HBaseAdmin implements Abortable, Closeable {
  * @throws IOException if a remote or network exception occurs
  */
 public synchronized void shutdown() throws IOException {
-   isMasterRunning();
-   try {
-     getMaster().shutdown();
-   } catch (RemoteException e) {
-     throw RemoteExceptionHandler.decodeRemoteException(e);
-   }
+   execute(new MasterCallable<Void>() {
+     @Override
+     public Void call() throws IOException {
+       master.shutdown();
+       return null;
+     }
+   });
 }
 /**
@@ -1520,12 +1525,13 @@ public class HBaseAdmin implements Abortable, Closeable {
  * @throws IOException if a remote or network exception occurs
  */
 public synchronized void stopMaster() throws IOException {
-   isMasterRunning();
-   try {
-     getMaster().stopMaster();
-   } catch (RemoteException e) {
-     throw RemoteExceptionHandler.decodeRemoteException(e);
-   }
+   execute(new MasterCallable<Void>() {
+     @Override
+     public Void call() throws IOException {
+       master.stopMaster();
+       return null;
+     }
+   });
 }
 /**
@@ -1548,7 +1554,12 @@ public class HBaseAdmin implements Abortable, Closeable {
  * @throws IOException if a remote or network exception occurs
  */
 public ClusterStatus getClusterStatus() throws IOException {
-   return getMaster().getClusterStatus();
+   return execute(new MasterCallable<ClusterStatus>() {
+     @Override
+     public ClusterStatus call() {
+       return master.getClusterStatus();
+     }
+   });
 }
 private HRegionLocation getFirstMetaServerForTable(final byte [] tableName)
@@ -1602,6 +1613,7 @@ public class HBaseAdmin implements Abortable, Closeable {
   return Regions;
 }
+ @Override
 public void close() throws IOException {
   if (this.connection != null) {
     this.connection.close();
@@ -1648,4 +1660,32 @@ public class HBaseAdmin implements Abortable, Closeable {
     return null;
   }
 }
+
+  /**
+   * @see #execute
+   */
+  private abstract static class MasterCallable<V> implements Callable<V> {
+    protected CloseableMasterConnection master;
+  }
+
+  /**
+   * This method allows executing a function that requires a connection to
+   * the master, without having to manage the connection creation/release.
+   * Create a {@link MasterCallable} to use it.
+   */
+  private <V> V execute(MasterCallable<V> function) throws IOException {
+    function.master = connection.getSharedMaster();
+    try {
+      return function.call();
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException();
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      // This should not happen...
+      throw new IOException("Unexpected exception when calling master", e);
+    } finally {
+      function.master.close();
+    }
+  }
 }
diff --git src/main/java/org/apache/hadoop/hbase/client/HConnection.java src/main/java/org/apache/hadoop/hbase/client/HConnection.java
index c7b730b..622da69 100644
--- src/main/java/org/apache/hadoop/hbase/client/HConnection.java
+++ src/main/java/org/apache/hadoop/hbase/client/HConnection.java
@@ -68,17 +68,53 @@ public interface HConnection extends Abortable, Closeable {
  * @return ZooKeeperWatcher handle being used by the connection.
  * @throws IOException if a remote or network exception occurs
  * @deprecated Removed because it was a mistake exposing zookeeper in this
- * interface (ZooKeeper is an implementation detail).
+ * interface (ZooKeeper is an implementation detail). Use
+ * {@link #getSharedZooKeeperWatcher} instead. Deprecated in February 2012 for 0.94.
  */
+ @Deprecated
 public ZooKeeperWatcher getZooKeeperWatcher() throws IOException;
 /**
+ * Return a shared instance of a ZooKeeperWatcher for this connection.
+ * Must be released by {@link #releaseSharedZooKeeperWatcher}. As with
+ * {@link #getSharedMaster}, there is a keep-alive when the instance is
+ * released. However, there is no retry mechanism when connecting to
+ * ZooKeeper: it will fail immediately if ZooKeeper is not available.
+ */
+ public ZooKeeperWatcher getSharedZooKeeperWatcher() throws IOException;
+
+ /**
  * @return proxy connection to master server for this instance
  * @throws MasterNotRunningException if the master is not running
  * @throws ZooKeeperConnectionException if unable to connect to zookeeper
+ * @deprecated Removed because it was a mistake exposing master in this
+ * interface (master is an implementation detail). Master is available
+ * from the {@link HBaseAdmin} interface.
  */
+ @Deprecated
 public HMasterInterface getMaster()
- throws MasterNotRunningException, ZooKeeperConnectionException;
+ throws MasterNotRunningException, ZooKeeperConnectionException;
+
+ /**
+ * Get a master interface. This instance is shared among the connection's
+ * users. It must be released with {@link #releaseSharedMaster}, typically
+ * in a 'finally' clause, when you have finished with it.
+ * The function checks that the master is currently running. If not, it
+ * will try to reconnect to the master multiple times, using the
+ * {@link org.apache.hadoop.hbase.HConstants#HBASE_CLIENT_RETRIES_NUMBER}
+ * parameter.
+ * The number of users is counted. When there are no users, the connection
+ * is closed after a keep-alive of five minutes.
+ */
+ public CloseableMasterConnection getSharedMaster()
+ throws MasterNotRunningException, ZooKeeperConnectionException;
+
+ /**
+ * See {@link #getSharedMaster}
+ * @param master - The instance to release. If null, the reference counter
+ * won't be decreased.
+ */ + //void releaseSharedMaster(HMasterInterface master); /** @return - true if the master server is running */ public boolean isMasterRunning() @@ -193,6 +229,7 @@ public interface HConnection extends Abortable, Closeable { * @throws IOException if a remote or network exception occurs * @deprecated Use {@link #getHRegionConnection(String, int)} */ + @Deprecated public HRegionInterface getHRegionConnection(HServerAddress regionServer) throws IOException; @@ -213,8 +250,9 @@ public interface HConnection extends Abortable, Closeable { * @param getMaster - do we check if master is alive * @return proxy for HRegionServer * @throws IOException if a remote or network exception occurs - * @deprecated Use {@link #getHRegionConnection(HServerAddress, boolean)} + * @deprecated Use {@link #getHRegionConnection(String, int)} */ + @Deprecated public HRegionInterface getHRegionConnection(HServerAddress regionServer, boolean getMaster) throws IOException; @@ -255,6 +293,7 @@ public interface HConnection extends Abortable, Closeable { * @throws RuntimeException other unspecified error * @deprecated Use {@link HConnectionManager#withoutRetries(ServerCallable)} */ + @Deprecated public T getRegionServerWithRetries(ServerCallable callable) throws IOException, RuntimeException; @@ -268,6 +307,7 @@ public interface HConnection extends Abortable, Closeable { * @throws RuntimeException other unspecified error * @deprecated Use {@link HConnectionManager#withoutRetries(ServerCallable)} */ + @Deprecated public T getRegionServerWithoutRetries(ServerCallable callable) throws IOException, RuntimeException; @@ -361,6 +401,7 @@ public interface HConnection extends Abortable, Closeable { * @throws IOException if a remote or network exception occurs * @deprecated This method will be changed from public to package protected. 
*/ + @Deprecated public int getCurrentNrHRS() throws IOException; /** diff --git src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 34b6ba8..5ae1b0f 100644 --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -19,70 +19,29 @@ */ package org.apache.hadoop.hbase.client; -import java.io.Closeable; -import java.io.IOException; -import java.lang.reflect.Proxy; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.HServerAddress; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.MasterAddressTracker; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.RemoteExceptionHandler; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; -import org.apache.hadoop.hbase.ipc.ExecRPCInvoker; -import org.apache.hadoop.hbase.ipc.HBaseRPC; -import org.apache.hadoop.hbase.ipc.HMasterInterface; -import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.ipc.*; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.Addressing; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.SoftValueSortedMap; -import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.hbase.zookeeper.ClusterId; -import org.apache.hadoop.hbase.zookeeper.RootRegionTracker; -import org.apache.hadoop.hbase.zookeeper.ZKTable; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.hbase.util.*; +import org.apache.hadoop.hbase.zookeeper.*; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.*; +import java.net.InetSocketAddress; +import java.util.*; +import 
java.util.Map.Entry; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + /** * A non-instantiable class that manages {@link HConnection}s. * This class has a static Map of {@link HConnection} instances keyed by @@ -139,7 +98,7 @@ public class HConnectionManager { public static final int MAX_CACHED_HBASE_INSTANCES; - private static Log LOG = LogFactory.getLog(HConnectionManager.class); + private static final Log LOG = LogFactory.getLog(HConnectionManager.class); static { // We set instances to one more than the value specified for {@link @@ -492,17 +451,8 @@ public class HConnectionManager { private final int rpcTimeout; private final int prefetchRegionLimit; - private final Object masterLock = new Object(); private volatile boolean closed; private volatile boolean aborted; - private volatile HMasterInterface master; - private volatile boolean masterChecked; - // ZooKeeper reference - private ZooKeeperWatcher zooKeeper; - // ZooKeeper-based master address tracker - private MasterAddressTracker masterAddressTracker; - private RootRegionTracker rootRegionTracker; - private ClusterId clusterId; private final Object metaRegionLock = new Object(); @@ -539,7 +489,7 @@ public class HConnectionManager { private boolean stopProxy; private int refCount; - // indicates whether this connection's life cycle is managed + // indicates whether this connection's life cycle is managed (by us) private final boolean managed; /** * constructor @@ -574,158 +524,225 @@ public class HConnectionManager { HConstants.HBASE_CLIENT_PREFETCH_LIMIT, HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT); - setupZookeeperTrackers(); - - this.master = null; - this.masterChecked = false; + retrieveClusterId(); } - private synchronized void setupZookeeperTrackers() - throws ZooKeeperConnectionException{ - // initialize zookeeper and master address manager - this.zooKeeper = getZooKeeperWatcher(); - masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this); - masterAddressTracker.start(); - - this.rootRegionTracker = new RootRegionTracker(this.zooKeeper, this); - this.rootRegionTracker.start(); - - this.clusterId = new ClusterId(this.zooKeeper, this); - } + /** + * Read the clusterId from ZooKeeper. + * Set it to null if it can't connect. + */ + private void retrieveClusterId(){ + CloseableZooKeeperWatcher zkw = null; - private synchronized void resetZooKeeperTrackers() - throws ZooKeeperConnectionException { - LOG.info("Trying to reconnect to zookeeper"); - masterAddressTracker.stop(); - masterAddressTracker = null; - rootRegionTracker.stop(); - rootRegionTracker = null; - clusterId = null; - this.zooKeeper = null; - setupZookeeperTrackers(); + try { + zkw = getSharedZooKeeperWatcher(); + String clusterId = + Bytes.toString(ZKUtil.getData(zkw, zkw.clusterIdZNode)); + if (clusterId != null) { + conf.set(HConstants.CLUSTER_ID, clusterId); + LOG.info("ClusterId is "+clusterId); + }else { + LOG.info( + "ClusterId is null, so we don't put in the connection configuration"); + } + } catch (KeeperException e) { + LOG.warn("Can't retrieve clusterId from Zookeeper",e); + } catch (IOException e) { + LOG.warn("Can't retrieve clusterId from Zookeeper",e); + } finally { + zkw.close(); + } } + @Override public Configuration getConfiguration() { return this.conf; } - public HMasterInterface getMaster() - throws MasterNotRunningException, ZooKeeperConnectionException { - // TODO: REMOVE. MOVE TO HBaseAdmin and redo as a Callable!!! 
- // Check if we already have a good master connection + /** + * Create a new Master proxy. Try once only. + */ + private HMasterInterface createMasterInterface() + throws IOException, KeeperException { + + CloseableZooKeeperWatcher zkw; try { - if (master != null && master.isMasterRunning()) { - return master; + zkw = getSharedZooKeeperWatcher(); + } catch (IOException e) { + throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); + } + + try { + + checkIfBaseNodeAvailable(zkw); + + ServerName sn = ServerName.parseVersionedServerName( + ZKUtil.getData(zkw, zkw.masterAddressZNode)); + + if (sn == null) { + String msg = + "ZooKeeper available but no active master location found"; + LOG.info(msg); + throw new MasterNotRunningException(msg); + } + + + InetSocketAddress isa = + new InetSocketAddress(sn.getHostname(), sn.getPort()); + HMasterInterface tryMaster = (HMasterInterface) HBaseRPC.getProxy( + HMasterInterface.class, HMasterInterface.VERSION, isa, this.conf, + this.rpcTimeout); + + if (tryMaster.isMasterRunning()) { + return tryMaster; + } else { + HBaseRPC.stopProxy(tryMaster); + String msg = "Can create a proxy to master, but it is not running"; + LOG.info(msg); + throw new MasterNotRunningException(msg); } - } catch (UndeclaredThrowableException ute) { - // log, but ignore, the loop below will attempt to reconnect - LOG.info("Exception contacting master. Retrying...", ute.getCause()); + } finally { + zkw.close(); } + } - checkIfBaseNodeAvailable(); - ServerName sn = null; - synchronized (this.masterLock) { - this.master = null; - for (int tries = 0; - !this.closed && - !this.masterChecked && this.master == null && - tries < numRetries; - tries++) { + /** + * Get a connection to master. This connection will be reset if master dies. + * Don't close it, it will be closed with the connection. + * @deprecated This function is deprecated because it creates a never ending + * connection to master and hence wastes resources. + * Use {@link #getSharedMaster()} instead. + * + * Internally, we're using the shared master as there is no reason to create + * a new connection. However, in this case, we will never close the shared + * master. + */ + @Override + @Deprecated + public HMasterInterface getMaster() throws + MasterNotRunningException, ZooKeeperConnectionException { + canCloseMaster = false; + try { + return getSharedMaster(); + } catch (MasterNotRunningException e) { + throw e; + } catch (IOException e) { + throw new ZooKeeperConnectionException( + "Can't create a connection to master", e); + } + } + /** + * Create a master, retries if necessary. 
+ */ + private HMasterInterface createMasterWithRetries() + throws MasterNotRunningException { + + // The lock must be at the beginning to prevent multiple master creation + // (and leaks) in a multithread context + synchronized (this.masterLock) { + Exception exceptionCaught = null; + HMasterInterface master = null; + int tries = 0; + while ( + !this.closed && master == null + ) { + tries++; try { - sn = masterAddressTracker.getMasterAddress(); - if (sn == null) { - LOG.info("ZooKeeper available but no active master location found"); - throw new MasterNotRunningException(); - } + master = createMasterInterface(); + } catch (IOException e) { + exceptionCaught = e; + } catch (KeeperException e) { + exceptionCaught = e; + } - if (clusterId.hasId()) { - conf.set(HConstants.CLUSTER_ID, clusterId.getId()); - } - InetSocketAddress isa = - new InetSocketAddress(sn.getHostname(), sn.getPort()); - HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy( - HMasterInterface.class, HMasterInterface.VERSION, isa, this.conf, - this.rpcTimeout); - - if (tryMaster.isMasterRunning()) { - this.master = tryMaster; - this.masterLock.notifyAll(); - break; - } + if (exceptionCaught != null) + // It failed. If it's not the last try, we're going to wait a little + if (tries < numRetries) { + long pauseTime = ConnectionUtils.getPauseTime(this.pause, tries); + LOG.info("getMaster attempt " + tries + " of " + numRetries + + " failed; retrying after sleep of " +pauseTime, exceptionCaught); - } catch (IOException e) { - if (tries == numRetries - 1) { - // This was our last chance - don't bother sleeping - LOG.info("getMaster attempt " + tries + " of " + numRetries + - " failed; no more retrying.", e); - break; + try { + Thread.sleep(pauseTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Thread was interrupted while trying to connect to master.", e); } - LOG.info("getMaster attempt " + tries + " of " + numRetries + - " failed; retrying after sleep of " + - ConnectionUtils.getPauseTime(this.pause, tries), e); - } - // Cannot connect to master or it is not running. Sleep & retry - try { - this.masterLock.wait(ConnectionUtils.getPauseTime(this.pause, tries)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Thread was interrupted while trying to connect to master."); + } else { + // Enough tries, we stop now + LOG.info("getMaster attempt " + tries + " of " + numRetries + + " failed; no more retrying.", exceptionCaught); + throw new MasterNotRunningException(exceptionCaught); } } - // Avoid re-checking in the future if this is a managed HConnection, - // even if we failed to acquire a master. - // (this is to retain the existing behavior before HBASE-5058) - this.masterChecked = managed; - - if (this.master == null) { - if (sn == null) { - throw new MasterNotRunningException(); - } - throw new MasterNotRunningException(sn.toString()); + + if (master == null) { + // implies this.closed true + throw new MasterNotRunningException( + "Connection was closed while trying to get master"); } - return this.master; + + return master; } } - private void checkIfBaseNodeAvailable() throws MasterNotRunningException { - if (false == masterAddressTracker.checkIfBaseNodeAvailable()) { - String errorMsg = "Check the value configured in 'zookeeper.znode.parent'. 
" + private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw) + throws MasterNotRunningException { + String errorMsg; + try { + if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) { + errorMsg = "The node " + zkw.baseZNode+" is not in ZooKeeper. " + + "It should have been written by the master. " + + "Check the value configured in 'zookeeper.znode.parent'. " + "There could be a mismatch with the one configured in the master."; + LOG.error(errorMsg); + throw new MasterNotRunningException(errorMsg); + } + } catch (KeeperException e) { + errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage(); LOG.error(errorMsg); - throw new MasterNotRunningException(errorMsg); + throw new MasterNotRunningException(errorMsg, e); } } + /** + * @return true if the master is running, throws an exception otherwise + * @throws MasterNotRunningException - if the master is not running + * @throws ZooKeeperConnectionException + */ + @Override public boolean isMasterRunning() - throws MasterNotRunningException, ZooKeeperConnectionException { - if (this.master == null) { - getMaster(); - } - boolean isRunning = master.isMasterRunning(); - if(isRunning) { - return true; - } - throw new MasterNotRunningException(); + throws MasterNotRunningException, ZooKeeperConnectionException { + // When getting the master proxy connection, we check it's running, + // so if there is no exception, it means we've been able to get a + // connection on a running master + getSharedMaster().close(); + return true; } + @Override public HRegionLocation getRegionLocation(final byte [] name, final byte [] row, boolean reload) throws IOException { return reload? relocateRegion(name, row): locateRegion(name, row); } + @Override public boolean isTableEnabled(byte[] tableName) throws IOException { return testTableOnlineState(tableName, true); } + @Override public boolean isTableDisabled(byte[] tableName) throws IOException { return testTableOnlineState(tableName, false); } + @Override public boolean isTableAvailable(final byte[] tableName) throws IOException { final AtomicBoolean available = new AtomicBoolean(true); final AtomicInteger regionCount = new AtomicInteger(0); @@ -763,13 +780,16 @@ public class HConnectionManager { return online; } String tableNameStr = Bytes.toString(tableName); + CloseableZooKeeperWatcher zkw = getSharedZooKeeperWatcher(); try { if (online) { - return ZKTable.isEnabledTable(this.zooKeeper, tableNameStr); + return ZKTable.isEnabledTable(zkw, tableNameStr); } - return ZKTable.isDisabledTable(this.zooKeeper, tableNameStr); + return ZKTable.isDisabledTable(zkw, tableNameStr); } catch (KeeperException e) { throw new IOException("Enable/Disable failed", e); + }finally { + zkw.close(); } } @@ -787,12 +807,14 @@ public class HConnectionManager { return null; } + @Override public HRegionLocation locateRegion(final byte [] tableName, final byte [] row) throws IOException{ return locateRegion(tableName, row, true); } + @Override public HRegionLocation relocateRegion(final byte [] tableName, final byte [] row) throws IOException{ @@ -808,26 +830,34 @@ public class HConnectionManager { "table name cannot be null or zero length"); } - if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { - try { - ServerName servername = - this.rootRegionTracker.waitRootRegionLocation(this.rpcTimeout); - LOG.debug("Looked up root region location, connection=" + this + - "; serverName=" + ((servername == null)? 
"": servername.toString())); - if (servername == null) return null; - return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, - servername.getHostname(), servername.getPort()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } - } else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) { - return locateRegionInMeta(HConstants.ROOT_TABLE_NAME, tableName, row, + CloseableZooKeeperWatcher zkw = getSharedZooKeeperWatcher(); + try { + if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { + try { + ServerName servername = RootRegionTracker.dataToServerName( + ZKUtil.blockUntilAvailable( + zkw, zkw.rootServerZNode, this.rpcTimeout) + ); + + LOG.debug("Looked up root region location, connection=" + this + + "; serverName=" + ((servername == null) ? "" : servername)); + if (servername == null) return null; + return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, + servername.getHostname(), servername.getPort()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + } else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) { + return locateRegionInMeta(HConstants.ROOT_TABLE_NAME, tableName, row, useCache, metaRegionLock); - } else { - // Region not in the cache - have to go to the meta RS - return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row, + } else { + // Region not in the cache - have to go to the meta RS + return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row, useCache, userRegionLock); + } + } finally { + zkw.close(); } } @@ -1208,6 +1238,8 @@ public class HConnectionManager { } } + @Override + @Deprecated public HRegionInterface getHRegionConnection(HServerAddress hsa) throws IOException { return getHRegionConnection(hsa, false); @@ -1220,6 +1252,8 @@ public class HConnectionManager { return getHRegionConnection(hostname, port, false); } + @Override + @Deprecated public HRegionInterface getHRegionConnection(HServerAddress hsa, boolean master) throws IOException { @@ -1246,7 +1280,6 @@ public class HConnectionManager { HRegionInterface getHRegionConnection(final String hostname, final int port, final InetSocketAddress isa, final boolean master) throws IOException { - if (master) getMaster(); HRegionInterface server; String rsName = null; if (isa != null) { @@ -1266,9 +1299,6 @@ public class HConnectionManager { server = this.servers.get(rsName); if (server == null) { try { - if (clusterId.hasId()) { - conf.set(HConstants.CLUSTER_ID, clusterId.getId()); - } // Only create isa when we need to. InetSocketAddress address = isa != null? isa: new InetSocketAddress(hostname, port); @@ -1290,38 +1320,302 @@ public class HConnectionManager { return server; } + @Override + @Deprecated + public ZooKeeperWatcher getZooKeeperWatcher() + throws ZooKeeperConnectionException { + canCloseZKW = false; + + try { + return getSharedZooKeeperWatcher(); + } catch (ZooKeeperConnectionException e){ + throw e; + }catch (IOException e) { + // Encapsulate exception to keep interface + throw new ZooKeeperConnectionException( + "Can't create a zookeeper connection", e); + } + } + /** - * Get the ZooKeeper instance for this TableServers instance. - * - * If ZK has not been initialized yet, this will connect to ZK. - * @returns zookeeper reference - * @throws ZooKeeperConnectionException if there's a problem connecting to zk + * Retrieve a shared ZooKeeperWatcher. You must release it it once you've have + * finished to use it. 
+  * @return
+  */
- public synchronized ZooKeeperWatcher getZooKeeperWatcher()
- throws ZooKeeperConnectionException {
-   if(zooKeeper == null) {
+ private CloseableZooKeeperWatcher sharedZookeeper;
+ private int sharedZookeeperUserCount;
+ private boolean canCloseZKW = true;
+ private final Object zooKeeperWatcherLock = new Object();
+
+ private static final long keepAlive = 5 * 60 * 1000;
+
+ public CloseableZooKeeperWatcher getSharedZooKeeperWatcher() throws IOException {
+   synchronized (zooKeeperWatcherLock) {
+
+     if (sharedZookeeper == null) {
+       // We currently don't check that our link to ZooKeeper is still valid.
+       // It seems the only way to do that would be to read something from it.
+       sharedZookeeper = new CloseableZooKeeperWatcher(conf, "hconnection", this);
+     }
+     sharedZookeeperUserCount++;
+     keepSharedZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
+
+     return sharedZookeeper;
+   }
+ }
+
+ void releaseSharedZooKeeperWatcher(CloseableZooKeeperWatcher zkw) {
+   if (zkw == null) {
+     return;
+   }
+   synchronized (zooKeeperWatcherLock) {
+     --sharedZookeeperUserCount;
+     if (sharedZookeeperUserCount <= 0) {
+       keepSharedZooKeeperWatcherAliveUntil =
+         System.currentTimeMillis() + keepAlive;
+     }
+   }
+ }
+
+ private long keepSharedZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
+ private long keepSharedMasterAliveUntil = Long.MAX_VALUE;
+ private final DelayedClosing delayedClosing = DelayedClosing.createAndStart(this);
+
+ /**
+  * Creates a daemon thread to check the connections to master and ZooKeeper
+  * and close them if they have reached their closing time (
+  * {@link #keepSharedMasterAliveUntil} and
+  * {@link #keepSharedZooKeeperWatcherAliveUntil}). Keep-alive time is
+  * managed by the release functions and the variable {@link #keepAlive}.
+  */
+ private static class DelayedClosing implements Abortable, Runnable {
+   private HConnectionImplementation hci;
+
+   static DelayedClosing createAndStart(HConnectionImplementation phci) {
+     DelayedClosing dc = new DelayedClosing(phci);
+     new Thread(dc, "ZooKeeperWatcher and Master delayed closing for connection " + phci).start();
+     return dc;
+   }
+
+   DelayedClosing(HConnectionImplementation hci) {
+     this.hci = hci;
+   }
+
+   public void delayedClose() throws InterruptedException {
+     while (!aborted) {
+       synchronized (abortedNotify) {
+         // We check every minute.
+ abortedNotify.wait(60*1000); + } + if (hci.canCloseZKW) { + synchronized (hci.zooKeeperWatcherLock) { + if (System.currentTimeMillis() > hci.keepSharedZooKeeperWatcherAliveUntil) { + hci.closeSharedZooKeeperWatcher(); + hci.keepSharedZooKeeperWatcherAliveUntil = Long.MAX_VALUE; + } + } + } + if (hci.canCloseMaster) { + synchronized (hci.masterLock) { + if (System.currentTimeMillis() > hci.keepSharedMasterAliveUntil) { + hci.closeSharedMaster(); + hci.keepSharedMasterAliveUntil = Long.MAX_VALUE; + } + } + } + } + } + + private volatile Boolean aborted = false; + private final Object abortedNotify = new Object(); + + @Override + public void abort(String why, Throwable e) { + aborted = true; + synchronized (abortedNotify){ + abortedNotify.notifyAll(); + } + } + + @Override + public boolean isAborted() { + return aborted; + } + + @Override + public void run() { try { - this.zooKeeper = new ZooKeeperWatcher(conf, "hconnection", this); - } catch(ZooKeeperConnectionException zce) { - throw zce; - } catch (IOException e) { - throw new ZooKeeperConnectionException("An error is preventing" + - " HBase from connecting to ZooKeeper", e); + delayedClose(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private void closeSharedZooKeeperWatcher() { + synchronized (zooKeeperWatcherLock) { + if (sharedZookeeper != null) { + LOG.info("Closing zookeeper sessionid=0x" + + Long.toHexString( + sharedZookeeper.getRecoverableZooKeeper().getSessionId())); + sharedZookeeper.internalClose(); + sharedZookeeper = null; + } + sharedZookeeperUserCount = 0; + } + } + + + private abstract static class AddCloseableHandler implements InvocationHandler { + private HConnectionImplementation connection; + private T master; + + protected AddCloseableHandler(HConnectionImplementation connection, T master) { + this.connection = connection; + this.master = master; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if (method.getName().equals("close") && + method.getParameterTypes().length == 0) { + release(connection, master); + return null; + } else { + try { + return method.invoke(master, args); + }catch (InvocationTargetException e){ + // We will have this for all the exception, checked on not, sent + // by any layer, including the functional exception + if (e.getCause () == null){ + throw new RuntimeException( + "Proxy invocation failed and getCause is null", e); + } + throw e.getCause(); + } + } + } + + protected abstract void release(HConnectionImplementation connection, T target); + } + + private static class MasterHandler extends AddCloseableHandler{ + MasterHandler(HConnectionImplementation connection, HMasterInterface master) { + super(connection, master); + } + + @Override + protected void release(HConnectionImplementation connection, org.apache.hadoop.hbase.ipc.HMasterInterface target) { + connection.releaseSharedMaster(target); + } + } + + /* + private static class SharedZooKeeperWatcherHandler extends AddCloseableHandler{ + SharedZooKeeperWatcherHandler(HConnectionImplementation connection, ZooKeeperWatcher zkw) { + super(connection, zkw); + } + + @Override + protected void release(HConnectionImplementation connection, ZooKeeperWatcher target) { + connection.releaseSharedZooKeeperWatcher(target); + } + } + */ + + + + private HMasterInterface sharedMaster; + private int sharedMasterUserCount; + private final Object masterLock = new Object(); + + // The old interface {@link #getMaster} allows to get a master that's 
never
+ // closed. So if someone uses this interface, we never close the shared
+ // master whatever the user count...
+ // When closing the connection, we close the master as well, as in the
+ // previous implementation.
+ private boolean canCloseMaster = true;
+
+ @Override
+ public CloseableMasterConnection getSharedMaster() throws MasterNotRunningException {
+   synchronized (masterLock) {
+     if (!isSharedMasterConnectedAndRunning()) {
+       if (sharedMaster != null) {
+         HBaseRPC.stopProxy(sharedMaster);
+       }
+       sharedMaster = null;
+       sharedMaster = createMasterWithRetries();
+     }
+     sharedMasterUserCount++;
+     keepSharedMasterAliveUntil = Long.MAX_VALUE;
+
+     return (CloseableMasterConnection) Proxy.newProxyInstance(
+       CloseableMasterConnection.class.getClassLoader(),
+       new Class[]{CloseableMasterConnection.class},
+       new MasterHandler(this, sharedMaster)
+     );
+   }
+ }
+
+ private boolean isSharedMasterConnectedAndRunning() {
+   if (sharedMaster == null) {
+     return false;
+   }
+   try {
+     return sharedMaster.isMasterRunning();
+   } catch (UndeclaredThrowableException e) {
+     // It's somewhat messy, but we can receive exceptions such as
+     // java.net.ConnectException that are not declared, so we catch
+     // them here...
+     LOG.info("Master connection is not running anymore",
+       e.getUndeclaredThrowable());
+     return false;
+   }
+ }
+
+ private void releaseSharedMaster(HMasterInterface master) {
+   if (master == null) {
+     return;
+   }
+   synchronized (masterLock) {
+     --sharedMasterUserCount;
+     if (sharedMasterUserCount <= 0) {
+       keepSharedMasterAliveUntil =
+         System.currentTimeMillis() + keepAlive;
+     }
+   }
+ }
+
+ /**
+  * Immediate close of the shared master. Can be triggered by the delayed
+  * close or when closing the connection itself.
+  */
+ private void closeSharedMaster() {
+   synchronized (masterLock) {
+     if (sharedMaster != null) {
+       LOG.info("Closing master connection");
+       HBaseRPC.stopProxy(sharedMaster);
+       sharedMaster = null;
     }
+     sharedMasterUserCount = 0;
   }
-   return zooKeeper;
 }
+
+
+ @Override
 public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
 throws IOException, RuntimeException {
   return callable.withRetries();
 }
+ @Override
 public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
 throws IOException, RuntimeException {
   return callable.withoutRetries();
 }
+ @Deprecated
 private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
   final MultiAction<R> multi, final byte [] tableName) {
 // TODO: This does not belong in here!!! St.Ack HConnections should
@@ -1345,6 +1639,8 @@ public class HConnectionManager {
   };
 }
+ @Override
+ @Deprecated
 public void processBatch(List<? extends Row> list,
   final byte[] tableName,
   ExecutorService pool,
@@ -1429,6 +1725,7 @@ public class HConnectionManager {
  * Parameterized batch processing, allowing varying return types for
  * different {@link Row} implementations.
*/ + @Override public void processBatchCallback( List list, byte[] tableName, @@ -1615,6 +1912,7 @@ public class HConnectionManager { return location != null; } + @Override public void setRegionCachePrefetch(final byte[] tableName, final boolean enable) { if (!enable) { @@ -1625,6 +1923,7 @@ public class HConnectionManager { } } + @Override public boolean getRegionCachePrefetch(final byte[] tableName) { return !regionCachePrefetchDisabledTables.contains(Bytes.mapKey(tableName)); } @@ -1642,19 +1941,24 @@ public class HConnectionManager { @Override public void abort(final String msg, Throwable t) { - if (t instanceof KeeperException.SessionExpiredException) { - try { - LOG.info("This client just lost it's session with ZooKeeper, trying" + - " to reconnect."); - resetZooKeeperTrackers(); - LOG.info("Reconnected successfully. This disconnect could have been" + - " caused by a network partition or a long-running GC pause," + - " either way it's recommended that you verify your environment."); - return; - } catch (ZooKeeperConnectionException e) { - LOG.error("Could not reconnect to ZooKeeper after session" + - " expiration, aborting"); - t = e; + if (t instanceof KeeperException.SessionExpiredException + && sharedZookeeper != null) { + synchronized (zooKeeperWatcherLock) { + if (sharedZookeeper != null) { + try { + LOG.info("This client just lost it's session with ZooKeeper, trying" + + " to reconnect."); + releaseSharedZooKeeperWatcher(getSharedZooKeeperWatcher()); + LOG.info("Reconnected successfully. This disconnect could have been" + + " caused by a network partition or a long-running GC pause," + + " either way it's recommended that you verify your environment."); + return; + } catch (IOException e) { + LOG.error("Could not reconnect to ZooKeeper after session" + + " expiration, aborting"); + t = e; + } + } } } if (t != null) LOG.fatal(msg, t); @@ -1673,14 +1977,18 @@ public class HConnectionManager { return this.aborted; } + @Override public int getCurrentNrHRS() throws IOException { + CloseableZooKeeperWatcher zkw = getSharedZooKeeperWatcher(); + try { // We go to zk rather than to master to get count of regions to avoid // HTable having a Master dependency. 
See HBase-2828 - return ZKUtil.getNumberOfChildren(this.zooKeeper, - this.zooKeeper.rsZNode); + return ZKUtil.getNumberOfChildren(zkw, zkw.rsZNode); } catch (KeeperException ke) { throw new IOException("Unexpected ZooKeeper exception", ke); + } finally { + zkw.close(); } } @@ -1713,77 +2021,79 @@ public class HConnectionManager { return refCount == 0; } - void close(boolean stopProxy) { + void close(boolean doStopProxy) { if (this.closed) { return; } - if (master != null) { - if (stopProxy) { - HBaseRPC.stopProxy(master); - } - master = null; - masterChecked = false; - } - if (stopProxy) { + delayedClosing.abort("Closing connection", null); + if (doStopProxy) { + closeSharedMaster(); for (HRegionInterface i : servers.values()) { HBaseRPC.stopProxy(i); } } + closeSharedZooKeeperWatcher(); this.servers.clear(); - if (this.zooKeeper != null) { - LOG.info("Closed zookeeper sessionid=0x" + - Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); - this.zooKeeper.close(); - this.zooKeeper = null; - } this.closed = true; } + @Override public void close() { if (managed) { HConnectionManager.deleteConnection((HConnection)this, stopProxy, false); } else { close(true); } - if (LOG.isTraceEnabled()) LOG.debug("" + this.zooKeeper + " closed."); } /** * Close the connection for good, regardless of what the current value of - * {@link #refCount} is. Ideally, {@link refCount} should be zero at this + * {@link #refCount} is. Ideally, {@link #refCount} should be zero at this * point, which would be the case if all of its consumers close the * connection. However, on the off chance that someone is unable to close * the connection, perhaps because it bailed out prematurely, the method - * below will ensure that this {@link Connection} instance is cleaned up. + * below will ensure that this {@link HConnection} instance is cleaned up. * Caveat: The JVM may take an unknown amount of time to call finalize on an * unreachable object, so our hope is that every consumer cleans up after * itself, like any good citizen. */ @Override protected void finalize() throws Throwable { + super.finalize(); // Pretend as if we are about to release the last remaining reference refCount = 1; close(); - LOG.debug("The connection to " + this.zooKeeper - + " was closed by the finalize method."); } + @Override public HTableDescriptor[] listTables() throws IOException { - if (this.master == null) { - this.master = getMaster(); + CloseableMasterConnection master = getSharedMaster(); + try { + return master.getHTableDescriptors(); + } finally { + master.close(); } - HTableDescriptor[] htd = master.getHTableDescriptors(); - return htd; } + @Override public HTableDescriptor[] getHTableDescriptors(List tableNames) throws IOException { - if (tableNames == null || tableNames.size() == 0) return null; - if (this.master == null) { - this.master = getMaster(); + if (tableNames == null || tableNames.isEmpty()) return null; + CloseableMasterConnection master = getSharedMaster(); + try { + return master.getHTableDescriptors(tableNames); + }finally { + master.close(); } - return master.getHTableDescriptors(tableNames); } + /** + * Connects to the master to get the table descriptor. + * @param tableName table name + * @return + * @throws IOException if the connection to master fails or if the table + * is not found. 
+ */ + @Override public HTableDescriptor getHTableDescriptor(final byte[] tableName) throws IOException { if (tableName == null || tableName.length == 0) return null; @@ -1793,23 +2103,21 @@ public class HConnectionManager { if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) { return HTableDescriptor.META_TABLEDESC; } - if (this.master == null) { - this.master = getMaster(); + CloseableMasterConnection master = getSharedMaster(); + HTableDescriptor[] htds; + try { + htds = master.getHTableDescriptors(); + }finally { + master.close(); } - HTableDescriptor hTableDescriptor = null; - HTableDescriptor[] htds = master.getHTableDescriptors(); if (htds != null && htds.length > 0) { for (HTableDescriptor htd: htds) { if (Bytes.equals(tableName, htd.getName())) { - hTableDescriptor = htd; + return htd; } } } - //HTableDescriptor htd = master.getHTableDescriptor(tableName); - if (hTableDescriptor == null) { - throw new TableNotFoundException(Bytes.toString(tableName)); - } - return hTableDescriptor; + throw new TableNotFoundException(Bytes.toString(tableName)); } } diff --git src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 61374d9..415b996 100644 --- src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -82,7 +82,7 @@ public class ReplicationAdmin implements Closeable { "enable it in order to use replication"); } this.connection = HConnectionManager.getConnection(conf); - ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher(); + ZooKeeperWatcher zkw = this.connection.getSharedZooKeeperWatcher(); try { this.replicationZk = new ReplicationZookeeper(this.connection, conf, zkw); } catch (KeeperException e) { @@ -183,6 +183,8 @@ public class ReplicationAdmin implements Closeable { @Override public void close() throws IOException { if (this.connection != null) { + this.connection.releaseSharedZooKeeperWatcher( + replicationZk.getZookeeperWatcher() ); this.connection.close(); } } diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index c37a4fb..332e69d 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -24,9 +24,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; @@ -43,6 +41,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.zookeeper.KeeperException; @@ -111,16 +110,34 @@ public class VerifyReplication { 
HConnectionManager.execute(new HConnectable(conf) { @Override public Void connect(HConnection conn) throws IOException { + ZooKeeperWatcher localZKW = null; + ReplicationZookeeper zk = null; + ReplicationPeer peer = null; try { - ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf, - conn.getZooKeeperWatcher()); - ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId")); + localZKW = new ZooKeeperWatcher( + conf, "VerifyReplication", new Abortable() { + @Override public void abort(String why, Throwable e) {} + @Override public boolean isAborted() {return false;} + }); + zk = new ReplicationZookeeper(conn, conf, localZKW); + // Just verifying that we can connect + peer = zk.getPeer(peerId); HTable replicatedTable = new HTable(peer.getConfiguration(), conf.get(NAME+".tableName")); scan.setStartRow(value.getRow()); replicatedScanner = replicatedTable.getScanner(scan); } catch (KeeperException e) { throw new IOException("Got a ZK exception", e); + } finally { + if (peer != null) { + peer.close(); + } + if (zk != null) { + zk.close(); + } + if (localZKW != null) { + localZKW.close(); + } } return null; } @@ -160,11 +177,18 @@ public class VerifyReplication { HConnectionManager.execute(new HConnectable(conf) { @Override public Void connect(HConnection conn) throws IOException { + ZooKeeperWatcher localZKW = null; + ReplicationZookeeper zk = null; + ReplicationPeer peer = null; try { - ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf, - conn.getZooKeeperWatcher()); + localZKW = new ZooKeeperWatcher( + conf, "VerifyReplication", new Abortable() { + @Override public void abort(String why, Throwable e) {} + @Override public boolean isAborted() {return false;} + }); + zk = new ReplicationZookeeper(conn, conf, localZKW); // Just verifying it we can connect - ReplicationPeer peer = zk.getPeer(peerId); + peer = zk.getPeer(peerId); if (peer == null) { throw new IOException("Couldn't get access to the slave cluster," + "please see the log"); @@ -172,6 +196,16 @@ public class VerifyReplication { } catch (KeeperException ex) { throw new IOException("Couldn't get access to the slave cluster" + " because: ", ex); + } finally { + if (peer != null) { + peer.close(); + } + if (zk != null) { + zk.close(); + } + if (localZKW != null) { + localZKW.close(); + } } return null; } diff --git src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index 6495207..a58ed1e 100644 --- src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.replication; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -36,7 +37,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; * communicate with remote peers and is responsible for answering to expired * sessions and re-establishing the ZK connections. 
*/ -public class ReplicationPeer implements Abortable { +public class ReplicationPeer implements Abortable, Closeable { private static final Log LOG = LogFactory.getLog(ReplicationPeer.class); private final String clusterKey; @@ -133,7 +134,7 @@ public class ReplicationPeer implements Abortable { public void reloadZkWatcher() throws IOException { if (zkw != null) zkw.close(); zkw = new ZooKeeperWatcher(conf, - "connection to cluster: " + id, this); + "connection to cluster: " + id, this); } @Override @@ -142,4 +143,11 @@ public class ReplicationPeer implements Abortable { // abort method is called. return false; } + + @Override + public void close() throws IOException { + if (zkw != null){ + zkw.close(); + } + } } diff --git src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java index 8c9cb9b..38dfbaf 100644 --- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java +++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.replication; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -74,7 +75,7 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException; * ... * */ -public class ReplicationZookeeper { +public class ReplicationZookeeper implements Closeable{ private static final Log LOG = LogFactory.getLog(ReplicationZookeeper.class); // Name of znode we use to lock when failover @@ -744,6 +745,12 @@ public class ReplicationZookeeper { return peersZNode; } + @Override + public void close() throws IOException { + if (statusTracker != null) + statusTracker.stop(); + } + /** * Tracker for status of the replication */ diff --git src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index e55b906..2000ea5 100644 --- src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.client.MetaScanner; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -133,10 +134,9 @@ public class HBaseFsck { executor.allowCoreThreadTimeOut(true); } - public void connect() throws MasterNotRunningException, - ZooKeeperConnectionException { + public void connect() throws IOException { admin = new HBaseAdmin(conf); - status = admin.getMaster().getClusterStatus(); + status = admin.getClusterStatus(); connection = admin.getConnection(); } @@ -520,13 +520,15 @@ public class HBaseFsck { HConnectionManager.execute(new HConnectable(conf) { @Override public Void connect(HConnection connection) throws IOException { - ZooKeeperWatcher zkw = connection.getZooKeeperWatcher(); + ZooKeeperWatcher zkw = connection.getSharedZooKeeperWatcher(); try { for (String tableName : ZKTable.getDisabledOrDisablingTables(zkw)) { disabledTables.add(Bytes.toBytes(tableName)); } } catch (KeeperException ke) { throw new IOException(ke); + } finally { + connection.releaseSharedZooKeeperWatcher(zkw); } return null; } @@ -626,7 +628,7 @@ public class HBaseFsck { private ServerName 
getRootRegionServerName() throws IOException, InterruptedException { RootRegionTracker rootRegionTracker = - new RootRegionTracker(this.connection.getZooKeeperWatcher(), new Abortable() { + new RootRegionTracker(this.connection.getSharedZooKeeperWatcher(), new Abortable() { @Override public void abort(String why, Throwable e) { LOG.error(why, e); diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java index 48a8b3d..2801a88 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java @@ -88,7 +88,7 @@ public class RootRegionTracker extends ZooKeeperNodeTracker { * @return Returns null if data is null else converts passed data * to a ServerName instance. */ - private static ServerName dataToServerName(final byte [] data) { + public static ServerName dataToServerName(final byte [] data) { // The str returned could be old style -- pre hbase-1502 -- which was // hostname and port seperated by a colon rather than hostname, port and // startcode delimited by a ','. diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 719a176..195e3cc 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -33,9 +33,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.EmptyWatcher; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; @@ -249,9 +247,6 @@ public class ZKUtil { /** * Check if the specified node exists. Sets no watches. * - * Returns true if node exists, false if not. Returns an exception if there - * is an unexpected zookeeper exception. - * * @param zkw zk reference * @param znode path of node to watch * @return version of the node if it exists, -1 if does not exist @@ -1168,4 +1163,21 @@ public class ZKUtil { throw new IOException(keeperEx); } } + + + public static byte[] blockUntilAvailable( + final ZooKeeperWatcher zkw, final String znode, final long timeout) + throws InterruptedException { + if (timeout < 0) throw new IllegalArgumentException(); + if (zkw == null) throw new IllegalArgumentException(); + if (znode == null) throw new IllegalArgumentException(); + + ZooKeeperNodeTracker znt = new ZooKeeperNodeTracker(zkw, znode, new Abortable() { + @Override public void abort(String why, Throwable e) {} + @Override public boolean isAborted() {return false;} + }) { + }; + + return znt.blockUntilAvailable(timeout, true); + } } diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java index c6e607e..d20c07f 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java @@ -108,6 +108,10 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener { * Gets the data of the node, blocking until the node is available or the * specified timeout has elapsed. 
* + * todo during a code review: if the node does not exist, the watcher won't + * be set (see javadoc comment for getDataAndWatch), hence we will just + * wait for the end of the timeout but won't get the data. + * * @param timeout maximum time to wait for the node data to be available, * n milliseconds. Pass 0 for no timeout. * @return data of the node @@ -211,7 +215,8 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener { } catch (KeeperException e) { abortable .abort( - "Exception while checking if basenode exists.", + "Exception while checking if basenode ("+watcher.baseZNode+ + ") exists in ZooKeeper.", e); } return true; diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 45d7d6a..d41faca 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.zookeeper; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; @@ -65,7 +66,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable { private RecoverableZooKeeper recoverableZooKeeper; // abortable in case of zk failure - private Abortable abortable; + protected Abortable abortable; // listeners to be notified private final List listeners = @@ -434,6 +435,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable { /** * Close the connection to ZooKeeper. + * * @throws InterruptedException */ public void close() { @@ -443,9 +445,29 @@ public class ZooKeeperWatcher implements Watcher, Abortable { // super.close(); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } + /* + public void close() { + if (refCount>0){ + --refCount; + } + + if (refCount == 0) { + try { + if (recoverableZooKeeper != null) { + recoverableZooKeeper.close(); +// super.close(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + */ + public Configuration getConfiguration() { return conf; } @@ -459,4 +481,14 @@ public class ZooKeeperWatcher implements Watcher, Abortable { public boolean isAborted() { return this.abortable.isAborted(); } + + + private int refCount = 0; + public void incRefCount() { + ++refCount; + } + + public void decRefCount() { + --refCount; + } } diff --git src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java index d267824..81da694 100644 --- src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java +++ src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java @@ -100,7 +100,7 @@ public class TestZooKeeper { String quorumServers = ZKConfig.getZKQuorumServersString(c); int sessionTimeout = 5 * 1000; // 5 seconds HConnection connection = HConnectionManager.getConnection(c); - ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher(); + ZooKeeperWatcher connectionZK = connection.getSharedZooKeeperWatcher(); long sessionID = connectionZK.getRecoverableZooKeeper().getSessionId(); byte[] password = connectionZK.getRecoverableZooKeeper().getSessionPasswd(); ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout, @@ -123,7 +123,7 @@ public class TestZooKeeper { equals(States.CLOSED)); // Check that the client recovered - ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher(); + ZooKeeperWatcher newConnectionZK = connection.getSharedZooKeeperWatcher(); LOG.info("state=" + 
newConnectionZK.getRecoverableZooKeeper().getState()); Assert.assertTrue(newConnectionZK.getRecoverableZooKeeper().getState().equals( States.CONNECTED)); @@ -182,11 +182,11 @@ public class TestZooKeeper { ipMeta.exists(new Get(HConstants.LAST_ROW)); // make sure they aren't the same - assertFalse(HConnectionManager.getConnection(localMeta.getConfiguration()).getZooKeeperWatcher() - == HConnectionManager.getConnection(otherConf).getZooKeeperWatcher()); + assertFalse(HConnectionManager.getConnection(localMeta.getConfiguration()).getSharedZooKeeperWatcher() + == HConnectionManager.getConnection(otherConf).getSharedZooKeeperWatcher()); assertFalse(HConnectionManager.getConnection(localMeta.getConfiguration()) - .getZooKeeperWatcher().getQuorum().equals(HConnectionManager - .getConnection(otherConf).getZooKeeperWatcher().getQuorum())); + .getSharedZooKeeperWatcher().getQuorum().equals(HConnectionManager + .getConnection(otherConf).getSharedZooKeeperWatcher().getQuorum())); localMeta.close(); ipMeta.close(); } catch (Exception e) { diff --git src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java index 327950c..33b5adf 100644 --- src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java +++ src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java @@ -749,6 +749,7 @@ public class TestAdmin { admin.createTable(desc, splitKeys); HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName); Map regions = ht.getRegionsInfo(); + ht.close(); assertEquals("Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(), expectedRegions, regions.size()); // Disable table. diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 1997abd..f8fb428 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -131,13 +131,13 @@ public class TestSplitTransactionOnCluster { List daughters = cluster.getRegions(tableName); assertTrue(daughters.size() >= 2); // Assert the ephemeral node is up in zk. - String path = ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(), + String path = ZKAssign.getNodeName(t.getConnection().getSharedZooKeeperWatcher(), hri.getEncodedName()); Stat stats = - t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); + t.getConnection().getSharedZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats); RegionTransitionData rtd = - ZKAssign.getData(t.getConnection().getZooKeeperWatcher(), + ZKAssign.getData(t.getConnection().getSharedZooKeeperWatcher(), hri.getEncodedName()); // State could be SPLIT or SPLITTING. assertTrue(rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLIT) || @@ -157,7 +157,7 @@ public class TestSplitTransactionOnCluster { assertTrue(daughters.contains(r)); } // Finally assert that the ephemeral SPLIT znode was cleaned up. 
- stats = t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); + stats = t.getConnection().getSharedZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats); assertTrue(stats == null); } finally { @@ -194,7 +194,7 @@ public class TestSplitTransactionOnCluster { int regionCount = server.getOnlineRegions().size(); // Insert into zk a blocking znode, a znode of same name as region // so it gets in way of our splitting. - ZKAssign.createNodeClosing(t.getConnection().getZooKeeperWatcher(), + ZKAssign.createNodeClosing(t.getConnection().getSharedZooKeeperWatcher(), hri, new ServerName("any.old.server", 1234, -1)); // Now try splitting.... should fail. And each should successfully // rollback. @@ -207,7 +207,7 @@ public class TestSplitTransactionOnCluster { assertEquals(regionCount, server.getOnlineRegions().size()); } // Now clear the zknode - ZKAssign.deleteClosingNode(t.getConnection().getZooKeeperWatcher(), hri); + ZKAssign.deleteClosingNode(t.getConnection().getSharedZooKeeperWatcher(), hri); // Now try splitting and it should work. split(hri, server, regionCount); // Get daughters diff --git src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java index 7b7a670..5610167 100644 --- src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java +++ src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java @@ -77,8 +77,6 @@ public class TestRemoteTable { TEST_UTIL.startMiniCluster(); REST_TEST_UTIL.startServletContainer(TEST_UTIL.getConfiguration()); HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); - LOG.info("Admin Connection=" + admin.getConnection() + ", " + - admin.getConnection().getZooKeeperWatcher()); if (!admin.tableExists(TABLE)) { HTableDescriptor htd = new HTableDescriptor(TABLE); htd.addFamily(new HColumnDescriptor(COLUMN_1)); @@ -86,8 +84,6 @@ public class TestRemoteTable { htd.addFamily(new HColumnDescriptor(COLUMN_3)); admin.createTable(htd); HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLE); - LOG.info("Table connection=" + table.getConnection() + ", " + - admin.getConnection().getZooKeeperWatcher()); Put put = new Put(ROW_1); put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_1); table.put(put);