snapshot = null;
- HLocking locking = new HLocking();
+ ReadWriteLock locker = new ReentrantReadWriteLock();
public HMemcache() {
}
@@ -52,18 +54,22 @@
}
/**
- * We want to return a snapshot of the current HMemcache with a known HLog
+ * Returns a snapshot of the current HMemcache with a known HLog
* sequence number at the same time.
+ *
+ * We need to prevent any writing to the cache during this time,
+ * so we obtain a write lock for the duration of the operation.
*
- * Return both the frozen HMemcache TreeMap, as well as the HLog seq number.
- *
- * We need to prevent any writing to the cache during this time, so we obtain
- * a write lock for the duration of the operation.
+ * If this method returns non-null, client must call
+ * {@link #deleteSnapshot()} to clear 'snapshot-in-progress'
+ * state when finished with the returned {@link Snapshot}.
+ *
+ * @return frozen HMemcache TreeMap and HLog sequence number.
*/
public Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
Snapshot retval = new Snapshot();
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
try {
if(snapshot != null) {
throw new IOException("Snapshot in progress!");
@@ -86,7 +92,7 @@
return retval;
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
@@ -96,7 +102,7 @@
* Modifying the structure means we need to obtain a writelock.
*/
public void deleteSnapshot() throws IOException {
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
try {
if(snapshot == null) {
@@ -118,7 +124,7 @@
LOG.debug("snapshot deleted");
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
@@ -128,7 +134,7 @@
* Operation uses a write lock.
*/
public void add(Text row, TreeMap columns, long timestamp) {
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
try {
for(Iterator it = columns.keySet().iterator(); it.hasNext(); ) {
Text column = it.next();
@@ -139,7 +145,7 @@
}
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
@@ -150,7 +156,7 @@
*/
public byte[][] get(HStoreKey key, int numVersions) {
Vector results = new Vector();
- locking.obtainReadLock();
+ this.locker.readLock().lock();
try {
Vector result = get(memcache, key, numVersions-results.size());
results.addAll(0, result);
@@ -172,7 +178,7 @@
}
} finally {
- locking.releaseReadLock();
+ this.locker.readLock().unlock();
}
}
@@ -184,7 +190,7 @@
*/
public TreeMap getFull(HStoreKey key) throws IOException {
TreeMap results = new TreeMap();
- locking.obtainReadLock();
+ this.locker.readLock().lock();
try {
internalGetFull(memcache, key, results);
for(int i = history.size()-1; i >= 0; i--) {
@@ -194,7 +200,7 @@
return results;
} finally {
- locking.releaseReadLock();
+ this.locker.readLock().unlock();
}
}
@@ -271,7 +277,7 @@
super(timestamp, targetCols);
- locking.obtainReadLock();
+ locker.readLock().lock();
try {
this.backingMaps = new TreeMap[history.size() + 1];
int i = 0;
@@ -359,7 +365,7 @@
}
} finally {
- locking.releaseReadLock();
+ locker.readLock().unlock();
scannerClosed = true;
}
}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseConfiguration.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseConfiguration.java (.../vendor/current) (revision 0)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseConfiguration.java (.../dev) (revision 7443)
@@ -0,0 +1,25 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class HBaseConfiguration extends Configuration {
+ public HBaseConfiguration() {
+ super();
+ addDefaultResource("hbase-default.xml");
+ }
+}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (.../vendor/current) (revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (.../dev) (revision 7443)
@@ -15,7 +15,9 @@
*/
package org.apache.hadoop.hbase;
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.VersionedProtocol;
import java.io.*;
@@ -23,17 +25,13 @@
* Clients interact with HRegionServers using
* a handle to the HRegionInterface.
******************************************************************************/
-public interface HRegionInterface {
+public interface HRegionInterface extends VersionedProtocol {
public static final long versionID = 1L; // initial version
// Get metainfo about an HRegion
public HRegionInfo getRegionInfo(Text regionName);
- // Start a scanner for a given HRegion.
-
- public HScannerInterface openScanner(Text regionName, Text[] columns, Text startRow) throws IOException;
-
// GET methods for an HRegion.
public BytesWritable get(Text regionName, Text row, Text column) throws IOException;
@@ -58,4 +56,41 @@
public void abort(Text regionName, long clientid, long lockid) throws IOException;
public void commit(Text regionName, long clientid, long lockid) throws IOException;
public void renewLease(long lockid, long clientid) throws IOException;
+
+ //////////////////////////////////////////////////////////////////////////////
+ // remote scanner interface
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Opens a remote scanner.
+ *
+ * @param clientId - client identifier (so we can associate a scanner with a client)
+ * @param regionName - name of region to scan
+ * @param columns - columns to scan
+ * @param startRow - starting row to scan
+ *
+ * @return scannerId - scanner identifier used in other calls
+ * @throws IOException
+ */
+ public long openScanner(Text regionName, Text[] columns, Text startRow) throws IOException;
+
+ /**
+ * Get the next set of values
+ *
+ * @param scannerId - the scanner id returned by openScanner
+ * @param key - the next HStoreKey
+ * The returned array holds the column names and corresponding
+ * byte[] values (one LabelledData entry per column).
+ * @return - array of values; zero-length when the scanner is exhausted
+ * @throws IOException
+ */
+ public LabelledData[] next(long scannerId, HStoreKey key) throws IOException;
+
+ /**
+ * Close a scanner
+ *
+ * @param scannerId - the scanner id returned by openScanner
+ * @throws IOException
+ */
+ public void close(long scannerId) throws IOException;
}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (.../vendor/current) (revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (.../dev) (revision 7443)
@@ -93,6 +93,11 @@
return timestamp;
}
+ /**
+ * @param other Key to compare against. Compares row and column.
+ * @return True if same row and column.
+ * @see #matchesWithoutColumn(HStoreKey)
+ */
public boolean matchesRowCol(HStoreKey other) {
if(this.row.compareTo(other.row) == 0 &&
this.column.compareTo(other.column) == 0) {
@@ -103,6 +108,13 @@
}
}
+ /**
+ * @param other Key to compare against. Compares row and
+ * timestamp.
+ * @return True if same row and timestamp is greater than or equal to
+ * other
+ * @see #matchesRowCol(HStoreKey)
+ */
public boolean matchesWithoutColumn(HStoreKey other) {
if((this.row.compareTo(other.row) == 0) &&
(this.timestamp >= other.getTimestamp())) {
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (.../vendor/current) (revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (.../dev) (revision 7443)
@@ -15,6 +15,8 @@
*/
package org.apache.hadoop.hbase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.ipc.*;
@@ -22,19 +24,34 @@
import java.io.*;
import java.util.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/*******************************************************************************
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
******************************************************************************/
-public class HRegionServer implements HConstants, HRegionInterface, Runnable {
+public class HRegionServer
+ implements HConstants, HRegionInterface, Runnable {
+
+ public long getProtocolVersion(String protocol,
+ long clientVersion) throws IOException {
+ if (protocol.equals(HRegionInterface.class.getName())) {
+ return HRegionInterface.versionID;
+ } else {
+ throw new IOException("Unknown protocol to name node: " + protocol);
+ }
+ }
+
+ private static final Log LOG = LogFactory.getLog(HRegionServer.class);
+
private boolean stopRequested;
private Path regionDir;
private HServerAddress address;
private Configuration conf;
private Random rand;
private TreeMap regions; // region name -> HRegion
- private HLocking locking;
+ private ReadWriteLock locker;
private Vector outboundMsgs;
private long threadWakeFrequency;
@@ -67,12 +84,12 @@
// Grab a list of regions to check
Vector checkSplit = new Vector();
- locking.obtainReadLock();
+ locker.readLock().lock();
try {
checkSplit.addAll(regions.values());
} finally {
- locking.releaseReadLock();
+ locker.readLock().unlock();
}
// Check to see if they need splitting
@@ -95,14 +112,16 @@
for(Iterator it = toSplit.iterator(); it.hasNext(); ) {
SplitRegion r = it.next();
- locking.obtainWriteLock();
+ locker.writeLock().lock();
regions.remove(r.region.getRegionName());
- locking.releaseWriteLock();
+ locker.writeLock().unlock();
HRegion[] newRegions = null;
try {
Text oldRegion = r.region.getRegionName();
+ LOG.info("splitting region: " + oldRegion);
+
newRegions = r.region.closeAndSplit(r.midKey);
// When a region is split, the META table needs to updated if we're
@@ -113,6 +132,8 @@
= (oldRegion.find(META_TABLE_NAME.toString()) == 0)
? ROOT_TABLE_NAME : META_TABLE_NAME;
+ LOG.debug("region split complete. updating meta");
+
client.openTable(tableToUpdate);
long lockid = client.startUpdate(oldRegion);
client.delete(lockid, META_COL_REGIONINFO);
@@ -132,7 +153,14 @@
// Now tell the master about the new regions
+ LOG.debug("reporting region split to master");
+
reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo());
+
+ LOG.info("region split successful. old region=" + oldRegion
+ + ", new regions: " + newRegions[0].getRegionName() + ", "
+ + newRegions[1].getRegionName());
+
newRegions[0].close();
newRegions[1].close();
@@ -145,11 +173,15 @@
// Sleep
- long endTime = System.currentTimeMillis();
- try {
- Thread.sleep(splitCheckFrequency - (endTime - startTime));
-
- } catch(InterruptedException iex) {
+ long waitTime =
+ splitCheckFrequency - (System.currentTimeMillis() - startTime);
+
+ if(waitTime > 0) {
+ try {
+ Thread.sleep(waitTime);
+
+ } catch(InterruptedException iex) {
+ }
}
}
}
@@ -167,12 +199,12 @@
// Grab a list of items to flush
Vector toFlush = new Vector();
- locking.obtainReadLock();
+ locker.readLock().lock();
try {
toFlush.addAll(regions.values());
} finally {
- locking.releaseReadLock();
+ locker.readLock().unlock();
}
// Flush them, if necessary
@@ -190,11 +222,15 @@
// Sleep
- long endTime = System.currentTimeMillis();
- try {
- Thread.sleep(threadWakeFrequency - (endTime - startTime));
-
- } catch(InterruptedException iex) {
+ long waitTime =
+ threadWakeFrequency - (System.currentTimeMillis() - startTime);
+
+ if(waitTime > 0) {
+ try {
+ Thread.sleep(waitTime);
+
+ } catch(InterruptedException iex) {
+ }
}
}
}
@@ -249,7 +285,7 @@
/** Start a HRegionServer at the default location */
public HRegionServer(Configuration conf) throws IOException {
this(new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)),
- new HServerAddress(conf.get("hbase.regionserver.default.name")),
+ new HServerAddress(conf.get(REGIONSERVER_ADDRESS, "localhost:0")),
conf);
}
@@ -261,12 +297,12 @@
this.stopRequested = false;
this.regionDir = regionDir;
- this.address = address;
this.conf = conf;
this.rand = new Random();
this.regions = new TreeMap();
- this.locking = new HLocking();
+ this.locker = new ReentrantReadWriteLock();
this.outboundMsgs = new Vector();
+ this.scanners = Collections.synchronizedMap(new TreeMap());
// Config'ed params
@@ -278,37 +314,53 @@
// Cache flushing
this.cacheFlusher = new Flusher();
- this.cacheFlusherThread = new Thread(cacheFlusher);
+ this.cacheFlusherThread = new Thread(cacheFlusher, "HRegionServer.cacheFlusher");
// Check regions to see if they need to be split
this.splitChecker = new SplitChecker();
- this.splitCheckerThread = new Thread(splitChecker);
+ this.splitCheckerThread = new Thread(splitChecker, "HRegionServer.splitChecker");
+
+ // Process requests from Master
+
+ this.toDo = new Vector();
+ this.worker = new Worker();
+ this.workerThread = new Thread(worker, "HRegionServer.worker");
try {
+
+ // Server to handle client requests
+
+ this.server = RPC.getServer(this, address.getBindAddress().toString(),
+ address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf);
+
+ this.address = new HServerAddress(server.getListenerAddress());
+
// Local file paths
- this.fs = FileSystem.get(conf);
- Path newlogdir = new Path(regionDir, "log" + "_" + address.toString());
- this.oldlogfile = new Path(regionDir, "oldlogfile" + "_" + address.toString());
+ String serverName = this.address.getBindAddress() + "_" + this.address.getPort();
+ Path newlogdir = new Path(regionDir, "log" + "_" + serverName);
+ this.oldlogfile = new Path(regionDir, "oldlogfile" + "_" + serverName);
// Logging
+ this.fs = FileSystem.get(conf);
HLog.consolidateOldLog(newlogdir, oldlogfile, fs, conf);
this.log = new HLog(fs, newlogdir, conf);
this.logRoller = new LogRoller();
- this.logRollerThread = new Thread(logRoller);
+ this.logRollerThread = new Thread(logRoller, "HRegionServer.logRoller");
// Remote HMaster
this.hbaseMaster = (HMasterRegionInterface)
RPC.waitForProxy(HMasterRegionInterface.class,
- HMasterRegionInterface.versionId,
- new HServerAddress(conf.get(MASTER_DEFAULT_NAME)).getInetSocketAddress(),
+ HMasterRegionInterface.versionID,
+ new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
conf);
// Threads
+ this.workerThread.start();
this.cacheFlusherThread.start();
this.splitCheckerThread.start();
this.logRollerThread.start();
@@ -317,14 +369,14 @@
// Server
- this.server = RPC.getServer(this, address.getBindAddress().toString(),
- address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf);
this.server.start();
} catch(IOException e) {
this.stopRequested = true;
throw e;
}
+
+ LOG.info("HRegionServer started at: " + address.toString());
}
/**
@@ -342,12 +394,18 @@
fs.close();
server.stop();
}
-
+ LOG.info("stopping server at: " + address.toString());
}
/** Call join to wait for all the threads to finish */
public void join() {
try {
+ this.workerThread.join();
+
+ } catch(InterruptedException iex) {
+ }
+
+ try {
this.logRollerThread.join();
} catch(InterruptedException iex) {
@@ -366,7 +424,7 @@
} catch(InterruptedException iex) {
}
-
+ LOG.info("server stopped at: " + address.toString());
}
/**
@@ -388,10 +446,12 @@
} catch(IOException e) {
waitTime = msgInterval - (System.currentTimeMillis() - lastMsg);
- try {
- Thread.sleep(waitTime);
-
- } catch(InterruptedException iex) {
+ if(waitTime > 0) {
+ try {
+ Thread.sleep(waitTime);
+
+ } catch(InterruptedException iex) {
+ }
}
continue;
}
@@ -411,10 +471,33 @@
HMsg msgs[] = hbaseMaster.regionServerReport(info, outboundArray);
lastMsg = System.currentTimeMillis();
- // Process the HMaster's instruction stream
+ // Queue up the HMaster's instruction stream for processing
- if(! processMessages(msgs)) {
- break;
+ synchronized(toDo) {
+ boolean restartOrStop = false;
+ for(int i = 0; i < msgs.length; i++) {
+ switch(msgs[i].getMsg()) {
+
+ case HMsg.MSG_CALL_SERVER_STARTUP:
+ closeAllRegions();
+ restartOrStop = true;
+ break;
+
+ case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING:
+ stop();
+ restartOrStop = true;
+ break;
+
+ default:
+ toDo.add(msgs[i]);
+ }
+ }
+ if(toDo.size() > 0) {
+ toDo.notifyAll();
+ }
+ if(restartOrStop) {
+ break;
+ }
}
} catch(IOException e) {
@@ -424,55 +507,16 @@
waitTime = msgInterval - (System.currentTimeMillis() - lastMsg);
- try {
- Thread.sleep(waitTime);
- } catch(InterruptedException iex) {
+ if(waitTime > 0) {
+ try {
+ Thread.sleep(waitTime);
+ } catch(InterruptedException iex) {
+ }
}
-
}
}
}
- private boolean processMessages(HMsg[] msgs) throws IOException {
- for(int i = 0; i < msgs.length; i++) {
- switch(msgs[i].getMsg()) {
-
- case HMsg.MSG_REGION_OPEN: // Open a region
- openRegion(msgs[i].getRegionInfo());
- break;
-
- case HMsg.MSG_REGION_CLOSE: // Close a region
- closeRegion(msgs[i].getRegionInfo(), true);
- break;
-
- case HMsg.MSG_REGION_MERGE: // Merge two regions
- //TODO ???
- throw new IOException("TODO: need to figure out merge");
- //break;
-
- case HMsg.MSG_CALL_SERVER_STARTUP: // Close regions, restart
- closeAllRegions();
- return false;
-
- case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING: // Go away
- stop();
- return false;
-
- case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: // Close a region, don't reply
- closeRegion(msgs[i].getRegionInfo(), false);
- break;
-
- case HMsg.MSG_REGION_CLOSE_AND_DELETE:
- closeAndDeleteRegion(msgs[i].getRegionInfo());
- break;
-
- default:
- throw new IOException("Impossible state during msg processing. Instruction: " + msgs[i]);
- }
- }
- return true;
- }
-
/** Add to the outbound message buffer */
private void reportOpen(HRegion region) {
synchronized(outboundMsgs) {
@@ -508,9 +552,68 @@
// HMaster-given operations
//////////////////////////////////////////////////////////////////////////////
+ private Vector toDo;
+ private Worker worker;
+ private Thread workerThread;
+ private class Worker implements Runnable {
+ public void run() {
+ while(!stopRequested) {
+ HMsg msg = null;
+ synchronized(toDo) {
+ while(toDo.size() == 0) {
+ try {
+ toDo.wait();
+
+ } catch(InterruptedException e) {
+ }
+ }
+ msg = toDo.remove(0);
+ }
+ try {
+ switch(msg.getMsg()) {
+
+ case HMsg.MSG_REGION_OPEN: // Open a region
+ openRegion(msg.getRegionInfo());
+ break;
+
+ case HMsg.MSG_REGION_CLOSE: // Close a region
+ closeRegion(msg.getRegionInfo(), true);
+ break;
+
+ case HMsg.MSG_REGION_MERGE: // Merge two regions
+ //TODO ???
+ throw new IOException("TODO: need to figure out merge");
+ //break;
+
+ case HMsg.MSG_CALL_SERVER_STARTUP: // Close regions, restart
+ closeAllRegions();
+ continue;
+
+ case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING: // Go away
+ stop();
+ continue;
+
+ case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: // Close a region, don't reply
+ closeRegion(msg.getRegionInfo(), false);
+ break;
+
+ case HMsg.MSG_REGION_CLOSE_AND_DELETE:
+ closeAndDeleteRegion(msg.getRegionInfo());
+ break;
+
+ default:
+ throw new IOException("Impossible state during msg processing. Instruction: " + msg);
+ }
+ } catch(IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
private void openRegion(HRegionInfo regionInfo) throws IOException {
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
try {
HRegion region = new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile);
@@ -518,14 +621,14 @@
reportOpen(region);
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
private void closeRegion(HRegionInfo info, boolean reportWhenCompleted)
throws IOException {
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
try {
HRegion region = regions.remove(info.regionName);
@@ -538,13 +641,13 @@
}
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
private void closeAndDeleteRegion(HRegionInfo info) throws IOException {
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
try {
HRegion region = regions.remove(info.regionName);
@@ -553,13 +656,13 @@
}
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
/** Called either when the master tells us to restart or from stop() */
private void closeAllRegions() throws IOException {
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
try {
for(Iterator it = regions.values().iterator(); it.hasNext(); ) {
HRegion region = it.next();
@@ -568,7 +671,7 @@
regions.clear();
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
@@ -612,17 +715,6 @@
return region.getRegionInfo();
}
- /** Start a scanner for a given HRegion. */
- public HScannerInterface openScanner(Text regionName, Text[] cols,
- Text firstRow) throws IOException {
-
- HRegion r = getRegion(regionName);
- if(r == null) {
- throw new IOException("Not serving region " + regionName);
- }
- return r.getScanner(cols, firstRow);
- }
-
/** Get the indicated row/column */
public BytesWritable get(Text regionName, Text row, Text column) throws IOException {
HRegion region = getRegion(regionName);
@@ -806,13 +898,126 @@
/** Private utility method for safely obtaining an HRegion handle. */
private HRegion getRegion(Text regionName) {
- locking.obtainReadLock();
+ this.locker.readLock().lock();
try {
return regions.get(regionName);
} finally {
- locking.releaseReadLock();
+ this.locker.readLock().unlock();
}
}
+ //////////////////////////////////////////////////////////////////////////////
+ // remote scanner interface
+ //////////////////////////////////////////////////////////////////////////////
+
+ private Map scanners;
+ private class ScannerListener extends LeaseListener {
+ private Text scannerName;
+
+ public ScannerListener(Text scannerName) {
+ this.scannerName = scannerName;
+ }
+
+ public void leaseExpired() {
+ HScannerInterface s = scanners.remove(scannerName);
+ if(s != null) {
+ try {
+ s.close();
+
+ } catch(IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ /** Start a scanner for a given HRegion. */
+ public long openScanner(Text regionName, Text[] cols, Text firstRow)
+ throws IOException {
+
+ HRegion r = getRegion(regionName);
+ if(r == null) {
+ throw new IOException("Not serving region " + regionName);
+ }
+
+ long scannerId = -1L;
+ try {
+ HScannerInterface s = r.getScanner(cols, firstRow);
+ scannerId = rand.nextLong();
+ Text scannerName = new Text(String.valueOf(scannerId));
+ scanners.put(scannerName, s);
+ leases.createLease(scannerName, scannerName, new ScannerListener(scannerName));
+
+ } catch(IOException e) {
+ e.printStackTrace();
+ throw e;
+ }
+ return scannerId;
+ }
+
+ public LabelledData[] next(long scannerId, HStoreKey key) throws IOException {
+
+ Text scannerName = new Text(String.valueOf(scannerId));
+ HScannerInterface s = scanners.get(scannerName);
+ if(s == null) {
+ throw new IOException("unknown scanner");
+ }
+ leases.renewLease(scannerName, scannerName);
+ TreeMap results = new TreeMap();
+ ArrayList values = new ArrayList();
+ if(s.next(key, results)) {
+ for(Iterator> it = results.entrySet().iterator();
+ it.hasNext(); ) {
+ Map.Entry e = it.next();
+ values.add(new LabelledData(e.getKey(), e.getValue()));
+ }
+ }
+ return values.toArray(new LabelledData[values.size()]);
+ }
+
+ public void close(long scannerId) throws IOException {
+ Text scannerName = new Text(String.valueOf(scannerId));
+ HScannerInterface s = scanners.remove(scannerName);
+ if(s == null) {
+ throw new IOException("unknown scanner");
+ }
+ try {
+ s.close();
+
+ } catch(IOException ex) {
+ ex.printStackTrace();
+ }
+ leases.cancelLease(scannerName, scannerName);
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Main program
+ //////////////////////////////////////////////////////////////////////////////
+
+ private static void printUsage() {
+ System.err.println("Usage: java " +
+ "org.apache.hbase.HRegionServer [--bind=hostname:port]");
+ }
+
+ public static void main(String [] args) throws IOException {
+ Configuration conf = new HBaseConfiguration();
+
+ // Process command-line args. TODO: Better cmd-line processing
+ // (but hopefully something not as painful as cli options).
+ for (String cmd: args) {
+ if (cmd.equals("-h") || cmd.startsWith("--h")) {
+ printUsage();
+ return;
+ }
+
+ final String addressArgKey = "--bind=";
+ if (cmd.startsWith(addressArgKey)) {
+ conf.set(REGIONSERVER_ADDRESS,
+ cmd.substring(addressArgKey.length()));
+ }
+ }
+
+ new HRegionServer(conf);
+ }
}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java (.../vendor/current) (revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java (.../dev) (revision 7443)
@@ -15,7 +15,8 @@
*/
package org.apache.hadoop.hbase;
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.VersionedProtocol;
import java.io.IOException;
@@ -23,7 +24,7 @@
* Clients interact with the HMasterInterface to gain access to meta-level HBase
* functionality, like finding an HRegionServer and creating/destroying tables.
******************************************************************************/
-public interface HMasterInterface {
+public interface HMasterInterface extends VersionedProtocol {
public static final long versionID = 1L; // initial version
//////////////////////////////////////////////////////////////////////////////
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (.../vendor/current) (revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (.../dev) (revision 7443)
@@ -24,7 +24,18 @@
// Configuration parameters
- static final String MASTER_DEFAULT_NAME = "hbase.master.default.name";
+ // TODO: URL for hbase master, like hdfs URLs with host and port.
+ // Or, like jdbc URLs:
+ // jdbc:mysql://[host][,failoverhost...][:port]/[database]
+ // jdbc:mysql://[host][,failoverhost...][:port]/[database][?propertyName1][=propertyValue1][&propertyName2][=propertyValue2]...
+
+ static final String MASTER_ADDRESS = "hbase.master";
+ // TODO: Support 'local': i.e. default of all running in single
+ // process. Same for regionserver.
+ static final String DEFAULT_MASTER_ADDRESS = "localhost:60000";
+ static final String REGIONSERVER_ADDRESS = "hbase.regionserver";
+ static final String DEFAULT_REGIONSERVER_ADDRESS =
+ "localhost:60010";
static final String HREGION_DIR = "hbase.regiondir";
static final String DEFAULT_HREGION_DIR = "/hbase";
static final String HREGIONDIR_PREFIX = "hregion_";
@@ -37,10 +48,10 @@
// Do we ever need to know all the information that we are storing?
static final Text ROOT_TABLE_NAME = new Text("--ROOT--");
- static final Text ROOT_COLUMN_FAMILY = new Text("info");
- static final Text ROOT_COL_REGIONINFO = new Text(ROOT_COLUMN_FAMILY + ":" + "regioninfo");
- static final Text ROOT_COL_SERVER = new Text(ROOT_COLUMN_FAMILY + ":" + "server");
- static final Text ROOT_COL_STARTCODE = new Text(ROOT_COLUMN_FAMILY + ":" + "serverstartcode");
+ static final Text ROOT_COLUMN_FAMILY = new Text("info:");
+ static final Text ROOT_COL_REGIONINFO = new Text(ROOT_COLUMN_FAMILY + "regioninfo");
+ static final Text ROOT_COL_SERVER = new Text(ROOT_COLUMN_FAMILY + "server");
+ static final Text ROOT_COL_STARTCODE = new Text(ROOT_COLUMN_FAMILY + "serverstartcode");
static final Text META_TABLE_NAME = new Text("--META--");
static final Text META_COLUMN_FAMILY = new Text(ROOT_COLUMN_FAMILY);
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java (.../vendor/current) (revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java (.../dev) (revision 7443)
@@ -33,6 +33,12 @@
this.stringValue = null;
}
+ public HServerAddress(InetSocketAddress address) {
+ this.address = address;
+ this.stringValue = new String(address.getAddress().getHostAddress()
+ + ":" + address.getPort());
+ }
+
public HServerAddress(String hostAndPort) {
int colonIndex = hostAndPort.indexOf(':');
if(colonIndex < 0) {
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (.../vendor/current) (revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (.../dev) (revision 7443)
@@ -15,29 +15,39 @@
*/
package org.apache.hadoop.hbase;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.conf.*;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.TreeSet;
-import java.io.*;
-import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.log4j.Logger;
/*******************************************************************************
* HClient manages a connection to a single HRegionServer.
******************************************************************************/
-public class HClient extends HGlobals implements HConstants {
+public class HClient implements HConstants {
+ private final Logger LOG =
+ Logger.getLogger(this.getClass().getName());
+
private static final Text[] metaColumns = {
META_COLUMN_FAMILY
};
private static final Text startRow = new Text();
private boolean closed;
- private Configuration conf;
- private HServerAddress masterLocation;
private long clientTimeout;
private int numTimeouts;
private int numRetries;
private HMasterInterface master;
+ private final Configuration conf;
private class TableInfo {
public HRegionInfo regionInfo;
@@ -72,16 +82,11 @@
public HClient(Configuration conf) {
this.closed = false;
this.conf = conf;
+
+ this.clientTimeout = conf.getLong("hbase.client.timeout.length", 10 * 1000);
+ this.numTimeouts = conf.getInt("hbase.client.timeout.number", 5);
+ this.numRetries = conf.getInt("hbase.client.retries.number", 2);
- // Load config settings
-
- this.masterLocation = new HServerAddress(this.conf.get(MASTER_DEFAULT_NAME));
- this.clientTimeout = this.conf.getLong("hbase.client.timeout.length", 10 * 1000);
- this.numTimeouts = this.conf.getInt("hbase.client.timeout.number", 5);
- this.numRetries = this.conf.getInt("hbase.client.retries.number", 2);
-
- // Finish initialization
-
this.master = null;
this.tablesToServers = new TreeMap>();
this.tableServers = null;
@@ -94,6 +99,26 @@
this.rand = new Random();
}
+ public synchronized void createTable(HTableDescriptor desc) throws IOException {
+ if(closed) {
+ throw new IllegalStateException("client is not open");
+ }
+ if(master == null) {
+ locateRootRegion();
+ }
+ master.createTable(desc);
+ }
+
+ public synchronized void deleteTable(Text tableName) throws IOException {
+ if(closed) {
+ throw new IllegalStateException("client is not open");
+ }
+ if(master == null) {
+ locateRootRegion();
+ }
+ master.deleteTable(tableName);
+ }
+
public synchronized void openTable(Text tableName) throws IOException {
if(closed) {
throw new IllegalStateException("client is not open");
@@ -145,9 +170,11 @@
*/
private void locateRootRegion() throws IOException {
if(master == null) {
+ HServerAddress masterLocation =
+ new HServerAddress(this.conf.get(MASTER_ADDRESS));
master = (HMasterInterface)RPC.getProxy(HMasterInterface.class,
- HMasterInterface.versionID,
- masterLocation.getInetSocketAddress(), conf);
+ HMasterInterface.versionID,
+ masterLocation.getInetSocketAddress(), conf);
}
int tries = 0;
@@ -160,7 +187,6 @@
if(rootRegionLocation == null) {
try {
Thread.sleep(clientTimeout);
-
} catch(InterruptedException iex) {
}
localTimeouts++;
@@ -174,9 +200,9 @@
HRegionInterface rootRegion = getHRegionConnection(rootRegionLocation);
- if(rootRegion.getRegionInfo(rootRegionInfo.regionName) != null) {
+ if(rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName) != null) {
tableServers = new TreeMap();
- tableServers.put(startRow, new TableInfo(rootRegionInfo, rootRegionLocation));
+ tableServers.put(startRow, new TableInfo(HGlobals.rootRegionInfo, rootRegionLocation));
tablesToServers.put(ROOT_TABLE_NAME, tableServers);
break;
}
@@ -202,43 +228,54 @@
/*
* Scans a single meta region
- * @param t - the table we're going to scan
- * @param tableName - the name of the table we're looking for
+ * @param t the table we're going to scan
+ * @param tableName the name of the table we're looking for
*/
private void scanOneMetaRegion(TableInfo t, Text tableName) throws IOException {
HRegionInterface server = getHRegionConnection(t.serverAddress);
- HScannerInterface scanner = null;
+ long scannerId = -1L;
try {
- scanner = server.openScanner(t.regionInfo.regionName, metaColumns, tableName);
- HStoreKey key = new HStoreKey();
- TreeMap results = new TreeMap();
+ scannerId = server.openScanner(t.regionInfo.regionName, metaColumns, tableName);
+
DataInputBuffer inbuf = new DataInputBuffer();
+ while(true) {
+ HStoreKey key = new HStoreKey();
- while(scanner.next(key, results)) {
- byte hRegionInfoBytes[] = results.get(META_COL_REGIONINFO);
- inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length);
+ LabelledData[] values = server.next(scannerId, key);
+ if(values.length == 0) {
+ break;
+ }
+
+ TreeMap results = new TreeMap();
+ for(int i = 0; i < values.length; i++) {
+ results.put(values[i].getLabel(), values[i].getData().get());
+ }
HRegionInfo regionInfo = new HRegionInfo();
+ byte[] bytes = results.get(META_COL_REGIONINFO);
+ inbuf.reset(bytes, bytes.length);
regionInfo.readFields(inbuf);
-
- if(! regionInfo.tableDesc.getName().equals(tableName)) {
+
+ if(!regionInfo.tableDesc.getName().equals(tableName)) {
// We're done
break;
}
-
- byte serverBytes[] = results.get(META_COL_SERVER);
- String serverName = new String(serverBytes, UTF8_ENCODING);
+
+ bytes = results.get(META_COL_SERVER);
+ String serverName = new String(bytes, UTF8_ENCODING);
tableServers.put(regionInfo.startKey,
new TableInfo(regionInfo, new HServerAddress(serverName)));
- results.clear();
}
+
} finally {
- scanner.close();
+ if(scannerId != -1L) {
+ server.close(scannerId);
+ }
}
}
- public synchronized HRegionInterface getHRegionConnection(HServerAddress regionServer)
+ synchronized HRegionInterface getHRegionConnection(HServerAddress regionServer)
throws IOException {
// See if we already have a connection
@@ -270,7 +307,7 @@
* catalog table that just contains table names and their descriptors.
* Right now, it only exists as part of the META table's region info.
*/
- public HTableDescriptor[] listTables() throws IOException {
+ public synchronized HTableDescriptor[] listTables() throws IOException {
TreeSet uniqueTables = new TreeSet();
TreeMap metaTables = tablesToServers.get(META_TABLE_NAME);
@@ -280,37 +317,47 @@
metaTables = tablesToServers.get(META_TABLE_NAME);
}
- for(Iteratori = metaTables.values().iterator(); i.hasNext(); ) {
- TableInfo t = i.next();
+ for(Iteratorit = metaTables.values().iterator(); it.hasNext(); ) {
+ TableInfo t = it.next();
HRegionInterface server = getHRegionConnection(t.serverAddress);
- HScannerInterface scanner = null;
+ long scannerId = -1L;
try {
- scanner = server.openScanner(t.regionInfo.regionName, metaColumns, startRow);
+ scannerId = server.openScanner(t.regionInfo.regionName, metaColumns, startRow);
HStoreKey key = new HStoreKey();
- TreeMap results = new TreeMap();
+
DataInputBuffer inbuf = new DataInputBuffer();
- while(scanner.next(key, results)) {
- byte infoBytes[] = (byte[]) results.get(ROOT_COL_REGIONINFO);
- inbuf.reset(infoBytes, infoBytes.length);
- HRegionInfo info = new HRegionInfo();
- info.readFields(inbuf);
+ while(true) {
+ LabelledData[] values = server.next(scannerId, key);
+ if(values.length == 0) {
+ break;
+ }
- // Only examine the rows where the startKey is zero length
-
- if(info.startKey.getLength() == 0) {
- uniqueTables.add(info.tableDesc);
+ for(int i = 0; i < values.length; i++) {
+ if(values[i].getLabel().equals(META_COL_REGIONINFO)) {
+ byte[] bytes = values[i].getData().get();
+ inbuf.reset(bytes, bytes.length);
+ HRegionInfo info = new HRegionInfo();
+ info.readFields(inbuf);
+
+ // Only examine the rows where the startKey is zero length
+
+ if(info.startKey.getLength() == 0) {
+ uniqueTables.add(info.tableDesc);
+ }
+ }
}
- results.clear();
}
} finally {
- scanner.close();
+ if(scannerId != -1L) {
+ server.close(scannerId);
+ }
}
}
return (HTableDescriptor[]) uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]);
}
- private TableInfo getTableInfo(Text row) {
+ private synchronized TableInfo getTableInfo(Text row) {
if(tableServers == null) {
throw new IllegalStateException("Must open table first");
}
@@ -368,7 +415,7 @@
* Get a scanner on the current table starting at the specified row.
* Return the specified columns.
*/
- public HScannerInterface obtainScanner(Text[] columns, Text startRow) throws IOException {
+ public synchronized HScannerInterface obtainScanner(Text[] columns, Text startRow) throws IOException {
if(tableServers == null) {
throw new IllegalStateException("Must open table first");
}
@@ -462,7 +509,7 @@
private TableInfo[] regions;
private int currentRegion;
private HRegionInterface server;
- private HScannerInterface scanner;
+ private long scannerId;
public ClientScanner(Text[] columns, Text startRow) throws IOException {
this.columns = columns;
@@ -472,7 +519,7 @@
this.regions = info.toArray(new TableInfo[info.size()]);
this.currentRegion = -1;
this.server = null;
- this.scanner = null;
+ this.scannerId = -1L;
nextScanner();
}
@@ -481,8 +528,9 @@
* Returns false if there are no more scanners.
*/
private boolean nextScanner() throws IOException {
- if(scanner != null) {
- scanner.close();
+ if(scannerId != -1L) {
+ server.close(scannerId);
+ scannerId = -1L;
}
currentRegion += 1;
if(currentRegion == regions.length) {
@@ -491,7 +539,7 @@
}
try {
server = getHRegionConnection(regions[currentRegion].serverAddress);
- scanner = server.openScanner(regions[currentRegion].regionInfo.regionName,
+ scannerId = server.openScanner(regions[currentRegion].regionInfo.regionName,
columns, startRow);
} catch(IOException e) {
@@ -508,26 +556,63 @@
if(closed) {
return false;
}
- boolean status = scanner.next(key, results);
- if(! status) {
- status = nextScanner();
- if(status) {
- status = scanner.next(key, results);
- }
+ LabelledData[] values = null;
+ do {
+ values = server.next(scannerId, key);
+ } while(values.length == 0 && nextScanner());
+
+ for(int i = 0; i < values.length; i++) {
+ results.put(values[i].getLabel(), values[i].getData().get());
}
- return status;
+ return values.length != 0;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HScannerInterface#close()
*/
public void close() throws IOException {
- if(scanner != null) {
- scanner.close();
+ if(scannerId != -1L) {
+ server.close(scannerId);
}
server = null;
closed = true;
}
}
-
-}
+
+ private void printUsage() {
+ System.err.println("Usage: java " + this.getClass().getName() +
+ " [--master=hostname:port]");
+ }
+
+ private int doCommandLine(final String args[]) {
+ // Process command-line args. TODO: Better cmd-line processing
+ // (but hopefully something not as painful as cli options).
+ for (String cmd: args) {
+ if (cmd.equals("-h") || cmd.startsWith("--h")) {
+ printUsage();
+ return 0;
+ }
+
+ final String masterArgKey = "--master=";
+ if (cmd.startsWith(masterArgKey)) {
+ this.conf.set(MASTER_ADDRESS,
+ cmd.substring(masterArgKey.length()));
+ }
+ }
+
+ int errCode = -1;
+ try {
+ locateRootRegion();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ return errCode;
+ }
+
+ public static void main(final String args[]) {
+ Configuration c = new HBaseConfiguration();
+ int errCode = (new HClient(c)).doCommandLine(args);
+ System.exit(errCode);
+ }
+}
\ No newline at end of file
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java (.../vendor/current) (revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java (.../dev) (revision 7443)
@@ -15,13 +15,15 @@
*/
package org.apache.hadoop.hbase;
-import java.io.*;
+import java.io.IOException;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
/*******************************************************************************
* HRegionServers interact with the HMasterRegionInterface to report on local
* goings-on and to obtain data-handling instructions from the HMaster.
*********************************************/
-public interface HMasterRegionInterface {
- public static final long versionId = 1L;
+public interface HMasterRegionInterface extends VersionedProtocol {
+ public static final long versionID = 1L;
public void regionServerStartup(HServerInfo info) throws IOException;
public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[]) throws IOException;
}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (.../vendor/current) (revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (.../dev) (revision 7443)
@@ -353,4 +353,4 @@
insideCacheFlush = false;
notifyAll();
}
-}
+}
\ No newline at end of file
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (.../vendor/current) (revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (.../dev) (revision 7443)
@@ -312,7 +312,7 @@
for(Iterator it = this.regionInfo.tableDesc.families().iterator();
it.hasNext(); ) {
- Text colFamily = it.next();
+ Text colFamily = HStoreKey.extractFamily(it.next());
stores.put(colFamily, new HStore(dir, this.regionInfo.regionName, colFamily,
this.regionInfo.tableDesc.getMaxVersions(), fs, oldLogFile, conf));
}
@@ -345,6 +345,7 @@
/** Closes and deletes this HRegion. Called when doing a table deletion, for example */
public void closeAndDelete() throws IOException {
+ LOG.info("deleting region: " + regionInfo.regionName);
close();
fs.delete(regiondir);
}
@@ -808,8 +809,7 @@
// Make sure this is a valid row and valid column
checkRow(row);
- Text colFamily = HStoreKey.extractFamily(column);
- checkFamily(colFamily);
+ checkColumn(column);
// Obtain the row-lock
@@ -911,7 +911,7 @@
/**
* Put a cell value into the locked row. The user indicates the row-lock, the
* target column, and the desired value. This stuff is set into a temporary
- * memory area until the user commits the change, at which pointit's logged
+ * memory area until the user commits the change, at which point it's logged
* and placed into the memcache.
*
* This method really just tests the input, then calls an internal localput()
@@ -950,9 +950,11 @@
* (Or until the user's write-lock expires.)
*/
void localput(long lockid, Text targetCol, byte[] val) throws IOException {
+ checkColumn(targetCol);
+
Text row = getRowFromLock(lockid);
if(row == null) {
- throw new IOException("No write lock for lockid " + lockid);
+ throw new LockException("No write lock for lockid " + lockid);
}
// This sync block makes localput() thread-safe when multiple
@@ -965,7 +967,7 @@
// hasn't aborted/committed the write-operation.
if(row != getRowFromLock(lockid)) {
- throw new IOException("Locking error: put operation on lock " + lockid
+ throw new LockException("Locking error: put operation on lock " + lockid
+ " unexpected aborted by another thread");
}
@@ -986,7 +988,7 @@
public void abort(long lockid) throws IOException {
Text row = getRowFromLock(lockid);
if(row == null) {
- throw new IOException("No write lock for lockid " + lockid);
+ throw new LockException("No write lock for lockid " + lockid);
}
// This sync block makes abort() thread-safe when multiple
@@ -999,7 +1001,7 @@
// hasn't aborted/committed the write-operation.
if(row != getRowFromLock(lockid)) {
- throw new IOException("Locking error: abort() operation on lock "
+ throw new LockException("Locking error: abort() operation on lock "
+ lockid + " unexpected aborted by another thread");
}
@@ -1022,7 +1024,7 @@
Text row = getRowFromLock(lockid);
if(row == null) {
- throw new IOException("No write lock for lockid " + lockid);
+ throw new LockException("No write lock for lockid " + lockid);
}
// This check makes sure that another thread from the client
@@ -1066,9 +1068,10 @@
+ "', endKey='" + regionInfo.endKey + "', row='" + row + "'");
}
}
-
+
/** Make sure this is a valid column for the current table */
- void checkFamily(Text family) throws IOException {
+ void checkColumn(Text columnName) throws IOException {
+ Text family = new Text(HStoreKey.extractFamily(columnName) + ":");
if(! regionInfo.tableDesc.hasFamily(family)) {
throw new IOException("Requested column family " + family
+ " does not exist in HRegion " + regionInfo.regionName
@@ -1092,6 +1095,8 @@
* which maybe we'll do in the future.
*/
long obtainLock(Text row) throws IOException {
+ checkRow(row);
+
synchronized(rowsToLocks) {
while(rowsToLocks.get(row) != null) {
try {
@@ -1109,6 +1114,8 @@
}
Text getRowFromLock(long lockid) throws IOException {
+ // Pattern is that all access to rowsToLocks and/or to
+ // locksToRows is via a lock on rowsToLocks.
synchronized(rowsToLocks) {
return locksToRows.get(lockid);
}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java (.../vendor/current) (revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java (.../dev) (revision 7443)
@@ -32,7 +32,7 @@
}
public LabelledData(Text label, byte[] data) {
- this.label.set(label);
+ this.label = new Text(label);
this.data = new BytesWritable(data);
}
@@ -40,7 +40,7 @@
return label;
}
- public BytesWritable getDat() {
+ public BytesWritable getData() {
return data;
}
Property changes on: .
___________________________________________________________________
Name: svk:merge
+ 8368d6a0-ff38-4065-b5b8-cf688d7fb560:/thirdparty/hadoop/vendor/current:6459