diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index b04faac..a0ce179 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -595,7 +595,7 @@ public interface Admin extends Abortable, Closeable { /** * Compact all regions on the region server - * @param regionserver the region server name + * @param sn the region server name * @param major if it's major compaction * @throws IOException * @throws InterruptedException @@ -1288,7 +1288,7 @@ public interface Admin extends Abortable, Closeable { * @return A RegionServerCoprocessorRpcChannel instance */ CoprocessorRpcChannel coprocessorService(ServerName sn); - + /** * Update the configuration and trigger an online config change diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 3f55b0e..e0c14a6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -163,7 +163,7 @@ class AsyncProcess { // TODO: many of the fields should be made private protected final long id; - protected final ClusterConnection hConnection; + protected final ClusterConnection connection; protected final RpcRetryingCallerFactory rpcCallerFactory; protected final RpcControllerFactory rpcFactory; protected final BatchErrors globalErrors; @@ -246,7 +246,7 @@ class AsyncProcess { throw new IllegalArgumentException("HConnection cannot be null."); } - this.hConnection = hc; + this.connection = hc; this.pool = pool; this.globalErrors = useGlobalErrors ? new BatchErrors() : null; @@ -340,7 +340,7 @@ class AsyncProcess { new HashMap>(); List> retainedActions = new ArrayList>(rows.size()); - NonceGenerator ng = this.hConnection.getNonceGenerator(); + NonceGenerator ng = this.connection.getNonceGenerator(); long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client. // Location errors that happen before we decide what requests to take. @@ -363,7 +363,7 @@ class AsyncProcess { try { if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null"); // Make sure we get 0-s replica. - RegionLocations locs = hConnection.locateRegion( + RegionLocations locs = connection.locateRegion( tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) { throw new IOException("#" + id + ", no location found, aborting submit for" @@ -535,7 +535,7 @@ class AsyncProcess { // The position will be used by the processBatch to match the object array returned. int posInList = -1; - NonceGenerator ng = this.hConnection.getNonceGenerator(); + NonceGenerator ng = this.connection.getNonceGenerator(); for (Row r : rows) { posInList++; if (r instanceof Put) { @@ -910,7 +910,7 @@ class AsyncProcess { ", row cannot be null"); RegionLocations loc = null; try { - loc = hConnection.locateRegion( + loc = connection.locateRegion( tableName, action.getAction().getRow(), useCache, true, action.getReplicaId()); } catch (IOException ex) { manageLocationError(action, ex); @@ -1025,7 +1025,7 @@ class AsyncProcess { if (tableName == null) { // tableName is null when we made a cross-table RPC call. - hConnection.clearCaches(server); + connection.clearCaches(server); } int failed = 0, stopped = 0; List> toReplay = new ArrayList>(); @@ -1036,7 +1036,7 @@ class AsyncProcess { // any of the regions in the MultiAction. // TODO: depending on type of exception we might not want to update cache at all? if (tableName != null) { - hConnection.updateCachedLocations(tableName, regionName, row, null, server); + connection.updateCachedLocations(tableName, regionName, row, null, server); } for (Action action : e.getValue()) { Retry retry = manageError( @@ -1150,7 +1150,7 @@ class AsyncProcess { // Register corresponding failures once per server/once per region. if (!regionFailureRegistered) { regionFailureRegistered = true; - hConnection.updateCachedLocations( + connection.updateCachedLocations( tableName, regionName, row.getRow(), result, server); } if (failureCount == 0) { @@ -1199,7 +1199,7 @@ class AsyncProcess { errorsByServer.reportServerError(server); canRetry = errorsByServer.canRetryMore(numAttempt); } - hConnection.updateCachedLocations( + connection.updateCachedLocations( tableName, region, actions.get(0).getAction().getRow(), throwable, server); failureCount += actions.size(); @@ -1514,7 +1514,7 @@ class AsyncProcess { @VisibleForTesting protected MultiServerCallable createCallable(final ServerName server, TableName tableName, final MultiAction multi) { - return new MultiServerCallable(hConnection, tableName, server, this.rpcFactory, multi); + return new MultiServerCallable(connection, tableName, server, this.rpcFactory, multi); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index bfdf5d2..147a203 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -249,7 +249,7 @@ class ConnectionManager { */ @VisibleForTesting static NonceGenerator injectNonceGeneratorForTesting( - HConnection conn, NonceGenerator cnm) { + ClusterConnection conn, NonceGenerator cnm) { HConnectionImplementation connImpl = (HConnectionImplementation)conn; NonceGenerator ng = connImpl.getNonceGenerator(); LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index e26ae48..72b447a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -79,7 +79,7 @@ public class ConnectionUtils { * @return old nonce generator. */ public static NonceGenerator injectNonceGeneratorForTesting( - HConnection conn, NonceGenerator cnm) { + ClusterConnection conn, NonceGenerator cnm) { return ConnectionManager.injectNonceGeneratorForTesting(conn, cnm); } @@ -111,7 +111,7 @@ public class ConnectionUtils { * @param client the client interface of the local server * @return an adapted/decorated HConnection */ - public static HConnection createShortCircuitHConnection(final HConnection conn, + public static HConnection createShortCircuitHConnection(final Connection conn, final ServerName serverName, final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client) { return new ConnectionAdapter(conn) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 329a373..109352b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterReque import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription; @@ -175,8 +174,6 @@ public class HBaseAdmin implements Admin { private static final String ZK_IDENTIFIER_PREFIX = "hbase-admin-on-"; - // We use the implementation class rather then the interface because we - // need the package protected functions to get the connection to master private ClusterConnection connection; private volatile Configuration conf; @@ -1443,8 +1440,7 @@ public class HBaseAdmin implements Admin { * Get all the online regions on a region server. */ @Override - public List getOnlineRegions( - final ServerName sn) throws IOException { + public List getOnlineRegions(final ServerName sn) throws IOException { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); return ProtobufUtil.getOnlineRegions(admin); } @@ -2333,12 +2329,6 @@ public class HBaseAdmin implements Admin { }); } - private HRegionLocation getFirstMetaServerForTable(final TableName tableName) - throws IOException { - return connection.locateRegion(TableName.META_TABLE_NAME, - HRegionInfo.createRegionName(tableName, null, HConstants.NINES, false)); - } - /** * @return Configuration used by the instance. */ @@ -2495,52 +2485,40 @@ public class HBaseAdmin implements Admin { /** * Check to see if HBase is running. Throw an exception if not. - * We consider that HBase is running if ZooKeeper and Master are running. - * * @param conf system configuration * @throws MasterNotRunningException if the master is not running * @throws ZooKeeperConnectionException if unable to connect to zookeeper */ + // Used by tests and by the Merge tool. Merge tool uses it to figure if HBase is up or not. public static void checkHBaseAvailable(Configuration conf) - throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException { + throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException { Configuration copyOfConf = HBaseConfiguration.create(conf); - // We set it to make it fail as soon as possible if HBase is not available copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); copyOfConf.setInt("zookeeper.recovery.retry", 0); - - ConnectionManager.HConnectionImplementation connection - = (ConnectionManager.HConnectionImplementation) - HConnectionManager.getConnection(copyOfConf); - - try { - // Check ZK first. - // If the connection exists, we may have a connection to ZK that does - // not work anymore - ZooKeeperKeepAliveConnection zkw = null; - try { - zkw = connection.getKeepAliveZooKeeperWatcher(); - zkw.getRecoverableZooKeeper().getZooKeeper().exists( - zkw.baseZNode, false); - - } catch (IOException e) { - throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); - } catch (InterruptedException e) { - throw (InterruptedIOException) + try (ClusterConnection connection = + (ClusterConnection)ConnectionFactory.createConnection(copyOfConf)) { + // Check ZK first. + // If the connection exists, we may have a connection to ZK that does not work anymore + ZooKeeperKeepAliveConnection zkw = null; + try { + // This is NASTY. FIX!!!! Dependent on internal implementation! TODO + zkw = ((ConnectionManager.HConnectionImplementation)connection). + getKeepAliveZooKeeperWatcher(); + zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false); + } catch (IOException e) { + throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException("Can't connect to ZooKeeper").initCause(e); - } catch (KeeperException e) { - throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); - } finally { - if (zkw != null) { - zkw.close(); + } catch (KeeperException e) { + throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); + } finally { + if (zkw != null) { + zkw.close(); + } } - } - - // Check Master connection.isMasterRunning(); - - } finally { - connection.close(); } } @@ -3779,8 +3757,9 @@ public class HBaseAdmin implements Admin { @Override public int getMasterInfoPort() throws IOException { + // TODO: Fix! Reaching into internal implementation!!!! ConnectionManager.HConnectionImplementation connection = - (ConnectionManager.HConnectionImplementation) HConnectionManager.getConnection(conf); + (ConnectionManager.HConnectionImplementation)this.connection; ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher(); try { return MasterAddressTracker.getMasterInfoPort(zkw); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java index 4ecddb5..a49f95c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java @@ -63,6 +63,7 @@ public class MetaCache { * Search the cache for a location that fits our table and row key. * Return null if no suitable region is located. * + * * @param tableName * @param row * @return Null or region location found in cache. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java index 8168fe1..39518a6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.util.Pair; /** * Used to view region location information for a single HBase table. - * Obtain an instance from an {@link HConnection}. + * Obtain an instance from an {@link Connection}. * * @see ConnectionFactory * @see Connection diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java index 75b0175..3dc9aa6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java @@ -84,16 +84,19 @@ public class ZKConfig { Properties zkProperties = new Properties(); // Directly map all of the hbase.zookeeper.property.KEY properties. - for (Entry entry : new Configuration(conf)) { // copy for mt safety - String key = entry.getKey(); - if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) { - String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN); - String value = entry.getValue(); - // If the value has variables substitutions, need to do a get. - if (value.contains(VARIABLE_START)) { - value = conf.get(key); + // Synchronize on conf so no loading of configs while we iterate + synchronized (conf) { + for (Entry entry : conf) { + String key = entry.getKey(); + if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) { + String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN); + String value = entry.getValue(); + // If the value has variables substitutions, need to do a get. + if (value.contains(VARIABLE_START)) { + value = conf.get(key); + } + zkProperties.put(zkKey, value); } - zkProperties.put(zkKey, value); } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java index d97862d..6bc4143 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java @@ -25,9 +25,11 @@ import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterManager.ServiceType; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo; @@ -44,8 +46,8 @@ import com.google.common.collect.Sets; */ @InterfaceAudience.Private public class DistributedHBaseCluster extends HBaseCluster { - - private HBaseAdmin admin; + private Admin admin; + private final Connection connection; private ClusterManager clusterManager; @@ -53,7 +55,8 @@ public class DistributedHBaseCluster extends HBaseCluster { throws IOException { super(conf); this.clusterManager = clusterManager; - this.admin = new HBaseAdmin(conf); + this.connection = ConnectionFactory.createConnection(conf); + this.admin = this.connection.getAdmin(); this.initialClusterStatus = getClusterStatus(); } @@ -84,18 +87,21 @@ public class DistributedHBaseCluster extends HBaseCluster { if (this.admin != null) { admin.close(); } + if (this.connection != null && !this.connection.isClosed()) { + this.connection.close(); + } } @Override public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException { - return admin.getConnection().getAdmin(serverName); + return ((ClusterConnection)this.connection).getAdmin(serverName); } @Override public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName) throws IOException { - return admin.getConnection().getClient(serverName); + return ((ClusterConnection)this.connection).getClient(serverName); } @Override @@ -138,8 +144,7 @@ public class DistributedHBaseCluster extends HBaseCluster { @Override public MasterService.BlockingInterface getMasterAdminService() throws IOException { - HConnection conn = HConnectionManager.getConnection(conf); - return conn.getMaster(); + return ((ClusterConnection)this.connection).getMaster(); } @Override @@ -183,18 +188,19 @@ public class DistributedHBaseCluster extends HBaseCluster { } @Override - public ServerName getServerHoldingRegion(byte[] regionName) throws IOException { - HConnection connection = admin.getConnection(); - HRegionLocation regionLoc = connection.locateRegion(regionName); + public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException { + HRegionLocation regionLoc = null; + try (RegionLocator locator = connection.getRegionLocator(tn)) { + regionLoc = locator.getRegionLocation(regionName); + } if (regionLoc == null) { - LOG.warn("Cannot find region server holding region " + Bytes.toString(regionName) - + " for table " + HRegionInfo.getTableName(regionName) + ", start key [" + - Bytes.toString(HRegionInfo.getStartKey(regionName)) + "]"); + LOG.warn("Cannot find region server holding region " + Bytes.toString(regionName) + + ", start key [" + Bytes.toString(HRegionInfo.getStartKey(regionName)) + "]"); return null; } AdminProtos.AdminService.BlockingInterface client = - connection.getAdmin(regionLoc.getServerName()); + ((ClusterConnection)this.connection).getAdmin(regionLoc.getServerName()); ServerInfo info = ProtobufUtil.getServerInfo(client); return ProtobufUtil.toServerName(info.getServerName()); } @@ -374,7 +380,7 @@ public class DistributedHBaseCluster extends HBaseCluster { } catch (IOException ioe) { LOG.warn("While closing the old connection", ioe); } - this.admin = new HBaseAdmin(conf); + this.admin = this.connection.getAdmin(); LOG.info("Added new HBaseAdmin"); return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java index 0d9b65a..cde94fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java @@ -37,4 +37,4 @@ import org.apache.hadoop.mapreduce.Mapper; public abstract class TableMapper extends Mapper { -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java index c7c6639..84f6891 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java @@ -48,7 +48,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; * while the output value must be either a {@link Put} or a * {@link Delete} instance. * - *

is the type of the key. Ignored in this class. */ @InterfaceAudience.Public @InterfaceStability.Stable diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java index 1d16bdf..e7d52a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java @@ -48,9 +48,9 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper; import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -67,9 +67,9 @@ import org.apache.log4j.Logger; /** * A tool that is used for manipulating and viewing favored nodes information * for regions. Run with -h to get a list of the options - * */ @InterfaceAudience.Private +// TODO: Remove? Unused. Partially implemented only. public class RegionPlacementMaintainer { private static final Log LOG = LogFactory.getLog(RegionPlacementMaintainer.class .getName()); @@ -93,9 +93,9 @@ public class RegionPlacementMaintainer { private Configuration conf; private final boolean enforceLocality; private final boolean enforceMinAssignmentMove; - private HBaseAdmin admin; private RackManager rackManager; private Set targetTableSet; + private final Connection connection; public RegionPlacementMaintainer(Configuration conf) { this(conf, true, true); @@ -108,7 +108,13 @@ public class RegionPlacementMaintainer { this.enforceMinAssignmentMove = enforceMinAssignmentMove; this.targetTableSet = new HashSet(); this.rackManager = new RackManager(conf); + try { + this.connection = ConnectionFactory.createConnection(this.conf); + } catch (IOException e) { + throw new RuntimeException(e); + } } + private static void printHelp(Options opt) { new HelpFormatter().printHelp( "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " + @@ -125,24 +131,13 @@ public class RegionPlacementMaintainer { } /** - * @return the cached HBaseAdmin - * @throws IOException - */ - private HBaseAdmin getHBaseAdmin() throws IOException { - if (this.admin == null) { - this.admin = new HBaseAdmin(this.conf); - } - return this.admin; - } - - /** * @return the new RegionAssignmentSnapshot * @throws IOException */ public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException { SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot = - new SnapshotOfRegionAssignmentFromMeta(HConnectionManager.getConnection(conf)); + new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf)); currentAssignmentShapshot.initialize(); return currentAssignmentShapshot; } @@ -210,8 +205,10 @@ public class RegionPlacementMaintainer { // Get the all the region servers List servers = new ArrayList(); - servers.addAll(getHBaseAdmin().getClusterStatus().getServers()); - + try (Admin admin = this.connection.getAdmin()) { + servers.addAll(admin.getClusterStatus().getServers()); + } + LOG.info("Start to generate assignment plan for " + numRegions + " regions from table " + tableName + " with " + servers.size() + " region servers"); @@ -619,9 +616,9 @@ public class RegionPlacementMaintainer { // sort the map based on region info Map> assignmentMap = new TreeMap>(plan.getAssignmentMap()); - + for (Map.Entry> entry : assignmentMap.entrySet()) { - + String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue()); String regionName = entry.getKey().getRegionNameAsString(); LOG.info("Region: " + regionName ); @@ -660,7 +657,6 @@ public class RegionPlacementMaintainer { // Get the region to region server map Map> currentAssignment = this.getRegionAssignmentSnapshot().getRegionServerToRegionMap(); - HConnection connection = this.getHBaseAdmin().getConnection(); // track of the failed and succeeded updates int succeededNum = 0; @@ -691,10 +687,11 @@ public class RegionPlacementMaintainer { } if (singleServerPlan != null) { // Update the current region server with its updated favored nodes - BlockingInterface currentRegionServer = connection.getAdmin(entry.getKey()); + BlockingInterface currentRegionServer = + ((ClusterConnection)this.connection).getAdmin(entry.getKey()); UpdateFavoredNodesRequest request = RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos); - + UpdateFavoredNodesResponse updateFavoredNodesResponse = currentRegionServer.updateFavoredNodes(null, request); LOG.info("Region server " + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index ff0706b..39d0a0f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -36,7 +36,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClockOutOfSyncException; import org.apache.hadoop.hbase.HRegionInfo; @@ -46,8 +45,9 @@ import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler; @@ -137,7 +137,7 @@ public class ServerManager { private final Server master; private final MasterServices services; - private final HConnection connection; + private final ClusterConnection connection; private final DeadServer deadservers = new DeadServer(); @@ -201,7 +201,7 @@ public class ServerManager { Configuration c = master.getConfiguration(); maxSkew = c.getLong("hbase.master.maxclockskew", 30000); warningSkew = c.getLong("hbase.master.warningclockskew", 10000); - this.connection = connect ? HConnectionManager.getConnection(c) : null; + this.connection = connect ? (ClusterConnection)ConnectionFactory.createConnection(c) : null; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index ed04a86..464ad7e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; @@ -610,7 +611,7 @@ public class HRegionServer extends HasThread implements */ protected HConnection createShortCircuitConnection() throws IOException { return ConnectionUtils.createShortCircuitHConnection( - HConnectionManager.getConnection(conf), serverName, rpcServices, rpcServices); + ConnectionFactory.createConnection(conf), serverName, rpcServices, rpcServices); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 8bdf12a..a182aa1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; @@ -135,9 +134,6 @@ public class SplitLogWorker implements Runnable { try { LOG.info("SplitLogWorker " + server.getServerName() + " starting"); coordination.registerListener(); - // pre-initialize a new connection for splitlogworker configuration - HConnectionManager.getConnection(conf); - // wait for Coordination Engine is ready boolean res = false; while (!res && !coordination.isStop()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseCluster.java index 76a9566..9e7a0c4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseCluster.java @@ -250,15 +250,18 @@ public abstract class HBaseCluster implements Closeable, Configurable { * Get the ServerName of region server serving the first hbase:meta region */ public ServerName getServerHoldingMeta() throws IOException { - return getServerHoldingRegion(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); + return getServerHoldingRegion(TableName.META_TABLE_NAME, + HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); } /** * Get the ServerName of region server serving the specified region * @param regionName Name of the region in bytes + * @param tn Table name that has the region. * @return ServerName that hosts the region or null */ - public abstract ServerName getServerHoldingRegion(byte[] regionName) throws IOException; + public abstract ServerName getServerHoldingRegion(final TableName tn, byte[] regionName) + throws IOException; /** * @return whether we are interacting with a distributed cluster as opposed to an diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index a3ec004..7672ac1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; @@ -517,7 +516,6 @@ public class MiniHBaseCluster extends HBaseCluster { if (this.hbaseCluster != null) { this.hbaseCluster.shutdown(); } - HConnectionManager.deleteAllConnections(false); } @Override @@ -657,7 +655,8 @@ public class MiniHBaseCluster extends HBaseCluster { } @Override - public ServerName getServerHoldingRegion(byte[] regionName) throws IOException { + public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName) + throws IOException { // Assume there is only one master thread which is the active master. // If there are multiple master threads, the backup master threads // should hold some regions. Please refer to #countServedRegions diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java index 3d3b499..131adb6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java @@ -882,7 +882,6 @@ public class TestAdmin1 { admin.createTable(desc, splitKeys); HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName); Map regions = ht.getRegionLocations(); - ht.close(); assertEquals("Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(), expectedRegions, regions.size()); // Disable table. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java index 7521bb7..73fffe3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java @@ -52,7 +52,7 @@ public class TestHBaseAdminNoCluster { * @throws ServiceException */ @Test - public void testMasterMonitorCollableRetries() + public void testMasterMonitorCallableRetries() throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ServiceException { Configuration configuration = HBaseConfiguration.create(); // Set the pause and retry count way down. @@ -61,20 +61,18 @@ public class TestHBaseAdminNoCluster { configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, count); // Get mocked connection. Getting the connection will register it so when HBaseAdmin is // constructed with same configuration, it will find this mocked connection. - HConnection connection = HConnectionTestingUtility.getMockedConnection(configuration); + ClusterConnection connection = HConnectionTestingUtility.getMockedConnection(configuration); // Mock so we get back the master interface. Make it so when createTable is called, we throw // the PleaseHoldException. - MasterKeepAliveConnection masterAdmin = - Mockito.mock(MasterKeepAliveConnection.class); + MasterKeepAliveConnection masterAdmin = Mockito.mock(MasterKeepAliveConnection.class); Mockito.when(masterAdmin.createTable((RpcController)Mockito.any(), (CreateTableRequest)Mockito.any())). thenThrow(new ServiceException("Test fail").initCause(new PleaseHoldException("test"))); Mockito.when(connection.getKeepAliveMasterService()).thenReturn(masterAdmin); - // Mock up our admin Interfaces - Admin admin = new HBaseAdmin(configuration); + Admin admin = new HBaseAdmin(connection); try { HTableDescriptor htd = - new HTableDescriptor(TableName.valueOf("testMasterMonitorCollableRetries")); + new HTableDescriptor(TableName.valueOf("testMasterMonitorCollableRetries")); // Pass any old htable descriptor; not important try { admin.createTable(htd, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); @@ -87,7 +85,7 @@ public class TestHBaseAdminNoCluster { (CreateTableRequest)Mockito.any()); } finally { admin.close(); - if (connection != null)HConnectionManager.deleteConnection(configuration); + if (connection != null) connection.close(); } } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 5a6065c..e8f668b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -132,6 +132,8 @@ public class TestHCM { @BeforeClass public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, + HConstants.STATUS_PUBLISHED_DEFAULT); if (isJavaOk) { TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); } @@ -1010,117 +1012,114 @@ public class TestHCM { @Test (timeout=30000) public void testMulti() throws Exception { HTable table = TEST_UTIL.createTable(TABLE_NAME3, FAM_NAM); - TEST_UTIL.createMultiRegions(table, FAM_NAM); - ConnectionManager.HConnectionImplementation conn = - (ConnectionManager.HConnectionImplementation) - HConnectionManager.getConnection(TEST_UTIL.getConfiguration()); - - // We're now going to move the region and check that it works for the client - // First a new put to add the location in the cache - conn.clearRegionCache(TABLE_NAME3); - Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3)); - - TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false); - HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); - - // We can wait for all regions to be online, that makes log reading easier when debugging - TEST_UTIL.waitUntilNoRegionsInTransition(20000); - - Put put = new Put(ROW_X); - put.add(FAM_NAM, ROW_X, ROW_X); - table.put(put); - - // Now moving the region to the second server - HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation(); - if (toMove == null) { - String msg = "Failed to find location for " + Bytes.toString(ROW_X) + " in " + TABLE_NAME3; - // Log so easier to see in output where error occurred. - LOG.error(msg); - throw new NullPointerException(msg); - } - byte[] regionName = toMove.getRegionInfo().getRegionName(); - byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes(); - - // Choose the other server. - int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName); - int destServerId = (curServerId == 0 ? 1 : 0); - - HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); - HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); - - ServerName destServerName = destServer.getServerName(); - - //find another row in the cur server that is less than ROW_X - List regions = curServer.getOnlineRegions(TABLE_NAME3); - byte[] otherRow = null; - for (HRegion region : regions) { - if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName()) - && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) { - otherRow = region.getRegionInfo().getStartKey(); - break; - } - } - assertNotNull(otherRow); - // If empty row, set it to first row.-f - if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa"); - Put put2 = new Put(otherRow); - put2.add(FAM_NAM, otherRow, otherRow); - table.put(put2); //cache put2's location - - // Check that we are in the expected state - Assert.assertTrue(curServer != destServer); - Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName()); - Assert.assertNotEquals(toMove.getPort(), destServerName.getPort()); - Assert.assertNotNull(curServer.getOnlineRegion(regionName)); - Assert.assertNull(destServer.getOnlineRegion(regionName)); - Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster(). - getAssignmentManager().getRegionStates().isRegionsInTransition()); - - // Moving. It's possible that we don't have all the regions online at this point, so - // the test must depends only on the region we're looking at. - LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString()); - TEST_UTIL.getHBaseAdmin().move( - toMove.getRegionInfo().getEncodedNameAsBytes(), - destServerName.getServerName().getBytes() - ); - - while (destServer.getOnlineRegion(regionName) == null || - destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || - curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || - master.getAssignmentManager().getRegionStates().isRegionsInTransition()) { - // wait for the move to be finished - Thread.sleep(1); - } + try { + TEST_UTIL.createMultiRegions(table, FAM_NAM); + ConnectionManager.HConnectionImplementation conn = + ( ConnectionManager.HConnectionImplementation)table.getConnection(); + + // We're now going to move the region and check that it works for the client + // First a new put to add the location in the cache + conn.clearRegionCache(TABLE_NAME3); + Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3)); + + TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false); + HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); + + // We can wait for all regions to be online, that makes log reading easier when debugging + while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) { + Thread.sleep(1); + } + + Put put = new Put(ROW_X); + put.add(FAM_NAM, ROW_X, ROW_X); + table.put(put); + + // Now moving the region to the second server + HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation(); + byte[] regionName = toMove.getRegionInfo().getRegionName(); + byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes(); + + // Choose the other server. + int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName); + int destServerId = (curServerId == 0 ? 1 : 0); + + HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); + HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); + + ServerName destServerName = destServer.getServerName(); + + //find another row in the cur server that is less than ROW_X + List regions = curServer.getOnlineRegions(TABLE_NAME3); + byte[] otherRow = null; + for (HRegion region : regions) { + if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName()) + && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) { + otherRow = region.getRegionInfo().getStartKey(); + break; + } + } + assertNotNull(otherRow); + // If empty row, set it to first row.-f + if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa"); + Put put2 = new Put(otherRow); + put2.add(FAM_NAM, otherRow, otherRow); + table.put(put2); //cache put2's location + + // Check that we are in the expected state + Assert.assertTrue(curServer != destServer); + Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName()); + Assert.assertNotEquals(toMove.getPort(), destServerName.getPort()); + Assert.assertNotNull(curServer.getOnlineRegion(regionName)); + Assert.assertNull(destServer.getOnlineRegion(regionName)); + Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster(). + getAssignmentManager().getRegionStates().isRegionsInTransition()); + + // Moving. It's possible that we don't have all the regions online at this point, so + // the test must depends only on the region we're looking at. + LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString()); + TEST_UTIL.getHBaseAdmin().move( + toMove.getRegionInfo().getEncodedNameAsBytes(), + destServerName.getServerName().getBytes() + ); + + while (destServer.getOnlineRegion(regionName) == null || + destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || + curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || + master.getAssignmentManager().getRegionStates().isRegionsInTransition()) { + // wait for the move to be finished + Thread.sleep(1); + } - LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString()); + LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString()); - // Check our new state. - Assert.assertNull(curServer.getOnlineRegion(regionName)); - Assert.assertNotNull(destServer.getOnlineRegion(regionName)); - Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); - Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); + // Check our new state. + Assert.assertNull(curServer.getOnlineRegion(regionName)); + Assert.assertNotNull(destServer.getOnlineRegion(regionName)); + Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); + Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); - // Cache was NOT updated and points to the wrong server - Assert.assertFalse( - conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation() - .getPort() == destServerName.getPort()); + // Cache was NOT updated and points to the wrong server + Assert.assertFalse( + conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation() + .getPort() == destServerName.getPort()); - // Hijack the number of retry to fail after 2 tries - final int prevNumRetriesVal = setNumTries(conn, 2); + // Hijack the number of retry to fail after 2 tries + final int prevNumRetriesVal = setNumTries(conn, 2); - Put put3 = new Put(ROW_X); - put3.add(FAM_NAM, ROW_X, ROW_X); - Put put4 = new Put(otherRow); - put4.add(FAM_NAM, otherRow, otherRow); + Put put3 = new Put(ROW_X); + put3.add(FAM_NAM, ROW_X, ROW_X); + Put put4 = new Put(otherRow); + put4.add(FAM_NAM, otherRow, otherRow); - // do multi - table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row, - // second we get RegionMovedException. + // do multi + table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row, + // second we get RegionMovedException. - setNumTries(conn, prevNumRetriesVal); - table.close(); - conn.close(); + setNumTries(conn, prevNumRetriesVal); + } finally { + table.close(); + } } @Ignore ("Test presumes RETRY_BACKOFF will never change; it has") @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 388b3c9..69267ec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -33,7 +32,6 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -42,14 +40,11 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.exceptions.OperationConflictException; -import org.apache.hadoop.hbase.ipc.RpcClient; -import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.testclassification.FlakeyTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Threads; -import org.apache.log4j.Level; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -71,6 +66,7 @@ public class TestMultiParallel { private static final byte [][] KEYS = makeKeys(); private static final int slaves = 5; // also used for testing HTable pool size + private static Connection CONNECTION; @BeforeClass public static void beforeClass() throws Exception { // Uncomment the following lines if more verbosity is needed for @@ -83,9 +79,11 @@ public class TestMultiParallel { UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY)); UTIL.waitTableEnabled(TEST_TABLE); t.close(); + CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration()); } @AfterClass public static void afterClass() throws Exception { + CONNECTION.close(); UTIL.shutdownMiniCluster(); } @@ -98,9 +96,6 @@ public class TestMultiParallel { // Wait until completing balance UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition()); } - HConnection conn = HConnectionManager.getConnection(UTIL.getConfiguration()); - conn.clearRegionCache(); - conn.close(); LOG.info("before done"); } @@ -150,20 +145,26 @@ public class TestMultiParallel { * @throws SecurityException */ @Test(timeout=300000) - public void testActiveThreadsCount() throws Exception{ - Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); - List puts = constructPutRequests(); // creates a Put for every region - table.batch(puts); - HashSet regionservers = new HashSet(); - for (byte[] k : KEYS) { - HRegionLocation location = ((HTable)table).getRegionLocation(k); - regionservers.add(location.getServerName()); + public void testActiveThreadsCount() throws Exception { + try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration())) { + ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration()); + try { + try (Table t = connection.getTable(TEST_TABLE, executor)) { + List puts = constructPutRequests(); // creates a Put for every region + t.batch(puts); + HashSet regionservers = new HashSet(); + try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) { + for (Row r : puts) { + HRegionLocation location = locator.getRegionLocation(r.getRow()); + regionservers.add(location.getServerName()); + } + } + assertEquals(regionservers.size(), executor.getLargestPoolSize()); + } + } finally { + executor.shutdownNow(); + } } - Field poolField = table.getClass().getDeclaredField("pool"); - poolField.setAccessible(true); - ThreadPoolExecutor tExecutor = (ThreadPoolExecutor) poolField.get(table); - assertEquals(regionservers.size(), tExecutor.getLargestPoolSize()); - table.close(); } @Test(timeout=300000) @@ -198,7 +199,7 @@ public class TestMultiParallel { Cell[] multiKvs = multiRes[i].rawCells(); for (int j = 0; j < singleKvs.length; j++) { Assert.assertEquals(singleKvs[j], multiKvs[j]); - Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]), + Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]), CellUtil.cloneValue(multiKvs[j]))); } } @@ -330,7 +331,7 @@ public class TestMultiParallel { @Test (timeout=300000) public void testBatchWithPut() throws Exception { LOG.info("test=testBatchWithPut"); - Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); + Table table = CONNECTION.getTable(TEST_TABLE); // put multiple rows using a batch List puts = constructPutRequests(); @@ -348,9 +349,8 @@ public class TestMultiParallel { results = table.batch(puts); } catch (RetriesExhaustedWithDetailsException ree) { LOG.info(ree.getExhaustiveDescription()); - throw ree; - } finally { table.close(); + throw ree; } validateSizeAndEmpty(results, KEYS.length); } @@ -491,7 +491,8 @@ public class TestMultiParallel { @Test(timeout=300000) public void testNonceCollision() throws Exception { LOG.info("test=testNonceCollision"); - HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); + final Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration()); + Table table = connection.getTable(TEST_TABLE); Put put = new Put(ONE_ROW); put.add(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L)); @@ -510,8 +511,9 @@ public class TestMultiParallel { return nonce; } }; + NonceGenerator oldCnm = - ConnectionUtils.injectNonceGeneratorForTesting(table.getConnection(), cnm); + ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)connection, cnm); // First test sequential requests. try { @@ -541,7 +543,7 @@ public class TestMultiParallel { public void run() { Table table = null; try { - table = new HTable(UTIL.getConfiguration(), TEST_TABLE); + table = connection.getTable(TEST_TABLE); } catch (IOException e) { fail("Not expected"); } @@ -574,7 +576,7 @@ public class TestMultiParallel { validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L)); table.close(); } finally { - ConnectionManager.injectNonceGeneratorForTesting(table.getConnection(), oldCnm); + ConnectionManager.injectNonceGeneratorForTesting((ClusterConnection)connection, oldCnm); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java index 47501d8..323d0e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java @@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTable; @@ -232,10 +234,10 @@ public class TestLoadIncrementalHFilesSplitRecovery { * @throws IOException */ void assertExpectedTable(TableName table, int count, int value) throws IOException { + HTableDescriptor [] htds = util.getHBaseAdmin().listTables(table.getNameAsString()); + assertEquals(htds.length, 1); Table t = null; try { - assertEquals( - util.getHBaseAdmin().listTables(table.getNameAsString()).length, 1); t = new HTable(util.getConfiguration(), table); Scan s = new Scan(); ResultScanner sr = t.getScanner(s); @@ -444,30 +446,33 @@ public class TestLoadIncrementalHFilesSplitRecovery { public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception { TableName tableName = TableName.valueOf("testGroupOrSplitWhenRegionHoleExistsInMeta"); byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") }; - HTable table = new HTable(util.getConfiguration(), tableName); + // Share connection. We were failing to find the table with our new reverse scan because it + // looks for first region, not any region -- that is how it works now. The below removes first + // region in test. Was reliant on the Connection caching having first region. + Connection connection = ConnectionFactory.createConnection(util.getConfiguration()); + Table table = connection.getTable(tableName); setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS); Path dir = buildBulkFiles(tableName, 2); final AtomicInteger countedLqis = new AtomicInteger(); - LoadIncrementalHFiles loader = new LoadIncrementalHFiles( - util.getConfiguration()) { - - protected List groupOrSplit( - Multimap regionGroups, - final LoadQueueItem item, final HTable htable, - final Pair startEndKeys) throws IOException { - List lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys); - if (lqis != null) { - countedLqis.addAndGet(lqis.size()); + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) { + + protected List groupOrSplit( + Multimap regionGroups, + final LoadQueueItem item, final HTable htable, + final Pair startEndKeys) throws IOException { + List lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys); + if (lqis != null) { + countedLqis.addAndGet(lqis.size()); + } + return lqis; } - return lqis; - } - }; + }; // do bulkload when there is no region hole in hbase:meta. try { - loader.doBulkLoad(dir, table); + loader.doBulkLoad(dir, (HTable)table); } catch (Exception e) { LOG.error("exeception=", e); } @@ -477,18 +482,16 @@ public class TestLoadIncrementalHFilesSplitRecovery { dir = buildBulkFiles(tableName, 3); // Mess it up by leaving a hole in the hbase:meta - HConnection hConnection = HConnectionManager.getConnection(util.getConfiguration()); - List regionInfos = MetaTableAccessor.getTableRegions( - hConnection, tableName); + List regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); for (HRegionInfo regionInfo : regionInfos) { if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { - MetaTableAccessor.deleteRegion(hConnection, regionInfo); + MetaTableAccessor.deleteRegion(connection, regionInfo); break; } } try { - loader.doBulkLoad(dir, table); + loader.doBulkLoad(dir, (HTable)table); } catch (Exception e) { LOG.error("exeception=", e); assertTrue("IOException expected", e instanceof IOException); @@ -496,7 +499,42 @@ public class TestLoadIncrementalHFilesSplitRecovery { table.close(); - this.assertExpectedTable(tableName, ROWCOUNT, 2); + // Make sure at least the one region that still exists can be found. + regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); + assertTrue(regionInfos.size() >= 1); + + this.assertExpectedTable(connection, tableName, ROWCOUNT, 2); + connection.close(); } -} + /** + * Checks that all columns have the expected value and that there is the + * expected number of rows. + * @throws IOException + */ + void assertExpectedTable(final Connection connection, TableName table, int count, int value) + throws IOException { + HTableDescriptor [] htds = util.getHBaseAdmin().listTables(table.getNameAsString()); + assertEquals(htds.length, 1); + Table t = null; + try { + t = connection.getTable(table); + Scan s = new Scan(); + ResultScanner sr = t.getScanner(s); + int i = 0; + for (Result r : sr) { + i++; + for (NavigableMap nm : r.getNoVersionMap().values()) { + for (byte[] val : nm.values()) { + assertTrue(Bytes.equals(val, value(value))); + } + } + } + assertEquals(count, i); + } catch (IOException e) { + fail("Failed due to exception"); + } finally { + if (t != null) t.close(); + } + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 366a36b..a16d7e5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -107,8 +108,6 @@ import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -337,7 +336,7 @@ public class TestDistributedLogSplitting { HTable ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE); NonceGeneratorWithDups ng = new NonceGeneratorWithDups(); NonceGenerator oldNg = - ConnectionUtils.injectNonceGeneratorForTesting(ht.getConnection(), ng); + ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)ht.getConnection(), ng); try { List reqs = new ArrayList(); @@ -371,7 +370,7 @@ public class TestDistributedLogSplitting { } } } finally { - ConnectionUtils.injectNonceGeneratorForTesting(ht.getConnection(), oldNg); + ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection) ht.getConnection(), oldNg); ht.close(); zkw.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java index 5593d80..49ded21 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java @@ -179,7 +179,6 @@ public class TestScannerWithBulkload { table.put(put1); table.flushCommits(); admin.flush(tableName); - admin.close(); put0 = new Put(Bytes.toBytes("row1")); put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version1"))); @@ -200,7 +199,7 @@ public class TestScannerWithBulkload { @Test public void testBulkLoadWithParallelScan() throws Exception { TableName tableName = TableName.valueOf("testBulkLoadWithParallelScan"); - final long l = System.currentTimeMillis(); + final long l = System.currentTimeMillis(); HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); createTable(admin, tableName); Scan scan = createScan(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 29073ed..678adc4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -107,8 +107,8 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; /** - * Like {@link TestSplitTransaction} in that we're testing {@link SplitTransaction} - * only the below tests are against a running cluster where {@link TestSplitTransaction} + * Like TestSplitTransaction in that we're testing {@link SplitTransaction} + * only the below tests are against a running cluster where TestSplitTransaction * is tests against a bare {@link HRegion}. */ @Category({RegionServerTests.class, LargeTests.class}) @@ -904,7 +904,7 @@ public class TestSplitTransactionOnCluster { fail("Each table should have at least one region."); } ServerName serverName = - cluster.getServerHoldingRegion(firstTableRegions.get(0).getRegionName()); + cluster.getServerHoldingRegion(firstTable, firstTableRegions.get(0).getRegionName()); admin.move(secondTableRegions.get(0).getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes(serverName.getServerName())); Table table1 = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java index 7042724..f9aeb6f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java @@ -224,7 +224,8 @@ public class TestMultiSlaveReplication { region.getWAL().registerWALActionsListener(listener); // request a roll - admin.rollWALWriter(cluster.getServerHoldingRegion(region.getRegionName())); + admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(), + region.getRegionName())); // wait try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index de46f25..ad9b227 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -29,15 +28,13 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.log4j.Level; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -48,10 +45,10 @@ import org.junit.BeforeClass; * All other tests should have their own classes and extend this one */ public class TestReplicationBase { - +/* { ((Log4JLogger) ReplicationSource.LOG).getLogger().setLevel(Level.ALL); - } + }*/ private static final Log LOG = LogFactory.getLog(TestReplicationBase.class); @@ -64,7 +61,7 @@ public class TestReplicationBase { protected static ReplicationAdmin admin; - protected static HTable htable1; + protected static Table htable1; protected static Table htable2; protected static HBaseTestingUtility utility1; @@ -138,15 +135,19 @@ public class TestReplicationBase { table.addFamily(fam); fam = new HColumnDescriptor(noRepfamName); table.addFamily(fam); - Admin admin1 = new HBaseAdmin(conf1); - Admin admin2 = new HBaseAdmin(conf2); - admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + Connection connection1 = ConnectionFactory.createConnection(conf1); + Connection connection2 = ConnectionFactory.createConnection(conf2); + try (Admin admin1 = connection1.getAdmin()) { + admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + } + try (Admin admin2 = connection2.getAdmin()) { + admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + } utility1.waitUntilAllRegionsAssigned(tableName); - admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); utility2.waitUntilAllRegionsAssigned(tableName); - htable1 = new HTable(conf1, tableName); + htable1 = connection1.getTable(tableName); htable1.setWriteBufferSize(1024); - htable2 = new HTable(conf2, tableName); + htable2 = connection2.getTable(tableName); } /** @@ -154,10 +155,10 @@ public class TestReplicationBase { */ @AfterClass public static void tearDownAfterClass() throws Exception { + htable2.close(); + htable1.close(); + admin.close(); utility2.shutdownMiniCluster(); utility1.shutdownMiniCluster(); } - - -} - +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java index 6dc3548..0319607 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java @@ -54,7 +54,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas */ @Before public void setUp() throws Exception { - htable1.setAutoFlush(false, true); + ((HTable)htable1).setAutoFlush(false, true); // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue for (JVMClusterUtil.RegionServerThread r : diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 456a086..633dcc9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -26,16 +26,21 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; -import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.junit.AfterClass; import org.junit.Assert; @@ -49,6 +54,7 @@ import org.junit.experimental.categories.Category; */ @Category({ReplicationTests.class, MediumTests.class}) public class TestReplicationEndpoint extends TestReplicationBase { + static final Log LOG = LogFactory.getLog(TestReplicationEndpoint.class); static int numRegionServers; @@ -72,13 +78,14 @@ public class TestReplicationEndpoint extends TestReplicationBase { ReplicationEndpointForTest.contructedCount.set(0); ReplicationEndpointForTest.startedCount.set(0); ReplicationEndpointForTest.replicateCount.set(0); + ReplicationEndpointReturningFalse.replicated.set(false); ReplicationEndpointForTest.lastEntries = null; for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) { utility1.getHBaseAdmin().rollWALWriter(rs.getRegionServer().getServerName()); } } - @Test + @Test (timeout=120000) public void testCustomReplicationEndpoint() throws Exception { // test installing a custom replication endpoint other than the default one. admin.addPeer("testCustomReplicationEndpoint", @@ -117,17 +124,32 @@ public class TestReplicationEndpoint extends TestReplicationBase { admin.removePeer("testCustomReplicationEndpoint"); } - @Test + @Test (timeout=120000) public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception { - admin.addPeer("testReplicationEndpointReturnsFalseOnReplicate", + Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); + Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get()); + int peerCount = admin.getPeersCount(); + final String id = "testReplicationEndpointReturnsFalseOnReplicate"; + admin.addPeer(id, new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1)) .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null); - // now replicate some data. + // This test is flakey and then there is so much stuff flying around in here its, hard to + // debug. Peer needs to be up for the edit to make it across. This wait on + // peer count seems to be a hack that has us not progress till peer is up. + if (admin.getPeersCount() <= peerCount) { + LOG.info("Waiting on peercount to go up from " + peerCount); + Threads.sleep(100); + } + // now replicate some data doPut(row); Waiter.waitFor(conf1, 60000, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { + // Looks like replication endpoint returns false unless we put more than 10 edits. We + // only send over one edit. + int count = ReplicationEndpointForTest.replicateCount.get(); + LOG.info("count=" + count); return ReplicationEndpointReturningFalse.replicated.get(); } }); @@ -138,15 +160,17 @@ public class TestReplicationEndpoint extends TestReplicationBase { admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate"); } - @Test + @Test (timeout=120000) public void testWALEntryFilterFromReplicationEndpoint() throws Exception { admin.addPeer("testWALEntryFilterFromReplicationEndpoint", new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1)) .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null); // now replicate some data. - doPut(Bytes.toBytes("row1")); - doPut(row); - doPut(Bytes.toBytes("row2")); + try (Connection connection = ConnectionFactory.createConnection(conf1)) { + doPut(connection, Bytes.toBytes("row1")); + doPut(connection, row); + doPut(connection, Bytes.toBytes("row2")); + } Waiter.waitFor(conf1, 60000, new Waiter.Predicate() { @Override @@ -161,11 +185,17 @@ public class TestReplicationEndpoint extends TestReplicationBase { private void doPut(byte[] row) throws IOException { - Put put = new Put(row); - put.add(famName, row, row); - htable1 = new HTable(conf1, tableName); - htable1.put(put); - htable1.close(); + try (Connection connection = ConnectionFactory.createConnection(conf1)) { + doPut(connection, row); + } + } + + private void doPut(final Connection connection, final byte [] row) throws IOException { + try (Table t = connection.getTable(tableName)) { + Put put = new Put(row); + put.add(famName, row, row); + t.put(put); + } } private static void doAssert(byte[] row) throws Exception { @@ -217,6 +247,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { } public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest { + static int COUNT = 10; static AtomicReference ex = new AtomicReference(null); static AtomicBoolean replicated = new AtomicBoolean(false); @Override @@ -229,8 +260,9 @@ public class TestReplicationEndpoint extends TestReplicationBase { } super.replicate(replicateContext); + LOG.info("Replicated " + row + ", count=" + replicateCount.get()); - replicated.set(replicateCount.get() > 10); // first 10 times, we return false + replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false return replicated.get(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java index 5739aee..6a6cf21 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -54,7 +55,7 @@ public class TestReplicationKillRS extends TestReplicationBase { Thread killer = killARegionServer(util, 5000, rsToKill1); LOG.info("Start loading table"); - int initialCount = utility1.loadTable(htable1, famName); + int initialCount = utility1.loadTable((HTable)htable1, famName); LOG.info("Done loading table"); killer.join(5000); LOG.info("Done waiting for threads"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 3ecec91..bb7794f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -69,7 +69,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { */ @Before public void setUp() throws Exception { - htable1.setAutoFlush(true, true); + ((HTable)htable1).setAutoFlush(true, true); // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue for ( JVMClusterUtil.RegionServerThread r : @@ -247,7 +247,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { LOG.info("testSmallBatch"); Put put; // normal Batch tests - htable1.setAutoFlush(false, true); + ((HTable)htable1).setAutoFlush(false, true); for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { put = new Put(Bytes.toBytes(i)); put.add(famName, row, row); @@ -387,7 +387,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { public void testLoading() throws Exception { LOG.info("Writing out rows to table1 in testLoading"); htable1.setWriteBufferSize(1024); - htable1.setAutoFlush(false, true); + ((HTable)htable1).setAutoFlush(false, true); for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) { Put put = new Put(Bytes.toBytes(i)); put.add(famName, row, row); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java index c41e977..6fa6cbd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java @@ -86,7 +86,7 @@ public class TestAccessController2 extends SecureTestUtil { public Object run() throws Exception { HTableDescriptor desc = new HTableDescriptor(TEST_TABLE.getTableName()); desc.addFamily(new HColumnDescriptor(TEST_FAMILY)); - Admin admin = new HBaseAdmin(conf); + Admin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); try { admin.createTable(desc); } finally {