Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseAdmin.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseAdmin.java (revision 0)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseAdmin.java (revision 0)
@@ -0,0 +1,430 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.SortedMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.io.KeyedData;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * Provides administrative functions for HBase: creating, deleting, enabling
+ * and disabling tables, adding and deleting columns, and shutting down the
+ * instance.
+ */
+public class HBaseAdmin implements HConstants {
+  protected final Log LOG = LogFactory.getLog(this.getClass().getName());
+
+  protected final HConnection connection;
+  protected final long pause;
+  protected final int numRetries;
+  protected volatile HMasterInterface master;
+
+  /**
+   * Constructor
+   *
+   * @param conf Configuration object; supplies "hbase.client.pause" (sleep
+   * between retries in milliseconds, default 30 seconds) and
+   * "hbase.client.retries.number" (default 5)
+   * @throws MasterNotRunningException
+   */
+  public HBaseAdmin(Configuration conf) throws MasterNotRunningException {
+    this.connection = HConnectionManager.getConnection(conf);
+    this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
+    this.numRetries = conf.getInt("hbase.client.retries.number", 5);
+    this.master = connection.getMaster();
+  }
+
+  /**
+   * Creates a new table
+   *
+   * @param desc table descriptor for table
+   *
+   * @throws IllegalArgumentException if the table name is reserved
+   * @throws MasterNotRunningException if master is not running
+   * @throws NoServerForRegionException if root region is not being served
+   * @throws TableExistsException if table already exists (If concurrent
+   * threads, the table may have been created between test-for-existence
+   * and attempt-at-creation).
+   * @throws IOException
+   */
+  public void createTable(HTableDescriptor desc)
+  throws IOException {
+
+    createTableAsync(desc);
+
+    // Wait for new table to come on-line
+    connection.getTableServers(desc.getName());
+  }
+
+  /**
+   * Creates a new table but does not block and wait for it to come online.
+   *
+   * @param desc table descriptor for table
+   *
+   * @throws IllegalArgumentException if the table name is reserved
+   * @throws MasterNotRunningException if master is not running
+   * @throws NoServerForRegionException if root region is not being served
+   * @throws TableExistsException if table already exists (If concurrent
+   * threads, the table may have been created between test-for-existence
+   * and attempt-at-creation).
+   * @throws IOException
+   */
+  public void createTableAsync(HTableDescriptor desc)
+  throws IOException {
+
+    checkMasterRunning();
+    checkReservedTableName(desc.getName());
+    try {
+      this.master.createTable(desc);
+
+    } catch (RemoteException e) {
+      throw RemoteExceptionHandler.decodeRemoteException(e);
+    }
+  }
+
+  /**
+   * Deletes a table, then polls the META region located for the table name
+   * until the table is no longer listed there or the retries run out.
+   *
+   * @param tableName name of table to delete
+   * @throws IOException
+   */
+  public void deleteTable(Text tableName) throws IOException {
+    checkMasterRunning();
+    checkReservedTableName(tableName);
+    HRegionLocation firstMetaServer = getFirstMetaServerForTable(tableName);
+
+    try {
+      this.master.deleteTable(tableName);
+    } catch (RemoteException e) {
+      throw RemoteExceptionHandler.decodeRemoteException(e);
+    }
+
+    // Wait until first region is deleted
+    HRegionInterface server =
+      connection.getHRegionConnection(firstMetaServer.getServerAddress());
+    DataInputBuffer inbuf = new DataInputBuffer();
+    HRegionInfo info = new HRegionInfo();
+    boolean deleted = false;
+    for (int tries = 0; tries < numRetries; tries++) {
+      long scannerId = -1L;
+      try {
+        scannerId =
+          server.openScanner(firstMetaServer.getRegionInfo().getRegionName(),
+            COL_REGIONINFO_ARRAY, tableName, System.currentTimeMillis(), null);
+        KeyedData[] values = server.next(scannerId);
+        boolean found = false;
+        if (values != null) {
+          for (int j = 0; j < values.length; j++) {
+            if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
+              inbuf.reset(values[j].getData(), values[j].getData().length);
+              info.readFields(inbuf);
+              if (info.tableDesc.getName().equals(tableName)) {
+                found = true;
+              }
+            }
+          }
+        }
+        if (!found) {
+          // Leaving the loop still runs the finally block below.
+          deleted = true;
+          break;
+        }
+
+      } catch (IOException ex) {
+        if (tries == numRetries - 1) { // no more tries left
+          if (ex instanceof RemoteException) {
+            ex = RemoteExceptionHandler.decodeRemoteException((RemoteException) ex);
+          }
+          throw ex;
+        }
+
+      } finally {
+        if (scannerId != -1L) {
+          try {
+            server.close(scannerId);
+          } catch (Exception ex) {
+            LOG.warn(ex);
+          }
+        }
+      }
+
+      try {
+        Thread.sleep(pause);
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+    if (deleted) {
+      LOG.info("table " + tableName + " deleted");
+    } else {
+      // Retries ran out with the table still listed; do not claim success.
+      LOG.warn("table " + tableName + " still listed in META after "
+          + numRetries + " checks");
+    }
+  }
+
+  /**
+   * Brings a table on-line (enables it)
+   *
+   * @param tableName name of the table
+   * @throws IOException
+   */
+  public void enableTable(Text tableName) throws IOException {
+    checkMasterRunning();
+    checkReservedTableName(tableName);
+    HRegionLocation firstMetaServer = getFirstMetaServerForTable(tableName);
+
+    try {
+      this.master.enableTable(tableName);
+
+    } catch (RemoteException e) {
+      throw RemoteExceptionHandler.decodeRemoteException(e);
+    }
+
+    // Wait until first region is enabled
+    if (waitForFirstRegion(tableName, firstMetaServer, false, "enabled")) {
+      LOG.info("Enabled table " + tableName);
+    } else {
+      LOG.warn("Gave up waiting for table " + tableName + " to be enabled");
+    }
+  }
+
+  /**
+   * Disables a table (takes it off-line) If it is being served, the master
+   * will tell the servers to stop serving it.
+   *
+   * @param tableName name of table
+   * @throws IOException
+   */
+  public void disableTable(Text tableName) throws IOException {
+    checkMasterRunning();
+    checkReservedTableName(tableName);
+    HRegionLocation firstMetaServer = getFirstMetaServerForTable(tableName);
+
+    try {
+      this.master.disableTable(tableName);
+
+    } catch (RemoteException e) {
+      throw RemoteExceptionHandler.decodeRemoteException(e);
+    }
+
+    // Wait until first region is disabled
+    if (waitForFirstRegion(tableName, firstMetaServer, true, "disabled")) {
+      LOG.info("Disabled table " + tableName);
+    } else {
+      LOG.warn("Gave up waiting for table " + tableName + " to be disabled");
+    }
+  }
+
+  /**
+   * @param tableName Table to check.
+   * @return True if table exists already.
+   * @throws MasterNotRunningException
+   */
+  public boolean tableExists(final Text tableName) throws MasterNotRunningException {
+    checkMasterRunning();
+    return connection.tableExists(tableName);
+  }
+
+  /**
+   * Add a column to an existing table
+   *
+   * @param tableName name of the table to add column to
+   * @param column column descriptor of column to be added
+   * @throws IOException
+   */
+  public void addColumn(Text tableName, HColumnDescriptor column)
+  throws IOException {
+    checkMasterRunning();
+    checkReservedTableName(tableName);
+    try {
+      this.master.addColumn(tableName, column);
+
+    } catch (RemoteException e) {
+      throw RemoteExceptionHandler.decodeRemoteException(e);
+    }
+  }
+
+  /**
+   * Delete a column from a table
+   *
+   * @param tableName name of table
+   * @param columnName name of column to be deleted
+   * @throws IOException
+   */
+  public void deleteColumn(Text tableName, Text columnName)
+  throws IOException {
+    checkMasterRunning();
+    checkReservedTableName(tableName);
+    try {
+      this.master.deleteColumn(tableName, columnName);
+
+    } catch (RemoteException e) {
+      throw RemoteExceptionHandler.decodeRemoteException(e);
+    }
+  }
+
+  /**
+   * Shuts down the HBase instance. The master reference is cleared whether
+   * or not the call succeeds, so later calls on this object throw
+   * MasterNotRunningException.
+   * @throws IOException
+   */
+  public synchronized void shutdown() throws IOException {
+    checkMasterRunning();
+    try {
+      this.master.shutdown();
+    } catch (RemoteException e) {
+      throw RemoteExceptionHandler.decodeRemoteException(e);
+    } finally {
+      this.master = null;
+    }
+  }
+
+  /*
+   * Verifies that the specified table name is not a reserved name
+   * @param tableName - the table name to be checked
+   * @throws IllegalArgumentException - if the table name is reserved
+   */
+  protected void checkReservedTableName(Text tableName) {
+    if(tableName.equals(ROOT_TABLE_NAME)
+        || tableName.equals(META_TABLE_NAME)) {
+
+      throw new IllegalArgumentException(tableName + " is a reserved table name");
+    }
+  }
+
+  /*
+   * Fails fast once shutdown() has cleared the master reference.
+   * @throws MasterNotRunningException - if master is null
+   */
+  private void checkMasterRunning() throws MasterNotRunningException {
+    if (this.master == null) {
+      throw new MasterNotRunningException("master has been shut down");
+    }
+  }
+
+  /*
+   * Finds the META region to scan for a table: the one keyed by the table
+   * name if present, otherwise the one with the greatest key below it.
+   */
+  private HRegionLocation getFirstMetaServerForTable(Text tableName)
+  throws IOException {
+    SortedMap<Text, HRegionLocation> metaservers =
+      connection.getTableServers(META_TABLE_NAME);
+
+    return metaservers.get((metaservers.containsKey(tableName)) ?
+        tableName : metaservers.headMap(tableName).lastKey());
+  }
+
+  /*
+   * Scans the given META region for the table until the first region info
+   * read reports the wanted off-line flag, sleeping between attempts.
+   * @param wantOffline - value of HRegionInfo.offLine to wait for
+   * @param state - word used in the debug messages ("enabled"/"disabled")
+   * @return true if the wanted state was seen, false if retries ran out
+   * @throws NoSuchElementException - if the first read of a scan is empty
+   */
+  private boolean waitForFirstRegion(Text tableName,
+      HRegionLocation firstMetaServer, boolean wantOffline, String state)
+  throws IOException {
+    HRegionInterface server =
+      connection.getHRegionConnection(firstMetaServer.getServerAddress());
+
+    DataInputBuffer inbuf = new DataInputBuffer();
+    HRegionInfo info = new HRegionInfo();
+    for (int tries = 0; tries < numRetries; tries++) {
+      int valuesfound = 0;
+      long scannerId = -1L;
+      try {
+        scannerId =
+          server.openScanner(firstMetaServer.getRegionInfo().getRegionName(),
+            COL_REGIONINFO_ARRAY, tableName, System.currentTimeMillis(), null);
+
+        boolean reached = false;
+        while (!reached) {
+          KeyedData[] values = server.next(scannerId);
+          if (values == null || values.length == 0) {
+            if (valuesfound == 0) {
+              throw new NoSuchElementException(
+                  "table " + tableName + " not found");
+            }
+            break;
+          }
+          valuesfound += 1;
+          for (int j = 0; j < values.length; j++) {
+            if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
+              inbuf.reset(values[j].getData(), values[j].getData().length);
+              info.readFields(inbuf);
+              reached = (info.offLine == wantOffline);
+              break;
+            }
+          }
+        }
+        if (reached) {
+          return true; // the finally block closes the scanner
+        }
+
+      } catch (IOException e) {
+        if (tries == numRetries - 1) { // no more retries
+          if (e instanceof RemoteException) {
+            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+          }
+          throw e;
+        }
+
+      } finally {
+        if (scannerId != -1L) {
+          try {
+            server.close(scannerId);
+
+          } catch (Exception e) {
+            LOG.warn(e);
+          }
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sleep. Waiting for first region to be " + state + " from " +
+            tableName);
+      }
+      try {
+        Thread.sleep(pause);
+
+      } catch (InterruptedException e) {
+        // continue
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Wake. Waiting for first region to be " + state + " from " +
+            tableName);
+      }
+    }
+    return false;
+  }
+}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (revision 560014)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (working copy)
@@ -17,500 +17,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+// temporary until I change all the classes that depend on HClient.
package org.apache.hadoop.hbase; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Random; import java.util.SortedMap; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicReferenceArray; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.filter.RowFilterInterface; -import org.apache.hadoop.hbase.io.BatchUpdate; -import org.apache.hadoop.hbase.io.KeyedData; -import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RemoteException; /** - * HClient manages a connection to a single HRegionServer. + * The HClient class is deprecated and is now implemented entirely in terms of + * the classes that replace it: + * + *

+ * HClient continues to be supported in the short term to give users a chance + * to migrate to the use of HConnection, HTable and HBaseAdmin. Any new API + * features which are added will be added to these three classes and will not + * be supported in HClient. */ +@Deprecated public class HClient implements HConstants { - final Log LOG = LogFactory.getLog(this.getClass().getName()); - - static final Text[] META_COLUMNS = { - COLUMN_FAMILY - }; - - private static final Text[] REGIONINFO = { - COL_REGIONINFO - }; - - static final Text EMPTY_START_ROW = new Text(); - - long pause; - int numRetries; - HMasterInterface master; private final Configuration conf; - private AtomicLong currentLockId; - private Class serverInterfaceClass; - private AtomicReference batch; - - /* - * Data structure that holds current location for a region and its info. - */ - @SuppressWarnings("unchecked") - protected static class RegionLocation implements Comparable { - HRegionInfo regionInfo; - HServerAddress serverAddress; - - RegionLocation(HRegionInfo regionInfo, HServerAddress serverAddress) { - this.regionInfo = regionInfo; - this.serverAddress = serverAddress; - } - - /** - * {@inheritDoc} - */ - @Override - public String toString() { - return "address: " + this.serverAddress.toString() + ", regioninfo: " + - this.regionInfo; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean equals(Object o) { - return this.compareTo(o) == 0; - } - - /** - * {@inheritDoc} - */ - @Override - public int hashCode() { - int result = this.regionInfo.hashCode(); - result ^= this.serverAddress.hashCode(); - return result; - } - - /** @return HRegionInfo */ - public HRegionInfo getRegionInfo(){ - return regionInfo; - } - - /** @return HServerAddress */ - public HServerAddress getServerAddress(){ - return serverAddress; - } - - // - // Comparable - // - - /** - * {@inheritDoc} - */ - public int compareTo(Object o) { - RegionLocation other = (RegionLocation) o; - int result = 
this.regionInfo.compareTo(other.regionInfo); - if(result == 0) { - result = this.serverAddress.compareTo(other.serverAddress); - } - return result; - } - } - - /** encapsulates finding the servers for a table */ - protected class TableServers { - // Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress) - private TreeMap> tablesToServers; - - /** constructor */ - public TableServers() { - this.tablesToServers = - new TreeMap>(); - } - - /** - * Gets the servers of the given table out of cache, or calls - * findServersForTable if there is nothing in the cache. - * - * @param tableName - the table to be located - * @return map of startRow -> RegionLocation - * @throws IOException - if the table can not be located after retrying - */ - public synchronized SortedMap - getTableServers(Text tableName) throws IOException { - if(tableName == null || tableName.getLength() == 0) { - throw new IllegalArgumentException( - "table name cannot be null or zero length"); - } - SortedMap serverResult = - tablesToServers.get(tableName); - - if (serverResult == null ) { - if (LOG.isDebugEnabled()) { - LOG.debug("No servers for " + tableName + ". Doing a find..."); - } - // We don't know where the table is. - // Load the information from meta. - serverResult = findServersForTable(tableName); - } - return serverResult; - } - - /* - * Clears the cache of all known information about the specified table and - * locates a table by searching the META or ROOT region (as appropriate) or - * by querying the master for the location of the root region if that is the - * table requested. 
- * - * @param tableName - name of table to find servers for - * @return - map of first row to table info for all regions in the table - * @throws IOException - */ - private SortedMap findServersForTable(Text tableName) - throws IOException { - - // Wipe out everything we know about this table - - if (this.tablesToServers.containsKey(tableName)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Wiping out all we know of " + tableName); - } - this.tablesToServers.remove(tableName); - } - - SortedMap servers = null; - - if (tableName.equals(ROOT_TABLE_NAME)) { - servers = locateRootRegion(); - - } else if (tableName.equals(META_TABLE_NAME)) { - if (tablesToServers.get(ROOT_TABLE_NAME) == null) { - findServersForTable(ROOT_TABLE_NAME); - } - for (int tries = 0; tries < numRetries; tries++) { - try { - servers = loadMetaFromRoot(); - break; - - } catch (IOException e) { - if (tries < numRetries - 1) { - findServersForTable(ROOT_TABLE_NAME); - continue; - } - throw e; - } - } - } else { - for (int tries = 0; tries < numRetries; tries++) { - boolean success = true; // assume this works - - SortedMap metaServers = - this.tablesToServers.get(META_TABLE_NAME); - if (metaServers == null) { - metaServers = findServersForTable(META_TABLE_NAME); - } - Text firstMetaRegion = metaServers.headMap(tableName).lastKey(); - metaServers = metaServers.tailMap(firstMetaRegion); - - servers = new TreeMap(); - for (RegionLocation t: metaServers.values()) { - try { - servers.putAll(scanOneMetaRegion(t, tableName)); - - } catch (IOException e) { - e.printStackTrace(); - if(tries < numRetries - 1) { - findServersForTable(META_TABLE_NAME); - success = false; - break; - } - throw e; - } - } - if (success) { - break; - } - } - } - this.tablesToServers.put(tableName, servers); - if (LOG.isDebugEnabled()) { - if(servers != null) { - for (Map.Entry e: servers.entrySet()) { - LOG.debug("Server " + e.getKey() + " is serving: " + e.getValue() + - " for table " + tableName); - } - } - } - return servers; - 
} - - /* - * Load the meta table from the root table. - * - * @return map of first row to TableInfo for all meta regions - * @throws IOException - */ - private TreeMap loadMetaFromRoot() throws IOException { - SortedMap rootRegion = - this.tablesToServers.get(ROOT_TABLE_NAME); - return scanOneMetaRegion(rootRegion.get(rootRegion.firstKey()), META_TABLE_NAME); - } - - /* - * Repeatedly try to find the root region by asking the master for where it is - * @return TreeMap for root regin if found - * @throws NoServerForRegionException - if the root region can not be located - * after retrying - * @throws IOException - */ - private TreeMap locateRootRegion() throws IOException { - checkMaster(); - - HServerAddress rootRegionLocation = null; - for(int tries = 0; tries < numRetries; tries++) { - int localTimeouts = 0; - while(rootRegionLocation == null && localTimeouts < numRetries) { - rootRegionLocation = master.findRootRegion(); - if(rootRegionLocation == null) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Sleeping. Waiting for root region."); - } - Thread.sleep(pause); - if (LOG.isDebugEnabled()) { - LOG.debug("Wake. Retry finding root region."); - } - } catch(InterruptedException iex) { - // continue - } - localTimeouts++; - } - } - - if(rootRegionLocation == null) { - throw new NoServerForRegionException( - "Timed out trying to locate root region"); - } - - HRegionInterface rootRegion = getHRegionConnection(rootRegionLocation); - - try { - rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName); - break; - } catch(IOException e) { - if(tries == numRetries - 1) { - // Don't bother sleeping. We've run out of retries. - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - - // Sleep and retry finding root region. - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Root region location changed. Sleeping."); - } - Thread.sleep(pause); - if (LOG.isDebugEnabled()) { - LOG.debug("Wake. 
Retry finding root region."); - } - } catch(InterruptedException iex) { - // continue - } - } - rootRegionLocation = null; - } - - if (rootRegionLocation == null) { - throw new NoServerForRegionException( - "unable to locate root region server"); - } - - TreeMap rootServer = new TreeMap(); - rootServer.put(EMPTY_START_ROW, - new RegionLocation(HGlobals.rootRegionInfo, rootRegionLocation)); - - return rootServer; - } - - /* - * Scans a single meta region - * @param t the meta region we're going to scan - * @param tableName the name of the table we're looking for - * @return returns a map of startingRow to TableInfo - * @throws RegionNotFoundException - if table does not exist - * @throws IllegalStateException - if table is offline - * @throws NoServerForRegionException - if table can not be found after retrying - * @throws IOException - */ - private TreeMap scanOneMetaRegion(final RegionLocation t, - final Text tableName) throws IOException { - HRegionInterface server = getHRegionConnection(t.serverAddress); - TreeMap servers = new TreeMap(); - for(int tries = 0; servers.size() == 0 && tries < numRetries; tries++) { - - long scannerId = -1L; - try { - scannerId = - server.openScanner(t.regionInfo.regionName, META_COLUMNS, tableName, - System.currentTimeMillis(), null); - - DataInputBuffer inbuf = new DataInputBuffer(); - while(true) { - HRegionInfo regionInfo = null; - String serverAddress = null; - KeyedData[] values = server.next(scannerId); - if(values.length == 0) { - if(servers.size() == 0) { - // If we didn't find any servers then the table does not exist - throw new TableNotFoundException("table '" + tableName + - "' does not exist in " + t); - } - - // We found at least one server for the table and now we're done. 
- if (LOG.isDebugEnabled()) { - LOG.debug("Found " + servers.size() + " server(s) for " + - "location: " + t + " for tablename " + tableName); - } - break; - } - - byte[] bytes = null; - TreeMap results = new TreeMap(); - for(int i = 0; i < values.length; i++) { - results.put(values[i].getKey().getColumn(), values[i].getData()); - } - regionInfo = new HRegionInfo(); - bytes = results.get(COL_REGIONINFO); - inbuf.reset(bytes, bytes.length); - regionInfo.readFields(inbuf); - - if(!regionInfo.tableDesc.getName().equals(tableName)) { - // We're done - if (LOG.isDebugEnabled()) { - LOG.debug("Found " + servers.size() + " servers for table " + - tableName); - } - break; - } - - if(regionInfo.offLine) { - throw new IllegalStateException("table offline: " + tableName); - } - - bytes = results.get(COL_SERVER); - if(bytes == null || bytes.length == 0) { - // We need to rescan because the table we want is unassigned. - if(LOG.isDebugEnabled()) { - LOG.debug("no server address for " + regionInfo.toString()); - } - servers.clear(); - break; - } - serverAddress = new String(bytes, UTF8_ENCODING); - servers.put(regionInfo.startKey, - new RegionLocation(regionInfo, new HServerAddress(serverAddress))); - } - } catch (IOException e) { - if(tries == numRetries - 1) { // no retries left - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - - } finally { - if(scannerId != -1L) { - try { - server.close(scannerId); - } catch(Exception ex) { - LOG.warn(ex); - } - } - } - - if(servers.size() == 0 && tries == numRetries - 1) { - throw new NoServerForRegionException("failed to find server for " - + tableName + " after " + numRetries + " retries"); - } - - if (servers.size() <= 0) { - // The table is not yet being served. Sleep and retry. - if (LOG.isDebugEnabled()) { - LOG.debug("Sleeping. 
Table " + tableName + - " not currently being served."); - } - try { - Thread.sleep(pause); - } catch (InterruptedException ie) { - // continue - } - if (LOG.isDebugEnabled()) { - LOG.debug("Wake. Retry finding table " + tableName); - } - } - } - return servers; - } - - /** - * Reloads servers for the specified table. - * @param tableName name of table whose servers are to be reloaded - * @return map of start key -> RegionLocation - * @throws IOException - */ - public synchronized SortedMap - reloadTableServers(final Text tableName) - throws IOException { - // Reload information for the whole table - SortedMap servers = findServersForTable(tableName); - - if (LOG.isDebugEnabled()) { - LOG.debug("Result of findTable: " + servers.toString()); - } - - if (tablesToServers.get(tableName) == null) { - throw new TableNotFoundException(tableName.toString()); - } - - return servers; - } - - } - - protected TableServers tableServers; - - // For the "current" table: Map startRow -> RegionLocation - SortedMap currentTableServers; - - // Known region HServerAddress.toString() -> HRegionInterface - private TreeMap servers; - - // For row mutation operations - - Text currentRegion; - HRegionInterface currentServer; - Random rand; - long clientid; - + protected AtomicReference connection; + protected AtomicReference admin; + protected AtomicReference table; /** * Creates a new HClient @@ -518,94 +58,33 @@ */ public HClient(Configuration conf) { this.conf = conf; - this.batch = new AtomicReference(); - this.currentLockId = new AtomicLong(-1L); - - this.pause = conf.getLong("hbase.client.pause", 30 * 1000); - this.numRetries = conf.getInt("hbase.client.retries.number", 5); - - this.master = null; - this.tableServers = new TableServers(); - this.currentTableServers = null; - this.servers = new TreeMap(); - - // For row mutation operations - - this.currentRegion = null; - this.currentServer = null; - this.rand = new Random(); + this.connection = new AtomicReference(); + this.admin = 
new AtomicReference(); + this.table = new AtomicReference(); } - /* Find the address of the master and connect to it */ - protected void checkMaster() throws MasterNotRunningException { - if (this.master != null) { - return; - } - - for(int tries = 0; this.master == null && tries < numRetries; tries++) { - HServerAddress masterLocation = - new HServerAddress(this.conf.get(MASTER_ADDRESS, - DEFAULT_MASTER_ADDRESS)); - - try { - HMasterInterface tryMaster = - (HMasterInterface)RPC.getProxy(HMasterInterface.class, - HMasterInterface.versionID, masterLocation.getInetSocketAddress(), - this.conf); - if(tryMaster.isMasterRunning()) { - this.master = tryMaster; - break; - } - } catch(IOException e) { - if(tries == numRetries - 1) { - // This was our last chance - don't bother sleeping - break; - } - LOG.info("Attempt " + tries + " of " + this.numRetries + - " failed with <" + e + ">. Retrying after sleep of " + this.pause); - } - - // We either cannot connect to master or it is not running. Sleep & retry - try { - Thread.sleep(this.pause); - } catch(InterruptedException e) { - // continue - } - } - - if(this.master == null) { - throw new MasterNotRunningException(); + /* Lazily creates a HConnection */ + private synchronized HConnection getHConnection() { + HConnection conn = connection.get(); + if (conn == null) { + conn = HConnectionManager.getConnection(conf); + connection.set(conn); } + return conn; } - - /** - * @return - true if the master server is running - */ - public boolean isMasterRunning() { - if(this.master == null) { - try { - checkMaster(); - - } catch(MasterNotRunningException e) { - return false; - } + + /* Lazily creates a HBaseAdmin */ + private synchronized HBaseAdmin getHBaseAdmin() throws MasterNotRunningException { + getHConnection(); // ensure we have a connection + HBaseAdmin adm = admin.get(); + if (adm == null) { + adm = new HBaseAdmin(conf); + admin.set(adm); } - return true; + return adm; } /** - * Reloads the cached server information for 
the current table - * - * @param info RegionInfo for a region that is a part of the table - * @throws IOException - */ - protected synchronized void reloadCurrentTable(RegionLocation info) - throws IOException { - this.currentTableServers = tableServers.reloadTableServers( - info.getRegionInfo().getTableDesc().getName()); - } - - /** * Find region location hosting passed row using cached info * @param row Row to find. * @return Location of row. @@ -610,17 +89,13 @@ * @param row Row to find. * @return Location of row. */ - protected synchronized RegionLocation getRegionLocation(Text row) { - if(this.currentTableServers == null) { + protected HRegionLocation getRegionLocation(Text row) { + if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } - - // Only one server will have the row we are looking for - Text serverKey = (this.currentTableServers.containsKey(row))? row: - this.currentTableServers.headMap(row).lastKey(); - return this.currentTableServers.get(serverKey); + return table.get().getRegionLocation(row); } - + /** * Establishes a connection to the region server at the specified address. 
* @param regionServer - the server to connect to @@ -626,44 +101,16 @@ * @param regionServer - the server to connect to * @throws IOException */ - protected synchronized HRegionInterface getHRegionConnection ( + protected HRegionInterface getHRegionConnection( HServerAddress regionServer) throws IOException { - - getRegionServerInterface(); - - // See if we already have a connection - HRegionInterface server = this.servers.get(regionServer.toString()); - - if (server == null) { // Get a connection - long versionId = 0; - try { - versionId = - serverInterfaceClass.getDeclaredField("versionID").getLong(server); - - } catch (IllegalAccessException e) { - // Should never happen unless visibility of versionID changes - throw new UnsupportedOperationException( - "Unable to open a connection to a " + serverInterfaceClass.getName() - + " server.", e); - - } catch (NoSuchFieldException e) { - // Should never happen unless versionID field name changes in HRegionInterface - throw new UnsupportedOperationException( - "Unable to open a connection to a " + serverInterfaceClass.getName() - + " server.", e); - } - - try { - server = (HRegionInterface) RPC.waitForProxy(serverInterfaceClass, - versionId, regionServer.getInetSocketAddress(), this.conf); - - } catch (RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } - - this.servers.put(regionServer.toString(), server); - } - return server; + return getHConnection().getHRegionConnection(regionServer); + } + + /** + * @return - true if the master server is running + */ + public boolean isMasterRunning() { + return getHConnection().isMasterRunning(); } // @@ -683,12 +130,10 @@ * and attempt-at-creation). 
* @throws IOException */ - public synchronized void createTable(HTableDescriptor desc) + public void createTable(HTableDescriptor desc) throws IOException { - createTableAsync(desc); - - // Wait for new table to come on-line - tableServers.getTableServers(desc.getName()); + + getHBaseAdmin().createTable(desc); } /** @@ -704,15 +149,10 @@ * and attempt-at-creation). * @throws IOException */ - public synchronized void createTableAsync(HTableDescriptor desc) + public void createTableAsync(HTableDescriptor desc) throws IOException { - checkReservedTableName(desc.getName()); - checkMaster(); - try { - this.master.createTable(desc); - } catch (RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } + + getHBaseAdmin().createTableAsync(desc); } /** @@ -721,70 +161,8 @@ * @param tableName name of table to delete * @throws IOException */ - public synchronized void deleteTable(Text tableName) throws IOException { - checkReservedTableName(tableName); - checkMaster(); - RegionLocation firstMetaServer = getFirstMetaServerForTable(tableName); - - try { - this.master.deleteTable(tableName); - } catch(RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } - - // Wait until first region is deleted - HRegionInterface server = - getHRegionConnection(firstMetaServer.serverAddress); - DataInputBuffer inbuf = new DataInputBuffer(); - HRegionInfo info = new HRegionInfo(); - for (int tries = 0; tries < numRetries; tries++) { - long scannerId = -1L; - try { - scannerId = server.openScanner(firstMetaServer.regionInfo.regionName, - REGIONINFO, tableName, System.currentTimeMillis(), null); - KeyedData[] values = server.next(scannerId); - if(values == null || values.length == 0) { - break; - } - boolean found = false; - for(int j = 0; j < values.length; j++) { - if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) { - inbuf.reset(values[j].getData(), values[j].getData().length); - info.readFields(inbuf); - 
if(info.tableDesc.getName().equals(tableName)) { - found = true; - } - } - } - if(!found) { - break; - } - - } catch (IOException ex) { - if(tries == numRetries - 1) { // no more tries left - if(ex instanceof RemoteException) { - ex = RemoteExceptionHandler.decodeRemoteException((RemoteException) ex); - } - throw ex; - } - - } finally { - if(scannerId != -1L) { - try { - server.close(scannerId); - } catch(Exception ex) { - LOG.warn(ex); - } - } - } - - try { - Thread.sleep(pause); - } catch(InterruptedException e) { - // continue - } - } - LOG.info("table " + tableName + " deleted"); + public void deleteTable(Text tableName) throws IOException { + getHBaseAdmin().deleteTable(tableName); } /** @@ -793,88 +171,8 @@ * @param tableName name of the table * @throws IOException */ - public synchronized void enableTable(Text tableName) throws IOException { - checkReservedTableName(tableName); - checkMaster(); - RegionLocation firstMetaServer = getFirstMetaServerForTable(tableName); - - try { - this.master.enableTable(tableName); - - } catch(RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } - - // Wait until first region is enabled - - HRegionInterface server = getHRegionConnection(firstMetaServer.serverAddress); - - DataInputBuffer inbuf = new DataInputBuffer(); - HRegionInfo info = new HRegionInfo(); - for(int tries = 0; tries < numRetries; tries++) { - int valuesfound = 0; - long scannerId = -1L; - try { - scannerId = server.openScanner(firstMetaServer.regionInfo.regionName, - REGIONINFO, tableName, System.currentTimeMillis(), null); - boolean isenabled = false; - while(true) { - KeyedData[] values = server.next(scannerId); - if(values == null || values.length == 0) { - if(valuesfound == 0) { - throw new NoSuchElementException("table " + tableName + " not found"); - } - break; - } - valuesfound += 1; - for(int j = 0; j < values.length; j++) { - if(values[j].getKey().getColumn().equals(COL_REGIONINFO)) { - inbuf.reset(values[j].getData(), 
values[j].getData().length); - info.readFields(inbuf); - isenabled = !info.offLine; - break; - } - } - if(isenabled) { - break; - } - } - if(isenabled) { - break; - } - - } catch (IOException e) { - if(tries == numRetries - 1) { // no more retries - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - - } finally { - if(scannerId != -1L) { - try { - server.close(scannerId); - - } catch(Exception e) { - LOG.warn(e); - } - } - } - if(LOG.isDebugEnabled()) { - LOG.debug("Sleep. Waiting for first region to be enabled from " + tableName); - } - try { - Thread.sleep(pause); - - } catch(InterruptedException e) { - // continue - } - if(LOG.isDebugEnabled()) { - LOG.debug("Wake. Waiting for first region to be enabled from " + tableName); - } - } - LOG.info("Enabled table " + tableName); + public void enableTable(Text tableName) throws IOException { + getHBaseAdmin().enableTable(tableName); } /** @@ -884,87 +182,8 @@ * @param tableName name of table * @throws IOException */ - public synchronized void disableTable(Text tableName) throws IOException { - checkReservedTableName(tableName); - checkMaster(); - RegionLocation firstMetaServer = getFirstMetaServerForTable(tableName); - - try { - this.master.disableTable(tableName); - - } catch(RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } - - // Wait until first region is disabled - - HRegionInterface server = getHRegionConnection(firstMetaServer.serverAddress); - - DataInputBuffer inbuf = new DataInputBuffer(); - HRegionInfo info = new HRegionInfo(); - for(int tries = 0; tries < numRetries; tries++) { - int valuesfound = 0; - long scannerId = -1L; - try { - scannerId = server.openScanner(firstMetaServer.regionInfo.regionName, - REGIONINFO, tableName, System.currentTimeMillis(), null); - boolean disabled = false; - while(true) { - KeyedData[] values = server.next(scannerId); - if(values == null || values.length == 0) { - 
if(valuesfound == 0) { - throw new NoSuchElementException("table " + tableName + " not found"); - } - break; - } - valuesfound += 1; - for(int j = 0; j < values.length; j++) { - if(values[j].getKey().getColumn().equals(COL_REGIONINFO)) { - inbuf.reset(values[j].getData(), values[j].getData().length); - info.readFields(inbuf); - disabled = info.offLine; - break; - } - } - if(disabled) { - break; - } - } - if(disabled) { - break; - } - - } catch(IOException e) { - if(tries == numRetries - 1) { // no more retries - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - - } finally { - if(scannerId != -1L) { - try { - server.close(scannerId); - - } catch(Exception e) { - LOG.warn(e); - } - } - } - if(LOG.isDebugEnabled()) { - LOG.debug("Sleep. Waiting for first region to be disabled from " + tableName); - } - try { - Thread.sleep(pause); - } catch(InterruptedException e) { - // continue - } - if(LOG.isDebugEnabled()) { - LOG.debug("Wake. Waiting for first region to be disabled from " + tableName); - } - } - LOG.info("Disabled table " + tableName); + public void disableTable(Text tableName) throws IOException { + getHBaseAdmin().disableTable(tableName); } /** @@ -970,18 +189,10 @@ /** * @param tableName Table to check. * @return True if table exists already. 
- * @throws IOException + * @throws MasterNotRunningException */ - public boolean tableExists(final Text tableName) throws IOException { - HTableDescriptor [] tables = listTables(); - boolean result = false; - for (int i = 0; i < tables.length; i++) { - if (tables[i].getName().equals(tableName)) { - result = true; - break; - } - } - return result; + public boolean tableExists(final Text tableName) throws MasterNotRunningException { + return getHBaseAdmin().tableExists(tableName); } /** @@ -991,16 +202,9 @@ * @param column column descriptor of column to be added * @throws IOException */ - public synchronized void addColumn(Text tableName, HColumnDescriptor column) + public void addColumn(Text tableName, HColumnDescriptor column) throws IOException { - checkReservedTableName(tableName); - checkMaster(); - try { - this.master.addColumn(tableName, column); - - } catch (RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } + getHBaseAdmin().addColumn(tableName, column); } /** @@ -1010,16 +214,9 @@ * @param columnName name of column to be deleted * @throws IOException */ - public synchronized void deleteColumn(Text tableName, Text columnName) + public void deleteColumn(Text tableName, Text columnName) throws IOException { - checkReservedTableName(tableName); - checkMaster(); - try { - this.master.deleteColumn(tableName, columnName); - - } catch(RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } + getHBaseAdmin().deleteColumn(tableName, columnName); } /** @@ -1026,37 +223,10 @@ * Shuts down the HBase instance * @throws IOException */ - public synchronized void shutdown() throws IOException { - checkMaster(); - try { - this.master.shutdown(); - } catch(RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } + public void shutdown() throws IOException { + getHBaseAdmin().shutdown(); } - /* - * Verifies that the specified table name is not a reserved name - * @param tableName - the table 
name to be checked - * @throws IllegalArgumentException - if the table name is reserved - */ - protected void checkReservedTableName(Text tableName) { - if(tableName.equals(ROOT_TABLE_NAME) - || tableName.equals(META_TABLE_NAME)) { - - throw new IllegalArgumentException(tableName + " is a reserved table name"); - } - } - - private RegionLocation getFirstMetaServerForTable(Text tableName) - throws IOException { - SortedMap metaservers = - tableServers.getTableServers(META_TABLE_NAME); - - return metaservers.get((metaservers.containsKey(tableName)) ? - tableName : metaservers.headMap(tableName).lastKey()); - } - ////////////////////////////////////////////////////////////////////////////// // Client API ////////////////////////////////////////////////////////////////////////////// @@ -1068,13 +238,11 @@ * @throws IOException if the table can not be located after retrying */ public synchronized void openTable(Text tableName) throws IOException { - if(tableName == null || tableName.getLength() == 0) { - throw new IllegalArgumentException("table name cannot be null or zero length"); - } - if(this.currentLockId.get() != -1L || batch.get() != null) { - throw new IllegalStateException("update in progress"); + HTable table = this.table.get(); + if (table != null) { + table.checkUpdateInProgress(); } - this.currentTableServers = tableServers.getTableServers(tableName); + this.table.set(new HTable(conf, tableName)); } /** @@ -1081,17 +249,11 @@ * Gets the starting row key for every region in the currently open table * @return Array of region starting row keys */ - public synchronized Text[] getStartKeys() { - if(this.currentTableServers == null) { + public Text[] getStartKeys() { + if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } - - Text[] keys = new Text[currentTableServers.size()]; - int i = 0; - for(Text key: currentTableServers.keySet()){ - keys[i++] = key; - } - return keys; + return table.get().getStartKeys(); } /** @@ -1104,49 
+266,8 @@ * @return - returns an array of HTableDescriptors * @throws IOException */ - public synchronized HTableDescriptor[] listTables() - throws IOException { - TreeSet uniqueTables = new TreeSet(); - - SortedMap metaTables = - tableServers.getTableServers(META_TABLE_NAME); - - for (RegionLocation t: metaTables.values()) { - HRegionInterface server = getHRegionConnection(t.serverAddress); - long scannerId = -1L; - try { - scannerId = server.openScanner(t.regionInfo.regionName, - META_COLUMNS, EMPTY_START_ROW, System.currentTimeMillis(), null); - - DataInputBuffer inbuf = new DataInputBuffer(); - while(true) { - KeyedData[] values = server.next(scannerId); - if(values.length == 0) { - break; - } - for(int i = 0; i < values.length; i++) { - if(values[i].getKey().getColumn().equals(COL_REGIONINFO)) { - inbuf.reset(values[i].getData(), values[i].getData().length); - HRegionInfo info = new HRegionInfo(); - info.readFields(inbuf); - - // Only examine the rows where the startKey is zero length - if(info.startKey.getLength() == 0) { - uniqueTables.add(info.tableDesc); - } - } - } - } - } catch (RemoteException ex) { - throw RemoteExceptionHandler.decodeRemoteException(ex); - - } finally { - if(scannerId != -1L) { - server.close(scannerId); - } - } - } - return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]); + public HTableDescriptor[] listTables() throws IOException { + return getHConnection().listTables(); } /** @@ -1158,31 +279,10 @@ * @throws IOException */ public byte[] get(Text row, Text column) throws IOException { - byte [] value = null; - for(int tries = 0; tries < numRetries; tries++) { - RegionLocation info = getRegionLocation(row); - HRegionInterface server = getHRegionConnection(info.serverAddress); - try { - value = server.get(info.regionInfo.regionName, row, column); - break; - - } catch (IOException e) { - if (tries == numRetries - 1) { - if(e instanceof RemoteException) { - e = 
RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - reloadCurrentTable(info); - } - try { - Thread.sleep(this.pause); - - } catch (InterruptedException x) { - // continue - } + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } - return value; + return this.table.get().get(row, column); } /** @@ -1194,42 +294,12 @@ * @return - array byte values * @throws IOException */ - public byte[][] get(Text row, Text column, int numVersions) throws IOException { - byte [][] values = null; - for(int tries = 0; tries < numRetries; tries++) { - RegionLocation info = getRegionLocation(row); - HRegionInterface server = getHRegionConnection(info.serverAddress); - - try { - values = server.get(info.regionInfo.regionName, row, column, numVersions); - break; - - } catch(IOException e) { - if(tries == numRetries - 1) { - // No more tries - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - reloadCurrentTable(info); - } - try { - Thread.sleep(this.pause); - - } catch (InterruptedException x) { - // continue - } - } - - if(values != null) { - ArrayList bytes = new ArrayList(); - for(int i = 0 ; i < values.length; i++) { - bytes.add(values[i]); - } - return bytes.toArray(new byte[values.length][]); + public byte[][] get(Text row, Text column, int numVersions) + throws IOException { + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } - return null; + return this.table.get().get(row, column, numVersions); } /** @@ -1245,40 +315,10 @@ */ public byte[][] get(Text row, Text column, long timestamp, int numVersions) throws IOException { - byte [][] values = null; - for(int tries = 0; tries < numRetries; tries++) { - RegionLocation info = getRegionLocation(row); - HRegionInterface server = getHRegionConnection(info.serverAddress); - try { - values = server.get(info.regionInfo.regionName, row, column, 
timestamp, numVersions); - break; - - } catch(IOException e) { - if(tries == numRetries - 1) { - // No more tries - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - reloadCurrentTable(info); - } - try { - Thread.sleep(this.pause); - - } catch (InterruptedException x) { - // continue - } - } - - if(values != null) { - ArrayList bytes = new ArrayList(); - for(int i = 0 ; i < values.length; i++) { - bytes.add(values[i]); - } - return bytes.toArray(new byte[values.length][]); + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } - return null; + return this.table.get().get(row, column, timestamp, numVersions); } /** @@ -1289,39 +329,10 @@ * @throws IOException */ public SortedMap getRow(Text row) throws IOException { - KeyedData[] value = null; - for(int tries = 0; tries < numRetries; tries++) { - RegionLocation info = getRegionLocation(row); - HRegionInterface server = getHRegionConnection(info.serverAddress); - - try { - value = server.getRow(info.regionInfo.regionName, row); - break; - - } catch(IOException e) { - if(tries == numRetries - 1) { - // No more tries - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - reloadCurrentTable(info); - } - try { - Thread.sleep(this.pause); - - } catch (InterruptedException x) { - // continue - } - } - TreeMap results = new TreeMap(); - if(value != null && value.length != 0) { - for(int i = 0; i < value.length; i++) { - results.put(value[i].getKey().getColumn(), value[i].getData()); - } + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } - return results; + return this.table.get().getRow(row); } /** @@ -1333,7 +344,7 @@ * @return scanner * @throws IOException */ - public synchronized HScannerInterface obtainScanner(Text[] columns, + public HScannerInterface obtainScanner(Text[] columns, 
Text startRow) throws IOException { return obtainScanner(columns, startRow, System.currentTimeMillis(), null); } @@ -1348,7 +359,7 @@ * @return scanner * @throws IOException */ - public synchronized HScannerInterface obtainScanner(Text[] columns, + public HScannerInterface obtainScanner(Text[] columns, Text startRow, long timestamp) throws IOException { return obtainScanner(columns, startRow, timestamp, null); } @@ -1363,7 +374,7 @@ * @return scanner * @throws IOException */ - public synchronized HScannerInterface obtainScanner(Text[] columns, + public HScannerInterface obtainScanner(Text[] columns, Text startRow, RowFilterInterface filter) throws IOException { return obtainScanner(columns, startRow, System.currentTimeMillis(), filter); } @@ -1379,13 +390,13 @@ * @return scanner * @throws IOException */ - public synchronized HScannerInterface obtainScanner(Text[] columns, + public HScannerInterface obtainScanner(Text[] columns, Text startRow, long timestamp, RowFilterInterface filter) throws IOException { - if(this.currentTableServers == null) { + if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } - return new ClientScanner(columns, startRow, timestamp, filter); + return this.table.get().obtainScanner(columns, startRow, timestamp, filter); } /** @@ -1397,15 +408,11 @@ * @param row name of row to be updated * @return lockid to be used in subsequent put, delete and commit calls */ - public synchronized long startBatchUpdate(final Text row) { - if (this.currentTableServers == null) { + public long startBatchUpdate(final Text row) { + if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } - if (batch.get() != null) { - throw new IllegalStateException("batch update in progress"); - } - batch.set(new BatchUpdate()); - return batch.get().startUpdate(row); + return this.table.get().startBatchUpdate(row); } /** @@ -1412,15 +419,11 @@ * Abort a batch mutation * @param lockid lock id returned by 
startBatchUpdate */ - public synchronized void abortBatch(final long lockid) { - BatchUpdate u = batch.get(); - if (u == null) { - throw new IllegalStateException("no batch update in progress"); - } - if (u.getLockid() != lockid) { - throw new IllegalArgumentException("invalid lock id " + lockid); + public void abortBatch(final long lockid) { + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } - batch.set(null); + this.table.get().abortBatch(lockid); } /** @@ -1440,44 +443,12 @@ * @param timestamp time to associate with all the changes * @throws IOException */ - public synchronized void commitBatch(final long lockid, final long timestamp) + public void commitBatch(final long lockid, final long timestamp) throws IOException { - BatchUpdate u = batch.get(); - if (u == null) { - throw new IllegalStateException("no batch update in progress"); - } - if (u.getLockid() != lockid) { - throw new IllegalArgumentException("invalid lock id " + lockid); - } - - try { - for (int tries = 0; tries < numRetries; tries++) { - RegionLocation r = getRegionLocation(u.getRow()); - HRegionInterface server = getHRegionConnection(r.serverAddress); - try { - server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp, u); - break; - - } catch (IOException e) { - if (tries < numRetries -1) { - reloadCurrentTable(r); - - } else { - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - } - try { - Thread.sleep(pause); - - } catch (InterruptedException e) { - } - } - } finally { - batch.set(null); + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } + this.table.get().commitBatch(lockid, timestamp); } /** @@ -1495,47 +466,11 @@ * @return Row lockid. 
* @throws IOException */ - public synchronized long startUpdate(final Text row) throws IOException { - if (this.currentLockId.get() != -1L) { - throw new IllegalStateException("update in progress"); - } - if (batch.get() != null) { - throw new IllegalStateException("batch update in progress"); - } - for (int tries = 0; tries < numRetries; tries++) { - IOException e = null; - RegionLocation info = getRegionLocation(row); - try { - currentServer = getHRegionConnection(info.serverAddress); - currentRegion = info.regionInfo.regionName; - clientid = rand.nextLong(); - this.currentLockId.set( - currentServer.startUpdate(currentRegion, clientid, row)); - break; - - } catch (IOException ex) { - e = ex; - } - if (tries < numRetries - 1) { - try { - Thread.sleep(this.pause); - - } catch (InterruptedException ex) { - } - try { - reloadCurrentTable(info); - - } catch (IOException ex) { - e = ex; - } - } else { - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } + public long startUpdate(final Text row) throws IOException { + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } - return this.currentLockId.get(); + return this.table.get().startUpdate(row); } /** @@ -1548,33 +483,10 @@ * @throws IOException */ public void put(long lockid, Text column, byte val[]) throws IOException { - if (val == null) { - throw new IllegalArgumentException("value cannot be null"); - } - if (batch.get() != null) { - batch.get().put(lockid, column, val); - return; - } - - if (lockid != this.currentLockId.get()) { - throw new IllegalArgumentException("invalid lockid"); - } - try { - this.currentServer.put(this.currentRegion, this.clientid, lockid, column, - val); - } catch (IOException e) { - try { - this.currentServer.abort(this.currentRegion, this.clientid, lockid); - } catch (IOException e2) { - LOG.warn(e2); - } - this.currentServer = null; - this.currentRegion = null; - if (e 
instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } + this.table.get().put(lockid, column, val); } /** @@ -1585,30 +497,10 @@ * @throws IOException */ public void delete(long lockid, Text column) throws IOException { - if (batch.get() != null) { - batch.get().delete(lockid, column); - return; - } - - if (lockid != this.currentLockId.get()) { - throw new IllegalArgumentException("invalid lockid"); - } - try { - this.currentServer.delete(this.currentRegion, this.clientid, lockid, - column); - } catch (IOException e) { - try { - this.currentServer.abort(this.currentRegion, this.clientid, lockid); - } catch(IOException e2) { - LOG.warn(e2); - } - this.currentServer = null; - this.currentRegion = null; - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } + this.table.get().delete(lockid, column); } /** @@ -1618,26 +510,10 @@ * @throws IOException */ public void abort(long lockid) throws IOException { - if (batch.get() != null) { - abortBatch(lockid); - return; - } - - if (lockid != this.currentLockId.get()) { - throw new IllegalArgumentException("invalid lockid"); - } - try { - this.currentServer.abort(this.currentRegion, this.clientid, lockid); - } catch (IOException e) { - this.currentServer = null; - this.currentRegion = null; - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } finally { - this.currentLockId.set(-1L); + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } + this.table.get().abort(lockid); } /** @@ -1658,28 +534,10 @@ * @throws IOException */ public void commit(long lockid, long timestamp) throws IOException { - 
if (batch.get() != null) { - commitBatch(lockid, timestamp); - return; - } - - if (lockid != this.currentLockId.get()) { - throw new IllegalArgumentException("invalid lockid"); - } - try { - this.currentServer.commit(this.currentRegion, this.clientid, lockid, - timestamp); - - } catch (IOException e) { - this.currentServer = null; - this.currentRegion = null; - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } finally { - this.currentLockId.set(-1L); + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } + this.table.get().commit(lockid, timestamp); } /** @@ -1689,175 +547,12 @@ * @throws IOException */ public void renewLease(long lockid) throws IOException { - if (batch.get() != null) { - return; - } - - if (lockid != this.currentLockId.get()) { - throw new IllegalArgumentException("invalid lockid"); - } - try { - this.currentServer.renewLease(lockid, this.clientid); - } catch (IOException e) { - try { - this.currentServer.abort(this.currentRegion, this.clientid, lockid); - } catch (IOException e2) { - LOG.warn(e2); - } - this.currentServer = null; - this.currentRegion = null; - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; + if(this.table.get() == null) { + throw new IllegalStateException("Must open table first"); } + this.table.get().renewLease(lockid); } - /** - * Implements the scanner interface for the HBase client. - * If there are multiple regions in a table, this scanner will iterate - * through them all. 
- */ - private class ClientScanner implements HScannerInterface { - private final Text EMPTY_COLUMN = new Text(); - private Text[] columns; - private Text startRow; - private long scanTime; - private boolean closed; - private AtomicReferenceArray regions; - @SuppressWarnings("hiding") - private int currentRegion; - private HRegionInterface server; - private long scannerId; - private RowFilterInterface filter; - - private void loadRegions() { - Text firstServer = null; - if(this.startRow == null || this.startRow.getLength() == 0) { - firstServer = currentTableServers.firstKey(); - - } else if(currentTableServers.containsKey(startRow)) { - firstServer = startRow; - - } else { - firstServer = currentTableServers.headMap(startRow).lastKey(); - } - Collection info = - currentTableServers.tailMap(firstServer).values(); - - this.regions = new AtomicReferenceArray( - info.toArray(new RegionLocation[info.size()])); - } - - ClientScanner(Text[] columns, Text startRow, long timestamp, - RowFilterInterface filter) throws IOException { - this.columns = columns; - this.startRow = startRow; - this.scanTime = timestamp; - this.closed = false; - this.filter = filter; - if (filter != null) { - filter.validate(columns); - } - loadRegions(); - this.currentRegion = -1; - this.server = null; - this.scannerId = -1L; - nextScanner(); - } - - /* - * Gets a scanner for the next region. - * Returns false if there are no more scanners. 
- */ - private boolean nextScanner() throws IOException { - if(this.scannerId != -1L) { - this.server.close(this.scannerId); - this.scannerId = -1L; - } - this.currentRegion += 1; - if(this.currentRegion == this.regions.length()) { - close(); - return false; - } - try { - for(int tries = 0; tries < numRetries; tries++) { - RegionLocation info = this.regions.get(currentRegion); - this.server = getHRegionConnection(info.serverAddress); - - try { - if (this.filter == null) { - this.scannerId = this.server.openScanner(info.regionInfo.regionName, - this.columns, currentRegion == 0 ? this.startRow - : EMPTY_START_ROW, scanTime, null); - } else { - this.scannerId = - this.server.openScanner(info.regionInfo.regionName, - this.columns, currentRegion == 0 ? this.startRow - : EMPTY_START_ROW, scanTime, filter); - } - - break; - - } catch(IOException e) { - if(tries == numRetries - 1) { - // No more tries - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - reloadCurrentTable(info); - loadRegions(); - } - } - - } catch(IOException e) { - close(); - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - return true; - } - - /** - * {@inheritDoc} - */ - public boolean next(HStoreKey key, TreeMap results) throws IOException { - if(this.closed) { - return false; - } - KeyedData[] values = null; - do { - values = this.server.next(this.scannerId); - } while(values != null && values.length == 0 && nextScanner()); - - if(values != null && values.length != 0) { - for(int i = 0; i < values.length; i++) { - key.setRow(values[i].getKey().getRow()); - key.setVersion(values[i].getKey().getTimestamp()); - key.setColumn(EMPTY_COLUMN); - results.put(values[i].getKey().getColumn(), values[i].getData()); - } - } - return values == null ? 
false : values.length != 0; - } - - /** - * {@inheritDoc} - */ - public void close() throws IOException { - if(this.scannerId != -1L) { - this.server.close(this.scannerId); - this.scannerId = -1L; - } - this.server = null; - this.closed = true; - } - } - private void printUsage() { printUsage(null); } @@ -1994,28 +689,6 @@ } /** - * Determine the region server interface to use from configuration properties. - * - */ - @SuppressWarnings("unchecked") - private void getRegionServerInterface() { - if (this.serverInterfaceClass != null) { - return; - } - - String serverClassName = this.conf.get(REGION_SERVER_CLASS, - DEFAULT_REGION_SERVER_CLASS); - - try { - this.serverInterfaceClass = - (Class) Class.forName(serverClassName); - } catch (ClassNotFoundException e) { - throw new UnsupportedOperationException( - "Unable to find region server interface " + serverClassName, e); - } - } - - /** * @return the configuration for this client */ protected Configuration getConf(){ Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnection.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnection.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnection.java (revision 0) @@ -0,0 +1,92 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.SortedMap; + +import org.apache.hadoop.io.Text; + +/** + * + */ +public interface HConnection { + /** + * @return proxy connection to master server for this instance + * @throws MasterNotRunningException + */ + public HMasterInterface getMaster() throws MasterNotRunningException; + + /** @return - true if the master server is running */ + public boolean isMasterRunning(); + + /** + * @param tableName Table to check. + * @return True if table exists already. + */ + public boolean tableExists(final Text tableName); + + /** + * List all the userspace tables. In other words, scan the META table. + * + * If we wanted this to be really fast, we could implement a special + * catalog table that just contains table names and their descriptors. + * Right now, it only exists as part of the META table's region info. + * + * @return - returns an array of HTableDescriptors + * @throws IOException + */ + public HTableDescriptor[] listTables() throws IOException; + + /** + * Gets the servers of the given table. + * + * @param tableName - the table to be located + * @return map of startRow -> RegionLocation + * @throws IOException - if the table can not be located after retrying + */ + public SortedMap getTableServers(Text tableName) + throws IOException; + + /** + * Reloads servers for the specified table. 
+ * + * @param tableName name of table whose servers are to be reloaded + * @return map of start key -> RegionLocation + * @throws IOException + */ + public SortedMap + reloadTableServers(final Text tableName) throws IOException; + + /** + * Establishes a connection to the region server at the specified address. + * @param regionServer - the server to connect to + * @return proxy for HRegionServer + * @throws IOException + */ + public HRegionInterface getHRegionConnection(HServerAddress regionServer) + throws IOException; + + /** + * Discard all the information about this table + * @param tableName the name of the table to close + */ + public void close(Text tableName); +} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java (revision 0) @@ -0,0 +1,761 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.Text; + +import org.apache.hadoop.hbase.io.KeyedData; + +/** + * A non-instantiable class that manages connections to multiple tables in + * multiple HBase instances + */ +public class HConnectionManager implements HConstants { + private HConnectionManager(){} // Not instantiable + + // A Map of master HServerAddress -> connection information for that instance + // Note that although the Map is synchronized, the objects it contains + // are mutable and hence require synchronized access to them + + private static final Map HBASE_INSTANCES = + Collections.synchronizedMap(new HashMap()); + + /** + * Get the connection object for the instance specified by the configuration + * If no current connection exists, create a new connection for that instance + * @param conf + * @return HConnection object for the instance specified by the configuration + */ + public static HConnection getConnection(Configuration conf) { + HConnection connection; + synchronized (HBASE_INSTANCES) { + String instanceName = conf.get(HBASE_DIR, DEFAULT_HBASE_DIR); + + connection = HBASE_INSTANCES.get(instanceName); + + if (connection == null) { + connection = new TableServers(conf); + HBASE_INSTANCES.put(instanceName, connection); + } + } + return connection; + } + + /** + * Delete connection information for the instance specified by the configuration + * @param conf + */ + public static void deleteConnection(Configuration conf) { + 
synchronized (HBASE_INSTANCES) { + HBASE_INSTANCES.remove(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)); + } + } + + /* encapsulates finding the servers for an HBase instance */ + private static class TableServers implements HConnection, HConstants { + private final Log LOG = LogFactory.getLog(this.getClass().getName()); + private final Class serverInterfaceClass; + private final long threadWakeFrequency; + private final long pause; + private final int numRetries; + + private final Integer masterLock = new Integer(0); + private volatile HMasterInterface master; + private volatile boolean masterChecked; + + private final Integer rootRegionLock = new Integer(0); + private final Integer metaRegionLock = new Integer(0); + + private volatile Configuration conf; + + // Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress) + private Map> tablesToServers; + + // Set of closed tables + private Set closedTables; + + // Set of tables currently being located + private HashSet tablesBeingLocated; + + // Known region HServerAddress.toString() -> HRegionInterface + private HashMap servers; + + /** constructor + * @param conf Configuration object + */ + @SuppressWarnings("unchecked") + public TableServers(Configuration conf) { + this.conf = conf; + + String serverClassName = + conf.get(REGION_SERVER_CLASS, DEFAULT_REGION_SERVER_CLASS); + + try { + this.serverInterfaceClass = + (Class) Class.forName(serverClassName); + + } catch (ClassNotFoundException e) { + throw new UnsupportedOperationException( + "Unable to find region server interface " + serverClassName, e); + } + + this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); + this.pause = conf.getLong("hbase.client.pause", 30 * 1000); + this.numRetries = conf.getInt("hbase.client.retries.number", 5); + + this.master = null; + this.masterChecked = false; + + this.tablesToServers = Collections.synchronizedMap( + new HashMap>()); + + this.closedTables = Collections.synchronizedSet(new HashSet()); + 
this.tablesBeingLocated = new HashSet(); + + this.servers = new HashMap(); + } + + /** {@inheritDoc} */ + public HMasterInterface getMaster() throws MasterNotRunningException { + synchronized (this.masterLock) { + for (int tries = 0; + !this.masterChecked && this.master == null && tries < numRetries; + tries++) { + + HServerAddress masterLocation = new HServerAddress(this.conf.get( + MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS)); + + try { + HMasterInterface tryMaster = (HMasterInterface)RPC.getProxy( + HMasterInterface.class, HMasterInterface.versionID, + masterLocation.getInetSocketAddress(), this.conf); + + if (tryMaster.isMasterRunning()) { + this.master = tryMaster; + break; + } + + } catch (IOException e) { + if(tries == numRetries - 1) { + // This was our last chance - don't bother sleeping + break; + } + LOG.info("Attempt " + tries + " of " + this.numRetries + + " failed with <" + e + ">. Retrying after sleep of " + this.pause); + } + + // We either cannot connect to master or it is not running. 
Sleep & retry + + try { + Thread.sleep(this.pause); + } catch (InterruptedException e) { + // continue + } + } + this.masterChecked = true; + } + if (this.master == null) { + throw new MasterNotRunningException(); + } + return this.master; + } + + /** {@inheritDoc} */ + public boolean isMasterRunning() { + if (this.master == null) { + try { + getMaster(); + + } catch (MasterNotRunningException e) { + return false; + } + } + return true; + } + + /** {@inheritDoc} */ + public boolean tableExists(final Text tableName) { + boolean exists = true; + try { + SortedMap servers = getTableServers(tableName); + if (servers == null || servers.size() == 0) { + exists = false; + } + + } catch (IOException e) { + exists = false; + } + return exists; + } + + /** {@inheritDoc} */ + public HTableDescriptor[] listTables() throws IOException { + TreeSet uniqueTables = new TreeSet(); + + SortedMap metaTables = + getTableServers(META_TABLE_NAME); + + for (HRegionLocation t: metaTables.values()) { + HRegionInterface server = getHRegionConnection(t.getServerAddress()); + long scannerId = -1L; + try { + scannerId = server.openScanner(t.getRegionInfo().getRegionName(), + COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, System.currentTimeMillis(), + null); + + DataInputBuffer inbuf = new DataInputBuffer(); + while (true) { + KeyedData[] values = server.next(scannerId); + if (values.length == 0) { + break; + } + for (int i = 0; i < values.length; i++) { + if (values[i].getKey().getColumn().equals(COL_REGIONINFO)) { + inbuf.reset(values[i].getData(), values[i].getData().length); + HRegionInfo info = new HRegionInfo(); + info.readFields(inbuf); + + // Only examine the rows where the startKey is zero length + if (info.startKey.getLength() == 0) { + uniqueTables.add(info.tableDesc); + } + } + } + } + } catch (RemoteException ex) { + throw RemoteExceptionHandler.decodeRemoteException(ex); + + } finally { + if (scannerId != -1L) { + server.close(scannerId); + } + } + } + return uniqueTables.toArray(new 
HTableDescriptor[uniqueTables.size()]); + } + + /** {@inheritDoc} */ + public SortedMap + getTableServers(Text tableName) throws IOException { + + if (tableName == null || tableName.getLength() == 0) { + throw new IllegalArgumentException( + "table name cannot be null or zero length"); + } + + if (closedTables.contains(tableName)) { + throw new IllegalStateException("table closed: " + tableName); + } + + SortedMap tableServers = + tablesToServers.get(tableName); + + if (tableServers == null ) { + if (LOG.isDebugEnabled()) { + LOG.debug("No servers for " + tableName + ". Doing a find..."); + } + // We don't know where the table is. + // Load the information from meta. + tableServers = findServersForTable(tableName); + } + SortedMap servers = + new TreeMap(); + + servers.putAll(tableServers); + return servers; + } + + /** {@inheritDoc} */ + public SortedMap + reloadTableServers(final Text tableName) throws IOException { + + if (closedTables.contains(tableName)) { + throw new IllegalStateException("table closed: " + tableName); + } + + SortedMap servers = + new TreeMap(); + + // Reload information for the whole table + + servers.putAll(findServersForTable(tableName)); + if (LOG.isDebugEnabled()) { + LOG.debug("Result of findTable: " + servers.toString()); + } + + return servers; + } + + /** {@inheritDoc} */ + public HRegionInterface getHRegionConnection( + HServerAddress regionServer) throws IOException { + + HRegionInterface server; + synchronized (this.servers) { + // See if we already have a connection + server = this.servers.get(regionServer.toString()); + + if (server == null) { // Get a connection + long versionId = 0; + try { + versionId = + serverInterfaceClass.getDeclaredField("versionID").getLong(server); + + } catch (IllegalAccessException e) { + // Should never happen unless visibility of versionID changes + throw new UnsupportedOperationException( + "Unable to open a connection to a " + + serverInterfaceClass.getName() + " server.", e); + + } catch 
(NoSuchFieldException e) { + // Should never happen unless versionID field name changes in HRegionInterface + throw new UnsupportedOperationException( + "Unable to open a connection to a " + + serverInterfaceClass.getName() + " server.", e); + } + + try { + server = (HRegionInterface) RPC.waitForProxy(serverInterfaceClass, + versionId, regionServer.getInetSocketAddress(), this.conf); + + } catch (RemoteException e) { + throw RemoteExceptionHandler.decodeRemoteException(e); + } + + this.servers.put(regionServer.toString(), server); + } + } + return server; + } + + /** {@inheritDoc} */ + public void close(Text tableName) { + if (tableName == null || tableName.getLength() == 0) { + throw new IllegalArgumentException( + "table name cannot be null or zero length"); + } + + if (closedTables.contains(tableName)) { + throw new IllegalStateException("table closed: " + tableName); + } + + SortedMap tableServers = + tablesToServers.remove(tableName); + + if (tableServers == null) { + throw new IllegalArgumentException("table was not opened: " + tableName); + } + + closedTables.add(tableName); + + // Shut down connections to the HRegionServers + + synchronized (this.servers) { + for (HRegionLocation r: tableServers.values()) { + this.servers.remove(r.getServerAddress().toString()); + } + } + } + + /* + * Clears the cache of all known information about the specified table and + * locates a table by searching the META or ROOT region (as appropriate) or + * by querying the master for the location of the root region if that is the + * table requested. 
+ * + * @param tableName - name of table to find servers for + * @return - map of first row to table info for all regions in the table + * @throws IOException + */ + private SortedMap findServersForTable(Text tableName) + throws IOException { + + // Wipe out everything we know about this table + + if (this.tablesToServers.remove(tableName) != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Wiping out all we know of " + tableName); + } + } + + SortedMap servers = + new TreeMap(); + + if (tableName.equals(ROOT_TABLE_NAME)) { + synchronized (rootRegionLock) { + // This block guards against two threads trying to find the root + // region at the same time. One will go do the find while the + // second waits. The second thread will not do find. + + SortedMap tableServers = + this.tablesToServers.get(ROOT_TABLE_NAME); + + if (tableServers == null) { + tableServers = locateRootRegion(); + } + servers.putAll(tableServers); + } + + } else if (tableName.equals(META_TABLE_NAME)) { + synchronized (metaRegionLock) { + // This block guards against two threads trying to load the meta + // region at the same time. The first will load the meta region and + // the second will use the value that the first one found. + + if (tablesToServers.get(ROOT_TABLE_NAME) == null) { + findServersForTable(ROOT_TABLE_NAME); + } + + SortedMap tableServers = + this.tablesToServers.get(META_TABLE_NAME); + + if (tableServers == null) { + for (int tries = 0; tries < numRetries; tries++) { + try { + tableServers = loadMetaFromRoot(); + break; + + } catch (IOException e) { + if (tries < numRetries - 1) { + findServersForTable(ROOT_TABLE_NAME); + continue; + } + throw e; + } + } + } + servers.putAll(tableServers); + } + } else { + boolean waited = false; + synchronized (this.tablesBeingLocated) { + // This block ensures that only one thread will actually try to + // find a table. If a second thread comes along it will wait + // until the first thread finishes finding the table. 
+ + while (this.tablesBeingLocated.contains(tableName)) { + waited = true; + try { + this.tablesBeingLocated.wait(threadWakeFrequency); + } catch (InterruptedException e) { + } + } + if (!waited) { + this.tablesBeingLocated.add(tableName); + + } else { + SortedMap tableServers = + this.tablesToServers.get(tableName); + + if (tableServers == null) { + throw new TableNotFoundException("table not found: " + tableName); + } + servers.putAll(tableServers); + } + } + if (!waited) { + try { + for (int tries = 0; tries < numRetries; tries++) { + boolean success = true; // assume this works + + SortedMap metaServers = + this.tablesToServers.get(META_TABLE_NAME); + if (metaServers == null) { + metaServers = findServersForTable(META_TABLE_NAME); + } + Text firstMetaRegion = metaServers.headMap(tableName).lastKey(); + metaServers = metaServers.tailMap(firstMetaRegion); + + for (HRegionLocation t: metaServers.values()) { + try { + servers.putAll(scanOneMetaRegion(t, tableName)); + + } catch (IOException e) { + if (tries < numRetries - 1) { + findServersForTable(META_TABLE_NAME); + success = false; + break; + } + throw e; + } + } + if (success) { + break; + } + } + } finally { + synchronized (this.tablesBeingLocated) { + // Wake up the threads waiting for us to find the table + this.tablesBeingLocated.remove(tableName); + this.tablesBeingLocated.notifyAll(); + } + } + } + } + this.tablesToServers.put(tableName, servers); + if (LOG.isDebugEnabled()) { + for (Map.Entry e: servers.entrySet()) { + LOG.debug("Server " + e.getKey() + " is serving: " + e.getValue() + + " for table " + tableName); + } + } + return servers; + } + + /* + * Load the meta table from the root table. 
+ * + * @return map of first row to TableInfo for all meta regions + * @throws IOException + */ + private TreeMap loadMetaFromRoot() + throws IOException { + + SortedMap rootRegion = + this.tablesToServers.get(ROOT_TABLE_NAME); + + return scanOneMetaRegion( + rootRegion.get(rootRegion.firstKey()), META_TABLE_NAME); + } + + /* + * Repeatedly try to find the root region by asking the master for where it is + * @return TreeMap for root regin if found + * @throws NoServerForRegionException - if the root region can not be located + * after retrying + * @throws IOException + */ + private TreeMap locateRootRegion() + throws IOException { + + getMaster(); + + HServerAddress rootRegionLocation = null; + for (int tries = 0; tries < numRetries; tries++) { + int localTimeouts = 0; + while (rootRegionLocation == null && localTimeouts < numRetries) { + rootRegionLocation = master.findRootRegion(); + if (rootRegionLocation == null) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Sleeping. Waiting for root region."); + } + Thread.sleep(pause); + if (LOG.isDebugEnabled()) { + LOG.debug("Wake. Retry finding root region."); + } + } catch (InterruptedException iex) { + // continue + } + localTimeouts++; + } + } + + if (rootRegionLocation == null) { + throw new NoServerForRegionException( + "Timed out trying to locate root region"); + } + + HRegionInterface rootRegion = getHRegionConnection(rootRegionLocation); + + try { + rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName); + break; + + } catch (IOException e) { + if (tries == numRetries - 1) { + // Don't bother sleeping. We've run out of retries. + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); + } + throw e; + } + + // Sleep and retry finding root region. + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Root region location changed. Sleeping."); + } + Thread.sleep(pause); + if (LOG.isDebugEnabled()) { + LOG.debug("Wake. 
Retry finding root region."); + } + } catch (InterruptedException iex) { + // continue + } + } + rootRegionLocation = null; + } + + if (rootRegionLocation == null) { + throw new NoServerForRegionException( + "unable to locate root region server"); + } + + TreeMap rootServer = + new TreeMap(); + + rootServer.put(EMPTY_START_ROW, + new HRegionLocation(HGlobals.rootRegionInfo, rootRegionLocation)); + + return rootServer; + } + + /* + * Scans a single meta region + * @param t the meta region we're going to scan + * @param tableName the name of the table we're looking for + * @return returns a map of startingRow to TableInfo + * @throws TableNotFoundException - if table does not exist + * @throws IllegalStateException - if table is offline + * @throws NoServerForRegionException - if table can not be found after retrying + * @throws IOException + */ + private TreeMap scanOneMetaRegion( + final HRegionLocation t, final Text tableName) throws IOException { + + HRegionInterface server = getHRegionConnection(t.getServerAddress()); + TreeMap servers = + new TreeMap(); + + for (int tries = 0; servers.size() == 0 && tries < numRetries; tries++) { + + long scannerId = -1L; + try { + scannerId = + server.openScanner(t.getRegionInfo().getRegionName(), + COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null); + + DataInputBuffer inbuf = new DataInputBuffer(); + while (true) { + HRegionInfo regionInfo = null; + String serverAddress = null; + KeyedData[] values = server.next(scannerId); + if (values.length == 0) { + if (servers.size() == 0) { + // If we didn't find any servers then the table does not exist + throw new TableNotFoundException("table '" + tableName + + "' does not exist in " + t); + } + + // We found at least one server for the table and now we're done. 
+ if (LOG.isDebugEnabled()) { + LOG.debug("Found " + servers.size() + " server(s) for " + + "location: " + t + " for tablename " + tableName); + } + break; + } + + byte[] bytes = null; + TreeMap results = new TreeMap(); + for (int i = 0; i < values.length; i++) { + results.put(values[i].getKey().getColumn(), values[i].getData()); + } + regionInfo = new HRegionInfo(); + bytes = results.get(COL_REGIONINFO); + inbuf.reset(bytes, bytes.length); + regionInfo.readFields(inbuf); + + if (!regionInfo.tableDesc.getName().equals(tableName)) { + // We're done + if (LOG.isDebugEnabled()) { + LOG.debug("Found " + servers.size() + " servers for table " + + tableName); + } + break; + } + + if (regionInfo.offLine) { + throw new IllegalStateException("table offline: " + tableName); + } + + bytes = results.get(COL_SERVER); + if (bytes == null || bytes.length == 0) { + // We need to rescan because the table we want is unassigned. + if (LOG.isDebugEnabled()) { + LOG.debug("no server address for " + regionInfo.toString()); + } + servers.clear(); + break; + } + serverAddress = new String(bytes, UTF8_ENCODING); + servers.put(regionInfo.startKey, new HRegionLocation( + regionInfo, new HServerAddress(serverAddress))); + } + } catch (IOException e) { + if (tries == numRetries - 1) { // no retries left + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + + } finally { + if (scannerId != -1L) { + try { + server.close(scannerId); + } catch (Exception ex) { + LOG.warn(ex); + } + } + } + + if (servers.size() == 0 && tries == numRetries - 1) { + throw new NoServerForRegionException("failed to find server for " + + tableName + " after " + numRetries + " retries"); + } + + if (servers.size() <= 0) { + // The table is not yet being served. Sleep and retry. + if (LOG.isDebugEnabled()) { + LOG.debug("Sleeping. 
Table " + tableName + + " not currently being served."); + } + try { + Thread.sleep(pause); + } catch (InterruptedException ie) { + // continue + } + if (LOG.isDebugEnabled()) { + LOG.debug("Wake. Retry finding table " + tableName); + } + } + } + return servers; + } + } +} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (revision 561621) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -41,7 +41,8 @@ /** Parameter name for master address */ static final String MASTER_ADDRESS = "hbase.master"; - + + /** default host address */ static final String DEFAULT_HOST = "0.0.0.0"; /** Default master address */ @@ -100,7 +101,8 @@ /** The ROOT and META column family */ static final Text COLUMN_FAMILY = new Text("info:"); - + + /** Array of meta column names */ static final Text [] COLUMN_FAMILY_ARRAY = new Text [] {COLUMN_FAMILY}; /** ROOT/META column family member - contains HRegionInfo */ @@ -105,6 +107,9 @@ /** ROOT/META column family member - contains HRegionInfo */ static final Text COL_REGIONINFO = new Text(COLUMN_FAMILY + "regioninfo"); + + /** Array of column - contains HRegionInfo */ + static final Text[] COL_REGIONINFO_ARRAY = new Text [] {COL_REGIONINFO}; /** ROOT/META column family member - contains HServerAddress.toString() */ static final Text COL_SERVER = new Text(COLUMN_FAMILY + "server"); @@ -114,6 +119,9 @@ // Other constants + /** used by scanners, etc when they want to start at the beginning of a region */ + static final Text EMPTY_START_ROW = new Text(); + /** When we encode strings, we always specify UTF8 encoding */ static final String UTF8_ENCODING = "UTF-8"; Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java =================================================================== --- 
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (revision 561621) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (working copy) @@ -1721,6 +1721,10 @@ if (rootRegionLocation == null || !rootScanned) { // We can't proceed until the root region is online and has been // scanned + if (LOG.isDebugEnabled()) { + LOG.debug("root region=" + rootRegionLocation + // may be null on this path; concatenation prints "null" instead of throwing + ", rootScanned=" + rootScanned); + } return false; } metaRegionName = HGlobals.rootRegionInfo.regionName; @@ -1735,6 +1739,11 @@ // online message from being processed. So return false to have this // operation requeued. + if (LOG.isDebugEnabled()) { + LOG.debug("rootScanned=" + rootScanned + ", numberOfMetaRegions=" + + numberOfMetaRegions.get() + ", onlineMetaRegions.size()=" + + onlineMetaRegions.size()); + } return false; } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionLocation.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionLocation.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionLocation.java (revision 0) @@ -0,0 +1,94 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +/** + * Contains the HRegionInfo for the region and the HServerAddress for the + * HRegionServer serving the region. Both references are fixed at construction time. + */ +@SuppressWarnings("unchecked") +public class HRegionLocation implements Comparable { + private final HRegionInfo regionInfo; + private final HServerAddress serverAddress; + + /** + * Constructor + * + * @param regionInfo the HRegionInfo for the region + * @param serverAddress the HServerAddress for the region server + */ + public HRegionLocation(HRegionInfo regionInfo, HServerAddress serverAddress) { + this.regionInfo = regionInfo; + this.serverAddress = serverAddress; + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + return "address: " + this.serverAddress.toString() + ", regioninfo: " + + this.regionInfo; + } + + /** + * {@inheritDoc} Returns false for null and for objects that are not HRegionLocations; otherwise true when compareTo reports 0. + */ + @Override + public boolean equals(Object o) { + return o instanceof HRegionLocation && this.compareTo(o) == 0; + } + + /** + * {@inheritDoc} Combines the hash codes of the region info and the server address. + */ + @Override + public int hashCode() { + int result = this.regionInfo.hashCode(); + result ^= this.serverAddress.hashCode(); + return result; + } + + /** @return HRegionInfo */ + public HRegionInfo getRegionInfo(){ + return regionInfo; + } + + /** @return HServerAddress */ + public HServerAddress getServerAddress(){ + return serverAddress; + } + + // + // Comparable + // + + /** + * {@inheritDoc} Orders by region info first and by server address second; the argument must be an HRegionLocation. + */ + public int compareTo(Object o) { + HRegionLocation other = (HRegionLocation) o; + int result = this.regionInfo.compareTo(other.regionInfo); + if(result == 0) { + result = this.serverAddress.compareTo(other.serverAddress); + } + return result; + } +} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
(revision 0) @@ -0,0 +1,850 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Random; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReferenceArray; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.KeyedData; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RemoteException; + +/** + * Used to communicate with a single HBase table + */ +public class HTable implements HConstants { + protected final Log LOG = LogFactory.getLog(this.getClass().getName()); + + protected final HConnection connection; + protected final Text tableName; + protected final long pause; + protected final int numRetries; + protected Random rand; + protected volatile SortedMap tableServers; + protected BatchUpdate batch; + + // For row mutation operations + + 
protected volatile long currentLockId; + protected volatile Text currentRegion; + protected volatile HRegionInterface currentServer; + protected volatile long clientid; + + protected volatile boolean closed; + + /** + * Creates an object to access a HBase table + * + * @param conf configuration object + * @param tableName name of the table + * @throws IOException + */ + public HTable(Configuration conf, Text tableName) throws IOException { + closed = true; + this.connection = HConnectionManager.getConnection(conf); + this.tableName = tableName; + this.pause = conf.getLong("hbase.client.pause", 30 * 1000); + this.numRetries = conf.getInt("hbase.client.retries.number", 5); + this.rand = new Random(); + tableServers = connection.getTableServers(tableName); + this.batch = null; + this.currentLockId = -1L; + closed = false; + } + + /** + * Find region location hosting passed row using cached info + * @param row Row to find. + * @return Location of row. + */ + HRegionLocation getRegionLocation(Text row) { + if (this.tableServers == null) { + throw new IllegalStateException("Must open table first"); + } + + // Only one server will have the row we are looking for + Text serverKey = (this.tableServers.containsKey(row)) ? 
+ row : this.tableServers.headMap(row).lastKey(); + return this.tableServers.get(serverKey); + } + + /** + * Verifies that no update is in progress + */ + public synchronized void checkUpdateInProgress() { + if (batch != null || currentLockId != -1L) { + throw new IllegalStateException("update in progress"); + } + } + + /** + * Gets the starting row key for every region in the currently open table + * @return Array of region starting row keys + */ + public Text[] getStartKeys() { + if (closed) { + throw new IllegalStateException("table is closed"); + } + Text[] keys = new Text[tableServers.size()]; + int i = 0; + for(Text key: tableServers.keySet()){ + keys[i++] = key; + } + return keys; + } + + /** + * Get a single value for the specified row and column + * + * @param row row key + * @param column column name + * @return value for specified row/column + * @throws IOException + */ + public byte[] get(Text row, Text column) throws IOException { + byte [] value = null; + for(int tries = 0; tries < numRetries; tries++) { + HRegionLocation r = getRegionLocation(row); + HRegionInterface server = + connection.getHRegionConnection(r.getServerAddress()); + + try { + value = server.get(r.getRegionInfo().getRegionName(), row, column); + break; + + } catch (IOException e) { + if (tries == numRetries - 1) { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + tableServers = connection.reloadTableServers(tableName); + } + try { + Thread.sleep(this.pause); + + } catch (InterruptedException x) { + // continue + } + } + return value; + } + + /** + * Get the specified number of versions of the specified row and column + * + * @param row - row key + * @param column - column name + * @param numVersions - number of versions to retrieve + * @return - array byte values + * @throws IOException + */ + public byte[][] get(Text row, Text column, int numVersions) throws IOException { + byte [][] values = null; + 
for (int tries = 0; tries < numRetries; tries++) { + HRegionLocation r = getRegionLocation(row); + HRegionInterface server = + connection.getHRegionConnection(r.getServerAddress()); + + try { + values = server.get(r.getRegionInfo().getRegionName(), row, column, + numVersions); + + break; + + } catch (IOException e) { + if (tries == numRetries - 1) { + // No more tries + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + tableServers = connection.reloadTableServers(tableName); + } + try { + Thread.sleep(this.pause); + + } catch (InterruptedException x) { + // continue + } + } + + if (values != null) { + ArrayList bytes = new ArrayList(); + for (int i = 0 ; i < values.length; i++) { + bytes.add(values[i]); + } + return bytes.toArray(new byte[values.length][]); + } + return null; + } + + /** + * Get the specified number of versions of the specified row and column with + * the specified timestamp. + * + * @param row - row key + * @param column - column name + * @param timestamp - timestamp + * @param numVersions - number of versions to retrieve + * @return - array of values that match the above criteria + * @throws IOException + */ + public byte[][] get(Text row, Text column, long timestamp, int numVersions) + throws IOException { + byte [][] values = null; + for (int tries = 0; tries < numRetries; tries++) { + HRegionLocation r = getRegionLocation(row); + HRegionInterface server = + connection.getHRegionConnection(r.getServerAddress()); + + try { + values = server.get(r.getRegionInfo().getRegionName(), row, column, + timestamp, numVersions); + + break; + + } catch (IOException e) { + if (tries == numRetries - 1) { + // No more tries + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + tableServers = connection.reloadTableServers(tableName); + } + try { + Thread.sleep(this.pause); + + } catch (InterruptedException 
x) { + // continue + } + } + + if (values != null) { + ArrayList bytes = new ArrayList(); + for (int i = 0 ; i < values.length; i++) { + bytes.add(values[i]); + } + return bytes.toArray(new byte[values.length][]); + } + return null; + } + + /** + * Get all the data for the specified row + * + * @param row - row key + * @return - map of columns to values + * @throws IOException + */ + public SortedMap getRow(Text row) throws IOException { + KeyedData[] value = null; + for (int tries = 0; tries < numRetries; tries++) { + HRegionLocation r = getRegionLocation(row); + HRegionInterface server = + connection.getHRegionConnection(r.getServerAddress()); + + try { + value = server.getRow(r.getRegionInfo().getRegionName(), row); + break; + + } catch (IOException e) { + if (tries == numRetries - 1) { + // No more tries + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + tableServers = connection.reloadTableServers(tableName); + } + try { + Thread.sleep(this.pause); + + } catch (InterruptedException x) { + // continue + } + } + TreeMap results = new TreeMap(); + if (value != null && value.length != 0) { + for (int i = 0; i < value.length; i++) { + results.put(value[i].getKey().getColumn(), value[i].getData()); + } + } + return results; + } + + /** + * Get a scanner on the current table starting at the specified row. + * Return the specified columns.
+ * + * @param columns array of columns to return + * @param startRow starting row in table to scan + * @param timestamp only return results whose timestamp <= this value + * @return scanner + * @throws IOException + */ + public HScannerInterface obtainScanner(Text[] columns, + Text startRow, long timestamp) throws IOException { + + return obtainScanner(columns, startRow, timestamp, null); + } + + /** + * Get a scanner on the current table starting at the specified row. + * Return the specified columns. + * + * @param columns array of columns to return + * @param startRow starting row in table to scan + * @param filter a row filter using row-key regexp and/or column data filter. + * @return scanner + * @throws IOException + */ + public HScannerInterface obtainScanner(Text[] columns, + Text startRow, RowFilterInterface filter) throws IOException { + + return obtainScanner(columns, startRow, System.currentTimeMillis(), filter); + } + + /** + * Get a scanner on the current table starting at the specified row. + * Return the specified columns. + * + * @param columns array of columns to return + * @param startRow starting row in table to scan + * @param timestamp only return results whose timestamp <= this value + * @param filter a row filter using row-key regexp and/or column data filter. + * @return scanner + * @throws IOException + */ + public HScannerInterface obtainScanner(Text[] columns, + Text startRow, long timestamp, RowFilterInterface filter) + throws IOException { + + return new ClientScanner(columns, startRow, timestamp, filter); + } + + /** + * Start a batch of row insertions/updates. + * + * No changes are committed until the call to commitBatch returns. + * A call to abortBatch will abandon the entire batch.
+ * + * @param row name of row to be updated + * @return lockid to be used in subsequent put, delete and commit calls + */ + public synchronized long startBatchUpdate(final Text row) { + if (batch != null || currentLockId != -1L) { + throw new IllegalStateException("update in progress"); + } + batch = new BatchUpdate(); + return batch.startUpdate(row); + } + + /** + * Abort a batch mutation + * @param lockid lock id returned by startBatchUpdate + */ + public synchronized void abortBatch(final long lockid) { + if (batch == null) { + throw new IllegalStateException("no batch update in progress"); + } + if (batch.getLockid() != lockid) { + throw new IllegalArgumentException("invalid lock id " + lockid); + } + batch = null; + } + + /** + * Finalize a batch mutation + * + * @param lockid lock id returned by startBatchUpdate + * @throws IOException + */ + public void commitBatch(final long lockid) throws IOException { + commitBatch(lockid, System.currentTimeMillis()); + } + + /** + * Finalize a batch mutation + * + * @param lockid lock id returned by startBatchUpdate + * @param timestamp time to associate with all the changes + * @throws IOException + */ + public synchronized void commitBatch(final long lockid, final long timestamp) + throws IOException { + + if (batch == null) { + throw new IllegalStateException("no batch update in progress"); + } + if (batch.getLockid() != lockid) { + throw new IllegalArgumentException("invalid lock id " + lockid); + } + + try { + for (int tries = 0; tries < numRetries; tries++) { + HRegionLocation r = getRegionLocation(batch.getRow()); + HRegionInterface server = + connection.getHRegionConnection(r.getServerAddress()); + + try { + server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp, batch); + break; + + } catch (IOException e) { + if (tries < numRetries -1) { + tableServers = connection.reloadTableServers(tableName); + + } else { + if (e instanceof RemoteException) { + e = 
RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + } + try { + Thread.sleep(pause); + + } catch (InterruptedException e) { + } + } + } finally { + batch = null; + } + } + + /** + * Start an atomic row insertion/update. No changes are committed until the + * call to commit() returns. A call to abort() will abandon any updates in progress. + * + * Callers to this method are given a lease for each unique lockid; before the + * lease expires, either abort() or commit() must be called. If it is not + * called, the system will automatically call abort() on the client's behalf. + * + * The client can gain extra time with a call to renewLease(). + * Start an atomic row insertion or update + * + * @param row Name of row to start update against. + * @return Row lockid. + * @throws IOException + */ + public synchronized long startUpdate(final Text row) throws IOException { + if (currentLockId != -1L || batch != null) { + throw new IllegalStateException("update in progress"); + } + for (int tries = 0; tries < numRetries; tries++) { + IOException e = null; + HRegionLocation info = getRegionLocation(row); + try { + currentServer = + connection.getHRegionConnection(info.getServerAddress()); + + currentRegion = info.getRegionInfo().getRegionName(); + clientid = rand.nextLong(); + currentLockId = currentServer.startUpdate(currentRegion, clientid, row); + + break; + + } catch (IOException ex) { + e = ex; + } + if (tries < numRetries - 1) { + try { + Thread.sleep(this.pause); + + } catch (InterruptedException ex) { + } + try { + tableServers = connection.reloadTableServers(tableName); + + } catch (IOException ex) { + e = ex; + } + } else { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + } + return currentLockId; + } + + /** + * Change a value for the specified column. + * Runs {@link #abort(long)} if exception thrown. 
+ * + * @param lockid lock id returned from startUpdate + * @param column column whose value is being set + * @param val new value for column + * @throws IOException + */ + public void put(long lockid, Text column, byte val[]) throws IOException { + if (val == null) { + throw new IllegalArgumentException("value cannot be null"); + } + if (batch != null) { + batch.put(lockid, column, val); + return; + } + + if (lockid != currentLockId) { + throw new IllegalArgumentException("invalid lockid"); + } + try { + this.currentServer.put(this.currentRegion, this.clientid, lockid, column, + val); + } catch (IOException e) { + try { + this.currentServer.abort(this.currentRegion, this.clientid, lockid); + } catch (IOException e2) { + LOG.warn(e2); + } + this.currentServer = null; + this.currentRegion = null; + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + } + + /** + * Delete the value for a column + * + * @param lockid - lock id returned from startUpdate + * @param column - name of column whose value is to be deleted + * @throws IOException + */ + public void delete(long lockid, Text column) throws IOException { + if (batch != null) { + batch.delete(lockid, column); + return; + } + + if (lockid != currentLockId) { + throw new IllegalArgumentException("invalid lockid"); + } + try { + this.currentServer.delete(this.currentRegion, this.clientid, lockid, + column); + } catch (IOException e) { + try { + this.currentServer.abort(this.currentRegion, this.clientid, lockid); + } catch(IOException e2) { + LOG.warn(e2); + } + this.currentServer = null; + this.currentRegion = null; + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + } + + /** + * Abort a row mutation + * + * @param lockid - lock id returned from startUpdate + * @throws IOException + */ + public synchronized void abort(long lockid) throws IOException { + if 
(batch != null) { + abortBatch(lockid); + return; + } + + if (lockid != currentLockId) { + throw new IllegalArgumentException("invalid lockid"); + } + + try { + try { + this.currentServer.abort(this.currentRegion, this.clientid, lockid); + } catch (IOException e) { + this.currentServer = null; + this.currentRegion = null; + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + } finally { + currentLockId = -1L; + } + } + + /** + * Finalize a row mutation + * + * @param lockid - lock id returned from startUpdate + * @throws IOException + */ + public void commit(long lockid) throws IOException { + commit(lockid, System.currentTimeMillis()); + } + + /** + * Finalize a row mutation + * + * @param lockid - lock id returned from startUpdate + * @param timestamp - time to associate with the change + * @throws IOException + */ + public synchronized void commit(long lockid, long timestamp) throws IOException { + if (batch != null) { + commitBatch(lockid, timestamp); + return; + } + + if (lockid != currentLockId) { + throw new IllegalArgumentException("invalid lockid"); + } + + try { + try { + this.currentServer.commit(this.currentRegion, this.clientid, lockid, + timestamp); + + } catch (IOException e) { + this.currentServer = null; + this.currentRegion = null; + if(e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + } finally { + currentLockId = -1L; + } + } + + /** + * Renew lease on update + * + * @param lockid - lock id returned from startUpdate + * @throws IOException + */ + public synchronized void renewLease(long lockid) throws IOException { + if (batch != null) { + return; + } + + if (lockid != currentLockId) { + throw new IllegalArgumentException("invalid lockid"); + } + try { + this.currentServer.renewLease(lockid, this.clientid); + } catch (IOException e) { + try { + this.currentServer.abort(this.currentRegion, 
this.clientid, lockid); + } catch (IOException e2) { + LOG.warn(e2); + } + this.currentServer = null; + this.currentRegion = null; + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + } + + /** + * Implements the scanner interface for the HBase client. + * If there are multiple regions in a table, this scanner will iterate + * through them all. + */ + protected class ClientScanner implements HScannerInterface { + private final Text EMPTY_COLUMN = new Text(); + private Text[] columns; + private Text startRow; + private long scanTime; + @SuppressWarnings("hiding") + private boolean closed; + private AtomicReferenceArray regions; + @SuppressWarnings("hiding") + private int currentRegion; + private HRegionInterface server; + private long scannerId; + private RowFilterInterface filter; + + private void loadRegions() { + Text firstServer = null; + if (this.startRow == null || this.startRow.getLength() == 0) { + firstServer = tableServers.firstKey(); + + } else if(tableServers.containsKey(startRow)) { + firstServer = startRow; + + } else { + firstServer = tableServers.headMap(startRow).lastKey(); + } + Collection info = + tableServers.tailMap(firstServer).values(); + + this.regions = new AtomicReferenceArray( + info.toArray(new HRegionLocation[info.size()])); + } + + protected ClientScanner(Text[] columns, Text startRow, long timestamp, + RowFilterInterface filter) throws IOException { + + this.columns = columns; + this.startRow = startRow; + this.scanTime = timestamp; + this.closed = false; + this.filter = filter; + if (filter != null) { + filter.validate(columns); + } + loadRegions(); + this.currentRegion = -1; + this.server = null; + this.scannerId = -1L; + nextScanner(); + } + + /* + * Gets a scanner for the next region. + * Returns false if there are no more scanners. 
+ */ + private boolean nextScanner() throws IOException { + if (this.scannerId != -1L) { + this.server.close(this.scannerId); + this.scannerId = -1L; + } + this.currentRegion += 1; + if (this.currentRegion == this.regions.length()) { + close(); + return false; + } + try { + for (int tries = 0; tries < numRetries; tries++) { + HRegionLocation r = this.regions.get(currentRegion); + this.server = + connection.getHRegionConnection(r.getServerAddress()); + + try { + if (this.filter == null) { + this.scannerId = + this.server.openScanner(r.getRegionInfo().getRegionName(), + this.columns, currentRegion == 0 ? this.startRow + : EMPTY_START_ROW, scanTime, null); + + } else { + this.scannerId = + this.server.openScanner(r.getRegionInfo().getRegionName(), + this.columns, currentRegion == 0 ? this.startRow + : EMPTY_START_ROW, scanTime, filter); + } + + break; + + } catch (IOException e) { + if (tries == numRetries - 1) { + // No more tries + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + tableServers = connection.reloadTableServers(tableName); + loadRegions(); + } + } + + } catch (IOException e) { + close(); + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + return true; + } + + /** + * {@inheritDoc} + */ + public boolean next(HStoreKey key, TreeMap results) throws IOException { + if (this.closed) { + return false; + } + KeyedData[] values = null; + do { + values = this.server.next(this.scannerId); + } while (values != null && values.length == 0 && nextScanner()); + + if (values != null && values.length != 0) { + for (int i = 0; i < values.length; i++) { + key.setRow(values[i].getKey().getRow()); + key.setVersion(values[i].getKey().getTimestamp()); + key.setColumn(EMPTY_COLUMN); + results.put(values[i].getKey().getColumn(), values[i].getData()); + } + } + return values == null ? 
false : values.length != 0; + } + + /** + * {@inheritDoc} + */ + public void close() throws IOException { + if (this.scannerId != -1L) { + this.server.close(this.scannerId); + this.scannerId = -1L; + } + this.server = null; + this.closed = true; + } + } +} Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java (revision 561621) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java (working copy) @@ -53,6 +53,7 @@ this.regionServers = 1; } + /** {@inheritDoc} */ @Override public void setUp() throws Exception { super.setUp(); @@ -60,6 +61,7 @@ new MiniHBaseCluster(this.conf, this.regionServers, this.miniHdfs); } + /** {@inheritDoc} */ @Override public void tearDown() throws Exception { super.tearDown(); @@ -66,5 +68,6 @@ if (this.cluster != null) { this.cluster.shutdown(); } + HConnectionManager.deleteConnection(conf); } } Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (revision 561621) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (working copy) @@ -69,7 +69,7 @@ // Setup colkeys to be inserted HTableDescriptor htd = new HTableDescriptor(getName()); Text tableName = new Text(getName()); - Text[] colKeys = new Text[(int)(LAST_COLKEY - FIRST_COLKEY) + 1]; + Text[] colKeys = new Text[(LAST_COLKEY - FIRST_COLKEY) + 1]; for (char i = 0; i < colKeys.length; i++) { colKeys[i] = new Text(new String(new char[] { (char)(FIRST_COLKEY + i), ':' })); @@ -201,9 +201,9 @@ long scannerId = -1L; try { client.openTable(table); - HClient.RegionLocation rl = client.getRegionLocation(table); - regionServer = client.getHRegionConnection(rl.serverAddress); - scannerId = 
regionServer.openScanner(rl.regionInfo.regionName, + HRegionLocation rl = client.getRegionLocation(table); + regionServer = client.getHRegionConnection(rl.getServerAddress()); + scannerId = regionServer.openScanner(rl.getRegionInfo().getRegionName(), HMaster.METACOLUMNS, new Text(), System.currentTimeMillis(), null); while (true) { TreeMap results = new TreeMap();