diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 160cba7..bdf5791 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -193,7 +193,7 @@ public class ClientScanner extends AbstractClientScanner { done) { close(); if (LOG.isDebugEnabled()) { - LOG.debug("Finished with scanning at " + this.currentRegion); + LOG.debug("Finished scanning region " + this.currentRegion); } return false; } @@ -301,8 +301,9 @@ public class ClientScanner extends AbstractClientScanner { } else { Throwable cause = e.getCause(); if ((cause == null || (!(cause instanceof NotServingRegionException) - && !(cause instanceof RegionServerStoppedException))) - && !(e instanceof OutOfOrderScannerNextException)) { + && !(cause instanceof RegionServerStoppedException))) && + !(e instanceof RegionServerStoppedException) && + !(e instanceof OutOfOrderScannerNextException)) { throw e; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java index e231967..923cb57 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java @@ -333,12 +333,10 @@ public interface HConnection extends Abortable, Closeable { public boolean getRegionCachePrefetch(final byte[] tableName); /** - * Scan zookeeper to get the number of region servers * @return the number of region servers that are currently running * @throws IOException if a remote or network exception occurs * @deprecated This method will be changed from public to package protected. 
*/ - @Deprecated public int getCurrentNrHRS() throws IOException; /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index d08ab95..763b79f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import java.io.Closeable; import java.io.IOException; +import java.lang.reflect.Constructor; import java.lang.reflect.UndeclaredThrowableException; import java.net.SocketException; import java.util.ArrayList; @@ -144,9 +145,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.SoftValueSortedMap; import org.apache.hadoop.hbase.util.Triple; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; -import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker; -import org.apache.hadoop.hbase.zookeeper.ZKClusterId; -import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; @@ -251,17 +249,18 @@ public class HConnectionManager { * @return HConnection object for conf * @throws ZooKeeperConnectionException */ + @SuppressWarnings("resource") public static HConnection getConnection(final Configuration conf) throws IOException { HConnectionKey connectionKey = new HConnectionKey(conf); synchronized (CONNECTION_INSTANCES) { HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey); if (connection == null) { - connection = new HConnectionImplementation(conf, true); + connection = (HConnectionImplementation)createConnection(conf, true); CONNECTION_INSTANCES.put(connectionKey, connection); } else if (connection.isClosed()) { 
HConnectionManager.deleteConnection(connectionKey, true); - connection = new HConnectionImplementation(conf, true); + connection = (HConnectionImplementation)createConnection(conf, true); CONNECTION_INSTANCES.put(connectionKey, connection); } connection.incCount(); @@ -280,7 +279,28 @@ public class HConnectionManager { */ public static HConnection createConnection(Configuration conf) throws IOException { - return new HConnectionImplementation(conf, false); + return createConnection(conf, false); + } + + /** + * Instantiates the HConnection implementation named by "hbase.client.connection.impl" + * (default: HConnectionImplementation) via its (Configuration, boolean) constructor. + * @param conf configuration handed to the connection's constructor + * @param managed managed flag handed to the connection's constructor + * @return the newly created connection + * @throws IOException if the class cannot be loaded or instantiated; an IOException thrown + * by the constructor itself is rethrown as-is rather than wrapped + */ + static HConnection createConnection(final Configuration conf, final boolean managed) + throws IOException { + String className = conf.get("hbase.client.connection.impl", + HConnectionManager.HConnectionImplementation.class.getName()); + Class clazz = null; + try { + clazz = Class.forName(className); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + try { + // Default HCM#HCI is not accessible; make it so before invoking. + Constructor constructor = + clazz.getDeclaredConstructor(Configuration.class, boolean.class); + constructor.setAccessible(true); + return (HConnection) constructor.newInstance(conf, managed); + } catch (java.lang.reflect.InvocationTargetException e) { + // newInstance wraps whatever the constructor threw; unwrap it so callers keep seeing the + // constructor's own IOException subtype (getConnection documents + // ZooKeeperConnectionException) and Errors are not disguised as IOExceptions. + Throwable cause = e.getCause(); + if (cause instanceof IOException) throw (IOException) cause; + if (cause instanceof Error) throw (Error) cause; + throw new IOException(cause); + } catch (Exception e) { + throw new IOException(e); + } } /** @@ -418,7 +438,7 @@ public class HConnectionManager { static final Log LOG = LogFactory.getLog(HConnectionImplementation.class); private final long pause; private final int numTries; - private final int rpcTimeout; + final int rpcTimeout; private final int prefetchRegionLimit; private final boolean useServerTrackerForRetries; private final long serverTrackerTimeout; @@ -473,6 +493,11 @@ public class HConnectionManager { private final boolean managed; /** + * Cluster registry of basic info such as clusterid and meta region location. + */ + final Registry registry; + + /** * constructor * @param conf Configuration object * @param managed If true, does not do full shutdown on close; i.e.
cleanup of connection @@ -512,6 +537,7 @@ public class HConnectionManager { } } this.serverTrackerTimeout = serverTrackerTimeout; + this.registry = setupRegistry(); retrieveClusterId(); this.rpcClient = new RpcClient(this.conf, this.clusterId); @@ -534,6 +560,23 @@ public class HConnectionManager { }, conf, listenerClass); } } + + /** + * @return The cluster registry implementation to use. + * @throws IOException + */ + private Registry setupRegistry() throws IOException { + String registryClass = this.conf.get("hbase.client.registry.impl", + ZooKeeperRegistry.class.getName()); + Registry registry = null; + try { + registry = (Registry)Class.forName(registryClass).newInstance(); + } catch (Throwable t) { + throw new IOException(t); + } + registry.init(this); + return registry; + } /** * For tests only. @@ -554,31 +597,11 @@ public class HConnectionManager { return "hconnection-0x" + Integer.toHexString(hashCode()); } - private String clusterId = null; + protected String clusterId = null; - public final void retrieveClusterId(){ - if (clusterId != null) { - return; - } - - // No synchronized here, worse case we will retrieve it twice, that's - // not an issue. 
- ZooKeeperKeepAliveConnection zkw = null; - try { - zkw = getKeepAliveZooKeeperWatcher(); - clusterId = ZKClusterId.readClusterIdZNode(zkw); - if (clusterId == null) { - LOG.info("ClusterId read in ZooKeeper is null"); - } - } catch (KeeperException e) { - LOG.warn("Can't retrieve clusterId from Zookeeper", e); - } catch (IOException e) { - LOG.warn("Can't retrieve clusterId from Zookeeper", e); - } finally { - if (zkw != null) { - zkw.close(); - } - } + void retrieveClusterId() { + if (clusterId != null) return; + this.clusterId = this.registry.getClusterId(); if (clusterId == null) { clusterId = HConstants.CLUSTER_ID_DEFAULT; } @@ -639,12 +662,12 @@ public class HConnectionManager { @Override public boolean isTableEnabled(byte[] tableName) throws IOException { - return testTableOnlineState(tableName, true); + return this.registry.isTableOnlineState(tableName, true); } @Override public boolean isTableDisabled(byte[] tableName) throws IOException { - return testTableOnlineState(tableName, false); + return this.registry.isTableOnlineState(tableName, false); } @Override @@ -716,26 +739,6 @@ public class HConnectionManager { return available.get() && (regionCount.get() == splitKeys.length + 1); } - /* - * @param enabled True if table is enabled - */ - private boolean testTableOnlineState(byte [] tableName, boolean enabled) - throws IOException { - String tableNameStr = Bytes.toString(tableName); - ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher(); - try { - if (enabled) { - return ZKTableReadOnly.isEnabledTable(zkw, tableNameStr); - } - return ZKTableReadOnly.isDisabledTable(zkw, tableNameStr); - } catch (KeeperException e) { - throw new IOException("Enable/Disable failed", e); - }finally { - zkw.close(); - } - } - - @Override public HRegionLocation locateRegion(final byte[] regionName) throws IOException { return locateRegion(HRegionInfo.getTableName(regionName), @@ -801,26 +804,7 @@ public class HConnectionManager { } if (Bytes.equals(tableName, 
HConstants.META_TABLE_NAME)) { - ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher(); - try { - if (LOG.isTraceEnabled()) { - LOG.trace("Looking up meta region location in ZK," + " connection=" + this); - } - ServerName servername = - MetaRegionTracker.blockUntilAvailable(zkw, this.rpcTimeout); - - if (LOG.isTraceEnabled()) { - LOG.debug("Looked up meta region location, connection=" + this + - "; serverName=" + ((servername == null) ? "null" : servername)); - } - if (servername == null) return null; - return new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, servername, 0); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } finally { - zkw.close(); - } + return this.registry.getMetaRegionLocation(); } else { // Region not in the cache - have to go to the meta RS return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row, @@ -1532,7 +1516,7 @@ public class HConnectionManager { stub = (ClientService.BlockingInterface)this.stubs.get(key); if (stub == null) { BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, - User.getCurrent(), this.rpcTimeout); + User.getCurrent(), this.rpcTimeout); stub = ClientService.newBlockingStub(channel); // In old days, after getting stub/proxy, we'd make a call. We are not doing that here. // Just fail on first actual call rather than in here on setup. @@ -1557,7 +1541,7 @@ public class HConnectionManager { * Retrieve a shared ZooKeeperWatcher. You must close it it once you've have finished with it. * @return The shared instance. Never returns null. 
*/ - public ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher() + ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher() throws IOException { synchronized (masterAndZKLock) { if (keepAliveZookeeper == null) { @@ -1566,8 +1550,7 @@ public class HConnectionManager { } // We don't check that our link to ZooKeeper is still valid // But there is a retry mechanism in the ZooKeeperWatcher itself - keepAliveZookeeper = new ZooKeeperKeepAliveConnection( - conf, this.toString(), this); + keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this); } keepAliveZookeeperUserCount++; keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE; @@ -1575,20 +1558,18 @@ public class HConnectionManager { } } - void releaseZooKeeperWatcher(ZooKeeperWatcher zkw) { + void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) { if (zkw == null){ return; } synchronized (masterAndZKLock) { --keepAliveZookeeperUserCount; - if (keepAliveZookeeperUserCount <=0 ){ - keepZooKeeperWatcherAliveUntil = - System.currentTimeMillis() + keepAlive; + if (keepAliveZookeeperUserCount <= 0 ){ + keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive; } } } - /** * Creates a Chore thread to check the connections to master & zookeeper * and close them when they reach their closing time ( @@ -2581,17 +2562,7 @@ public class HConnectionManager { @Override public int getCurrentNrHRS() throws IOException { - ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher(); - - try { - // We go to zk rather than to master to get count of regions to avoid - // HTable having a Master dependency. 
See HBase-2828 - return ZKUtil.getNumberOfChildren(zkw, zkw.rsZNode); - } catch (KeeperException ke) { - throw new IOException("Unexpected ZooKeeper exception", ke); - } finally { - zkw.close(); - } + return this.registry.getCurrentNrHRS(); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java index 578959d..12476ab 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java @@ -37,9 +37,8 @@ import java.util.NavigableMap; import java.util.TreeMap; /** - * Scanner class that contains the .META. table scanning logic - * and uses a Retryable scanner. Provided visitors will be called - * for each row. + * Scanner class that contains the .META. table scanning logic. + * Provided visitors will be called for each row. * * Although public visibility, this is not a public-facing API and may evolve in * minor releases. @@ -123,114 +122,58 @@ public class MetaScanner { public static void metaScan(Configuration configuration, final MetaScannerVisitor visitor, final byte[] tableName, final byte[] row, final int rowLimit, final byte[] metaTableName) - throws IOException { - try { - HConnectionManager.execute(new HConnectable(configuration) { - @Override - public Void connect(HConnection connection) throws IOException { - metaScan(conf, connection, visitor, tableName, row, rowLimit, - metaTableName); - return null; - } - }); - } finally { - visitor.close(); - } - } - - private static void metaScan(Configuration configuration, HConnection connection, - MetaScannerVisitor visitor, byte [] tableName, byte[] row, - int rowLimit, final byte [] metaTableName) throws IOException { int rowUpperLimit = rowLimit > 0 ? rowLimit: Integer.MAX_VALUE; - - // if row is not null, we want to use the startKey of the row's region as - // the startRow for the meta scan. 
+ // Both are opened inside the try below so the finally can release whatever was acquired + // (and always close the visitor), even if opening the table itself fails. + HTable metaTable = null; + ResultScanner scanner = null; + // Calculate startrow for scan. + byte[] startRow; - if (row != null) { - // Scan starting at a particular row in a particular table - assert tableName != null; - byte[] searchRow = - HRegionInfo.createRegionName(tableName, row, HConstants.NINES, - false); - HTable metaTable = null; - try { - metaTable = new HTable(configuration, HConstants.META_TABLE_NAME); - Result startRowResult = metaTable.getRowOrBefore(searchRow, - HConstants.CATALOG_FAMILY); + try { + // Scan the catalog table the caller asked for (the same name used in the debug log below). + metaTable = new HTable(configuration, metaTableName); + if (row != null) { + // Scan starting at a particular row in a particular table + byte[] searchRow = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false); + Result startRowResult = metaTable.getRowOrBefore(searchRow, HConstants.CATALOG_FAMILY); if (startRowResult == null) { - throw new TableNotFoundException("Cannot find row in .META. for table: " - + Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow)); + throw new TableNotFoundException("Cannot find row in .META.
for table: " + + Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow)); } HRegionInfo regionInfo = getHRegionInfo(startRowResult); if (regionInfo == null) { throw new IOException("HRegionInfo was null or empty in Meta for " + Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow)); } - byte[] rowBefore = regionInfo.getStartKey(); - startRow = HRegionInfo.createRegionName(tableName, rowBefore, - HConstants.ZEROES, false); - } finally { - if (metaTable != null) { - metaTable.close(); - } + startRow = HRegionInfo.createRegionName(tableName, rowBefore, HConstants.ZEROES, false); + } else if (tableName == null || tableName.length == 0) { + // Full META scan + startRow = HConstants.EMPTY_START_ROW; + } else { + // Scan META for an entire table + startRow = HRegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, + HConstants.ZEROES, false); } - } else if (tableName == null || tableName.length == 0) { - // Full META scan - startRow = HConstants.EMPTY_START_ROW; - } else { - // Scan META for an entire table - startRow = HRegionInfo.createRegionName( - tableName, HConstants.EMPTY_START_ROW, HConstants.ZEROES, false); - } - - // Scan over each meta region - ScannerCallable callable; - int rows = Math.min(rowLimit, configuration.getInt( - HConstants.HBASE_META_SCANNER_CACHING, - HConstants.DEFAULT_HBASE_META_SCANNER_CACHING)); - do { final Scan scan = new Scan(startRow).addFamily(HConstants.CATALOG_FAMILY); + int rows = Math.min(rowLimit, configuration.getInt(HConstants.HBASE_META_SCANNER_CACHING, + HConstants.DEFAULT_HBASE_META_SCANNER_CACHING)); + scan.setCaching(rows); if (LOG.isDebugEnabled()) { - LOG.debug("Scanning " + Bytes.toString(metaTableName) + - " starting at row=" + Bytes.toStringBinary(startRow) + " for max=" + - rowUpperLimit + " rows using " + connection.toString()); + LOG.debug("Scanning " + Bytes.toString(metaTableName) + " starting at row=" + + Bytes.toStringBinary(startRow) + " for max=" + rowUpperLimit + "
with caching=" + rows); } - callable = new ScannerCallable(connection, metaTableName, scan, null); - // Open scanner - callable.withRetries(); - + // Run the scan + scanner = metaTable.getScanner(scan); + Result result = null; int processedRows = 0; - try { - callable.setCaching(rows); - done: do { - if (processedRows >= rowUpperLimit) { - break; - } - //we have all the rows here - Result [] rrs = callable.withRetries(); - if (rrs == null || rrs.length == 0 || rrs[0].size() == 0) { - break; //exit completely - } - for (Result rr : rrs) { - if (processedRows >= rowUpperLimit) { - break done; - } - if (!visitor.processRow(rr)) - break done; //exit completely - processedRows++; - } - //here, we didn't break anywhere. Check if we have more rows - } while(true); - // Advance the startRow to the end key of the current region - startRow = callable.getHRegionInfo().getEndKey(); - } finally { - // Close scanner - callable.setClose(); - callable.withRetries(); + while ((result = scanner.next()) != null) { + if (visitor != null) { + if (!visitor.processRow(result)) break; + } + processedRows++; + if (processedRows >= rowUpperLimit) break; } - } while (Bytes.compareTo(startRow, HConstants.LAST_ROW) != 0); + } finally { + // Close the scanner so its server-side resources are released even when the visitor + // stopped early or the row limit was hit; nested finallys ensure a failing close does + // not skip closing the visitor or the table. + try { + if (scanner != null) scanner.close(); + } finally { + try { + if (visitor != null) visitor.close(); + } finally { + if (metaTable != null) metaTable.close(); + } + } + } } /** @@ -399,4 +342,4 @@ public class MetaScanner { return super.processRow(rowResult); } } -} \ No newline at end of file +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java new file mode 100644 index 0000000..a48c571 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HRegionLocation; + +/** + * Cluster registry. + * Implementations hold cluster information such as this cluster's id, location of .META., etc. + */ +interface Registry { + /** + * Called by HConnectionImplementation right after it instantiates the registry. + * @param connection the connection this registry serves + */ + void init(HConnection connection); + + /** + * @return Meta region location, or null if it could not be determined + * @throws IOException + */ + HRegionLocation getMetaRegionLocation() throws IOException; + + /** + * @return Cluster id, or null if it could not be read (HConnectionImplementation then + * falls back to HConstants.CLUSTER_ID_DEFAULT).
+ */ + String getClusterId(); + + /** + * @param tableName name of the table to check + * @param enabled if true, check whether the table is enabled; if false, whether it is disabled + * @return the outcome of the requested enabled/disabled check + * @throws IOException + */ + boolean isTableOnlineState(byte [] tableName, boolean enabled) throws IOException; + + /** + * @return Count of 'running' regionservers + * @throws IOException + */ + int getCurrentNrHRS() throws IOException; +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index c1d40fb..bde1588 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -201,12 +201,7 @@ public class ScannerCallable extends ServerCallable { if (this.scanMetrics != null) { this.scanMetrics.countOfNSRE.incrementAndGet(); } - throw new DoNotRetryIOException("Reset scanner", ioe); - } else if (ioe instanceof RegionServerStoppedException) { - // Throw a DNRE so that we break out of cycle of calling RSSE - // when what we need is to open scanner against new location. - // Attach RSSE to signal client that it needs to resetup scanner.
- throw new DoNotRetryIOException("Reset scanner", ioe); + throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe); } else { // The outer layers will retry throw ioe; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java index 1b2e54a..a04b1c2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java @@ -169,8 +169,7 @@ public abstract class ServerCallable implements Callable { prepare(tries != 0); // if called with false, check table status on ZK return call(); } catch (Throwable t) { - LOG.warn("Received exception, tries=" + tries + ", numRetries=" + numRetries + ":" + - t.getMessage()); + LOG.warn("Call exception, tries=" + tries + ", numRetries=" + numRetries + ": " + t); t = translateException(t); // translateException throws an exception when we should not retry, i.e. when it's the diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java new file mode 100644 index 0000000..933f129 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker; +import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.zookeeper.KeeperException; + +/** + * A cluster registry that stores to zookeeper. + */ +class ZooKeeperRegistry implements Registry { + static final Log LOG = LogFactory.getLog(ZooKeeperRegistry.class); + // Needs an instance of hci to function. Set after construct this instance. 
+ HConnectionManager.HConnectionImplementation hci; + + @Override + public void init(HConnection connection) { + if (!(connection instanceof HConnectionManager.HConnectionImplementation)) { + throw new RuntimeException("This registry depends on HConnectionImplementation"); + } + this.hci = (HConnectionManager.HConnectionImplementation)connection; + } + + @Override + public HRegionLocation getMetaRegionLocation() throws IOException { + ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher(); + + try { + if (LOG.isTraceEnabled()) { + LOG.trace("Looking up meta region location in ZK," + " connection=" + this); + } + ServerName servername = MetaRegionTracker.blockUntilAvailable(zkw, hci.rpcTimeout); + if (LOG.isTraceEnabled()) { + LOG.trace("Looked up meta region location, connection=" + this + + "; serverName=" + ((servername == null) ? "null" : servername)); + } + if (servername == null) return null; + return new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, servername, 0); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } finally { + zkw.close(); + } + } + + /** + * Reads the cluster id from its znode in ZooKeeper. + * @return the cluster id, or null if the znode is empty or could not be read; + * HConnectionImplementation#retrieveClusterId then falls back to + * HConstants.CLUSTER_ID_DEFAULT. + */ + @Override + public String getClusterId() { + ZooKeeperKeepAliveConnection zkw = null; + try { + zkw = hci.getKeepAliveZooKeeperWatcher(); + String clusterId = org.apache.hadoop.hbase.zookeeper.ZKClusterId.readClusterIdZNode(zkw); + if (clusterId == null) { + LOG.info("ClusterId read in ZooKeeper is null"); + } + return clusterId; + } catch (KeeperException e) { + LOG.warn("Can't retrieve clusterId from Zookeeper", e); + } catch (IOException e) { + LOG.warn("Can't retrieve clusterId from Zookeeper", e); + } finally { + if (zkw != null) { + zkw.close(); + } + } + // Lookup failed; the failure was logged above and the caller applies the default id. + return null; + } + + @Override + public boolean isTableOnlineState(byte [] tableName, boolean enabled) + throws IOException { + String tableNameStr = Bytes.toString(tableName); + ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher(); + try { + if (enabled) { + return ZKTableReadOnly.isEnabledTable(zkw, tableNameStr); + } + return ZKTableReadOnly.isDisabledTable(zkw, tableNameStr); + } catch (KeeperException e) { + throw new IOException("Enable/Disable failed", e); + } finally { + zkw.close(); + } + } + + @Override + public int getCurrentNrHRS() throws IOException { + ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher(); + try { + // We go to zk rather than to master to get count of regions to avoid + // HTable having a Master dependency.
See HBase-2828 + return ZKUtil.getNumberOfChildren(zkw, zkw.rsZNode); + } catch (KeeperException ke) { + throw new IOException("Unexpected ZooKeeper exception", ke); + } finally { + zkw.close(); + } + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/RegionServerStoppedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/RegionServerStoppedException.java index 5ab1aed..4560a31 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/RegionServerStoppedException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/RegionServerStoppedException.java @@ -20,16 +20,14 @@ package org.apache.hadoop.hbase.exceptions; import org.apache.hadoop.classification.InterfaceAudience; -import java.io.IOException; - /** * Thrown by the region server when it is in shutting down state. */ @SuppressWarnings("serial") @InterfaceAudience.Private -public class RegionServerStoppedException extends IOException { +public class RegionServerStoppedException extends DoNotRetryIOException { public RegionServerStoppedException(String s) { super(s); } -} +} \ No newline at end of file diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java new file mode 100644 index 0000000..1ed1d96 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * Test client behavior w/o setting up a cluster. + * Mock up cluster emissions. + */ +public class TestClientNoCluster { + private static final Log LOG = LogFactory.getLog(TestClientNoCluster.class); + private Configuration conf; + + @Before + public void setUp() throws Exception { + this.conf = HBaseConfiguration.create(); + // Run my HConnection overrides. Use my little HConnectionImplementation below which + // allows me insert mocks and also use my Registry below rather than the default zk based + // one so tests run faster and don't have zk dependency. 
+ this.conf.set("hbase.client.connection.impl", NoClusterConnection.class.getName()); + this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName()); + } + + /** + * Simple cluster registry inserted in place of our usual zookeeper based one. + */ + static class SimpleRegistry implements Registry { + final ServerName META_HOST = new ServerName("10.10.10.10", 60010, 12345); + + @Override + public void init(HConnection connection) { + } + + @Override + public HRegionLocation getMetaRegionLocation() throws IOException { + return new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, META_HOST); + } + + @Override + public String getClusterId() { + return HConstants.CLUSTER_ID_DEFAULT; + } + + @Override + public boolean isTableOnlineState(byte[] tableName, boolean enabled) + throws IOException { + return enabled; + } + + @Override + public int getCurrentNrHRS() throws IOException { + return 1; + } + } + + @Test + public void testDoNotRetryMetaScanner() throws IOException { + MetaScanner.metaScan(this.conf, null); + } + + @Test + public void testDoNotRetryOnScan() throws IOException { + // Go against meta else we will try to find first region for the table on construction which + // means we'll have to do a bunch more mocking. Tests that go against meta only should be + // good for a bit of testing. + HTable table = new HTable(this.conf, HConstants.META_TABLE_NAME); + ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY); + try { + Result result = null; + while ((result = scanner.next()) != null) { + LOG.info(result); + } + } finally { + scanner.close(); + table.close(); + } + } + + /** + * Override to shutdown going to zookeeper for cluster id and meta location. 
+ */ + static class NoClusterConnection extends HConnectionManager.HConnectionImplementation { + final ClientService.BlockingInterface stub; + + NoClusterConnection(Configuration conf, boolean managed) throws IOException { + super(conf, managed); + // Mock up my stub so open scanner returns a scanner id and then on next, we throw + // exceptions for three times and then after that, we return no more to scan. + this.stub = Mockito.mock(ClientService.BlockingInterface.class); + long sid = 12345L; + try { + Mockito.when(stub.scan((RpcController)Mockito.any(), + (ClientProtos.ScanRequest)Mockito.any())). + thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()). + thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))). + thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid). + setMoreResults(false).build()); + } catch (ServiceException e) { + throw new IOException(e); + } + } + + @Override + public BlockingInterface getClient(ServerName sn) throws IOException { + return this.stub; + } + } +} \ No newline at end of file diff --git a/hbase-client/src/test/resources/hbase-site.xml b/hbase-client/src/test/resources/hbase-site.xml new file mode 100644 index 0000000..ffeb0ef --- /dev/null +++ b/hbase-client/src/test/resources/hbase-site.xml @@ -0,0 +1,37 @@ + + + + + + hbase.defaults.for.version.skip + true + + + hbase.client.retries.number + 5 + Maximum retries. Used as maximum for all retryable + operations such as fetching of the root region from root region + server, getting a cell's value, starting a row update, etc. + Default: 10. 
+ + + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 22d539a..605bc6f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -786,7 +786,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } // for } catch (Throwable t) { if (!checkOOME(t)) { - abort("Unhandled exception: " + t.getMessage(), t); + String prefix = t instanceof YouAreDeadException? "": "Unhandled: "; + abort(prefix + t.getMessage(), t); } } // Run shutdown. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java index 4163a53..69ba2aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java @@ -108,12 +108,10 @@ public class TestZooKeeper { } - private ZooKeeperWatcher getZooKeeperWatcher(HConnection c) throws - NoSuchMethodException, InvocationTargetException, IllegalAccessException { - - Method getterZK = c.getClass().getMethod("getKeepAliveZooKeeperWatcher"); + private ZooKeeperWatcher getZooKeeperWatcher(HConnection c) + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Method getterZK = c.getClass().getDeclaredMethod("getKeepAliveZooKeeperWatcher"); getterZK.setAccessible(true); - return (ZooKeeperWatcher) getterZK.invoke(c); } @@ -196,12 +194,12 @@ public class TestZooKeeper { connection.close(); } - @Test + @Test (timeout = 60000) public void testRegionServerSessionExpired() throws Exception { LOG.info("Starting testRegionServerSessionExpired"); int metaIndex = TEST_UTIL.getMiniHBaseCluster().getServerWithMeta(); TEST_UTIL.expireRegionServerSession(metaIndex); - 
testSanity(); + testSanity("testRegionServerSessionExpired"); } // @Test Disabled because seems to make no sense expiring master session @@ -210,7 +208,7 @@ public class TestZooKeeper { public void testMasterSessionExpired() throws Exception { LOG.info("Starting testMasterSessionExpired"); TEST_UTIL.expireMasterSession(); - testSanity(); + testSanity("testMasterSessionExpired"); } /** @@ -220,27 +218,31 @@ public class TestZooKeeper { */ @Test(timeout = 60000) public void testMasterZKSessionRecoveryFailure() throws Exception { + LOG.info("Starting testMasterZKSessionRecoveryFailure"); MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); HMaster m = cluster.getMaster(); m.abort("Test recovery from zk session expired", new KeeperException.SessionExpiredException()); assertFalse(m.isStopped()); - testSanity(); + testSanity("testMasterZKSessionRecoveryFailure"); } /** * Make sure we can use the cluster * @throws Exception */ - private void testSanity() throws Exception{ - HBaseAdmin admin = - new HBaseAdmin(TEST_UTIL.getConfiguration()); - String tableName = "test"+System.currentTimeMillis(); + private void testSanity(final String testName) throws Exception{ + String tableName = testName + "." 
+ System.currentTimeMillis(); HTableDescriptor desc = new HTableDescriptor(tableName); HColumnDescriptor family = new HColumnDescriptor("fam"); desc.addFamily(family); LOG.info("Creating table " + tableName); - admin.createTable(desc); + HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + try { + admin.createTable(desc); + } finally { + admin.close(); + } HTable table = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName); @@ -253,33 +255,29 @@ public class TestZooKeeper { } @Test - public void testMultipleZK() { - try { - HTable localMeta = - new HTable(new Configuration(TEST_UTIL.getConfiguration()), HConstants.META_TABLE_NAME); - Configuration otherConf = new Configuration(TEST_UTIL.getConfiguration()); - otherConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1"); - HTable ipMeta = new HTable(otherConf, HConstants.META_TABLE_NAME); - - // dummy, just to open the connection - final byte [] row = new byte [] {'r'}; - localMeta.exists(new Get(row)); - ipMeta.exists(new Get(row)); - - // make sure they aren't the same - ZooKeeperWatcher z1 = - getZooKeeperWatcher(HConnectionManager.getConnection(localMeta.getConfiguration())); - ZooKeeperWatcher z2 = - getZooKeeperWatcher(HConnectionManager.getConnection(otherConf)); - assertFalse(z1 == z2); - assertFalse(z1.getQuorum().equals(z2.getQuorum())); - - localMeta.close(); - ipMeta.close(); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } + public void testMultipleZK() + throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { + HTable localMeta = + new HTable(new Configuration(TEST_UTIL.getConfiguration()), HConstants.META_TABLE_NAME); + Configuration otherConf = new Configuration(TEST_UTIL.getConfiguration()); + otherConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1"); + HTable ipMeta = new HTable(otherConf, HConstants.META_TABLE_NAME); + + // dummy, just to open the connection + final byte [] row = new byte [] {'r'}; + localMeta.exists(new 
Get(row)); + ipMeta.exists(new Get(row)); + + // make sure they aren't the same + ZooKeeperWatcher z1 = + getZooKeeperWatcher(HConnectionManager.getConnection(localMeta.getConfiguration())); + ZooKeeperWatcher z2 = + getZooKeeperWatcher(HConnectionManager.getConnection(otherConf)); + assertFalse(z1 == z2); + assertFalse(z1.getQuorum().equals(z2.getQuorum())); + + localMeta.close(); + ipMeta.close(); } /** @@ -446,8 +444,11 @@ public class TestZooKeeper { // Restore the ACL ZooKeeper zk3 = new ZooKeeper(quorumServers, sessionTimeout, EmptyWatcher.instance); zk3.addAuthInfo("digest", "hbase:rox".getBytes()); - zk3.setACL("/", oldACL, -1); - zk3.close(); + try { + zk3.setACL("/", oldACL, -1); + } finally { + zk3.close(); + } } /** @@ -475,21 +476,26 @@ public class TestZooKeeper { HMaster m = cluster.getMaster(); // now the cluster is up. So assign some regions. HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); - byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"), + try { + byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j") }; - String tableName = "testRegionAssignmentAfterMasterRecoveryDueToZKExpiry"; - admin.createTable(new HTableDescriptor(tableName), SPLIT_KEYS); - ZooKeeperWatcher zooKeeperWatcher = HBaseTestingUtility.getZooKeeperWatcher(TEST_UTIL); - ZKAssign.blockUntilNoRIT(zooKeeperWatcher); - m.getZooKeeperWatcher().close(); - MockLoadBalancer.retainAssignCalled = false; - m.abort("Test recovery from zk session expired", new KeeperException.SessionExpiredException()); - assertFalse(m.isStopped()); - // The recovered master should not call retainAssignment, as it is not a - // clean startup. 
- assertFalse("Retain assignment should not be called", MockLoadBalancer.retainAssignCalled); + String tableName = "testRegionAssignmentAfterMasterRecoveryDueToZKExpiry"; + admin.createTable(new HTableDescriptor(tableName), SPLIT_KEYS); + ZooKeeperWatcher zooKeeperWatcher = HBaseTestingUtility.getZooKeeperWatcher(TEST_UTIL); + ZKAssign.blockUntilNoRIT(zooKeeperWatcher); + m.getZooKeeperWatcher().close(); + MockLoadBalancer.retainAssignCalled = false; + m.abort("Test recovery from zk session expired", + new KeeperException.SessionExpiredException()); + assertFalse(m.isStopped()); + // The recovered master should not call retainAssignment, as it is not a + // clean startup. + assertFalse("Retain assignment should not be called", MockLoadBalancer.retainAssignCalled); + } finally { + admin.close(); + } } /** @@ -504,40 +510,47 @@ public class TestZooKeeper { HMaster m = cluster.getMaster(); // now the cluster is up. So assign some regions. HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); - byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("1"), Bytes.toBytes("2"), + HTable table = null; + try { + byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("1"), Bytes.toBytes("2"), Bytes.toBytes("3"), Bytes.toBytes("4"), Bytes.toBytes("5") }; - String tableName = "testLogSplittingAfterMasterRecoveryDueToZKExpiry"; - HTableDescriptor htd = new HTableDescriptor(tableName); - HColumnDescriptor hcd = new HColumnDescriptor("col"); - htd.addFamily(hcd); - admin.createTable(htd, SPLIT_KEYS); - ZooKeeperWatcher zooKeeperWatcher = HBaseTestingUtility.getZooKeeperWatcher(TEST_UTIL); - ZKAssign.blockUntilNoRIT(zooKeeperWatcher); - HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName); - - Put p; - int numberOfPuts; - for (numberOfPuts = 0; numberOfPuts < 6; numberOfPuts++) { - p = new Put(Bytes.toBytes(numberOfPuts)); - p.add(Bytes.toBytes("col"), Bytes.toBytes("ql"), Bytes.toBytes("value" + numberOfPuts)); - table.put(p); - } - 
m.getZooKeeperWatcher().close(); - m.abort("Test recovery from zk session expired", new KeeperException.SessionExpiredException()); - assertFalse(m.isStopped()); - cluster.getRegionServer(0).abort("Aborting"); - // Without patch for HBASE-6046 this test case will always timeout - // with patch the test case should pass. - Scan scan = new Scan(); - int numberOfRows = 0; - ResultScanner scanner = table.getScanner(scan); - Result[] result = scanner.next(1); - while (result != null && result.length > 0) { - numberOfRows++; - result = scanner.next(1); + String tableName = "testLogSplittingAfterMasterRecoveryDueToZKExpiry"; + HTableDescriptor htd = new HTableDescriptor(tableName); + HColumnDescriptor hcd = new HColumnDescriptor("col"); + htd.addFamily(hcd); + admin.createTable(htd, SPLIT_KEYS); + ZooKeeperWatcher zooKeeperWatcher = HBaseTestingUtility.getZooKeeperWatcher(TEST_UTIL); + ZKAssign.blockUntilNoRIT(zooKeeperWatcher); + table = new HTable(TEST_UTIL.getConfiguration(), tableName); + Put p; + int numberOfPuts; + for (numberOfPuts = 0; numberOfPuts < 6; numberOfPuts++) { + p = new Put(Bytes.toBytes(numberOfPuts)); + p.add(Bytes.toBytes("col"), Bytes.toBytes("ql"), Bytes.toBytes("value" + numberOfPuts)); + table.put(p); + } + m.getZooKeeperWatcher().close(); + m.abort("Test recovery from zk session expired", + new KeeperException.SessionExpiredException()); + assertFalse(m.isStopped()); + cluster.getRegionServer(0).abort("Aborting"); + // Without patch for HBASE-6046 this test case will always timeout + // with patch the test case should pass. 
+ Scan scan = new Scan(); + int numberOfRows = 0; + ResultScanner scanner = table.getScanner(scan); + Result[] result = scanner.next(1); + while (result != null && result.length > 0) { + numberOfRows++; + result = scanner.next(1); + } + assertEquals("Number of rows should be equal to number of puts.", numberOfPuts, + numberOfRows); + } finally { + if (table != null) table.close(); + admin.close(); } - assertEquals("Number of rows should be equal to number of puts.", numberOfPuts, numberOfRows); } static class MockLoadBalancer extends DefaultLoadBalancer {