();
try {
for(int i = 0; i < logfiles.length; i++) {
SequenceFile.Reader in =
@@ -116,6 +116,9 @@
if (w == null) {
Path logfile = new Path(HStoreFile.getHRegionDir(rootDir,
regionName), HREGION_OLDLOGFILE_NAME);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getting new log file writer for path " + logfile);
+ }
w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
HLogEdit.class);
logWriters.put(regionName, w);
@@ -205,9 +208,7 @@
// continue;
}
}
-
-
// Close the current writer (if any), and grab a new one.
if(writer != null) {
writer.close();
@@ -280,7 +281,6 @@
* @throws IOException
*/
synchronized void closeAndDelete() throws IOException {
- rollWriter();
close();
fs.delete(dir);
}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (revision 558872)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (working copy)
@@ -23,9 +23,9 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -35,10 +35,10 @@
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -106,11 +106,7 @@
static final Text METACOLUMNS[] = {
COLUMN_FAMILY
};
-
- boolean rootScanned;
- int numMetaRegions;
-
/**
* Base HRegion scanner class. Holds utilty common to ROOT and
* META HRegion scanners.
@@ -152,23 +148,40 @@
* The META scanner needs to wake up:
*
* - when a META region comes on line
- * periodically to rescan the known META regions
+ * periodically to rescan the online META regions
*
*
- * A META region is not 'known' until it has been scanned
+ * A META region is not 'online' until it has been scanned
* once.
*/
abstract class BaseScanner implements Runnable {
private final Text FIRST_ROW = new Text();
+ protected boolean rootRegion;
+ protected abstract void initialScan();
+ protected abstract void maintenanceScan();
+
/**
+ * {@inheritDoc}
+ */
+ public void run() {
+ initialScan();
+ while (waitForRootRegionOrClose()) {
+ try {
+ Thread.sleep(metaRescanInterval);
+ } catch (InterruptedException e) {
+ continue;
+ }
+ maintenanceScan();
+ }
+ LOG.info(this.getClass().getSimpleName() + " exiting");
+ }
+
+ /**
* @param region Region to scan
- * @return True if scan completed.
* @throws IOException
*/
- protected boolean scanRegion(final MetaRegion region)
- throws IOException {
- boolean scannedRegion = false;
+ protected void scanRegion(final MetaRegion region) throws IOException {
HRegionInterface regionServer = null;
long scannerId = -1L;
if (LOG.isDebugEnabled()) {
@@ -181,6 +194,7 @@
scannerId = regionServer.openScanner(region.regionName, METACOLUMNS,
FIRST_ROW, System.currentTimeMillis(), null);
+ int numberOfRegionsFound = 0;
while (true) {
TreeMap results = new TreeMap();
KeyedData[] values = regionServer.next(scannerId);
@@ -204,21 +218,30 @@
// Note Region has been assigned.
checkAssigned(info, serverName, startCode);
- scannedRegion = true;
+
+ numberOfRegionsFound += 1;
}
+ if(rootRegion) {
+ numberOfMetaRegions.set(numberOfRegionsFound);
+ }
- } catch (UnknownScannerException e) {
- // Reset scannerId so we do not try closing a scanner the other side
- // has lost account of: prevents duplicated stack trace out of the
- // below close in the finally.
- scannerId = -1L;
+ } catch (IOException e) {
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+
+ if (e instanceof UnknownScannerException) {
+ // Reset scannerId so we do not try closing a scanner the other side
+ // has lost account of: prevents duplicated stack trace out of the
+ // below close in the finally.
+ scannerId = -1L;
+ }
+ }
+ throw e;
} finally {
try {
- if (scannerId != -1L) {
- if (regionServer != null) {
+ if (scannerId != -1L && regionServer != null) {
regionServer.close(scannerId);
- }
}
} catch (IOException e) {
if (e instanceof RemoteException) {
@@ -231,7 +254,6 @@
LOG.debug(Thread.currentThread().getName() + " scan of meta region " +
region.regionName + " complete");
}
- return scannedRegion;
}
protected void checkAssigned(final HRegionInfo info,
@@ -286,69 +308,74 @@
}
}
+ volatile boolean rootScanned;
+
/**
* Scanner for the ROOT HRegion.
*/
class RootScanner extends BaseScanner {
- /**
- * {@inheritDoc}
- */
- public void run() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Running ROOT scanner");
- }
+ /** Constructor */
+ public RootScanner() {
+ rootRegion = true;
+ }
+
+ private void scanRoot() {
int tries = 0;
- while(!closed && tries < numRetries) {
+ while (waitForRootRegionOrClose()) {
+ // rootRegionLocation will be filled in when we get an 'open region'
+ // regionServerReport message from the HRegionServer that has been
+ // allocated the ROOT region below.
try {
- // rootRegionLocation will be filled in when we get an 'open region'
- // regionServerReport message from the HRegionServer that has been
- // allocated the ROOT region below. If we get back false, then
- // HMaster has closed.
- if (waitForRootRegionOrClose()) {
+ Thread.sleep(threadWakeFrequency);
+ } catch (InterruptedException e) {
+ // continue
+ }
+
+ if (tries > 0) {
+ try {
+ Thread.sleep(threadWakeFrequency);
+ } catch (InterruptedException e) {
continue;
}
+ }
+ try {
synchronized(rootScannerLock) { // Don't interrupt us while we're working
- rootScanned = false;
- // Make a MetaRegion instance for ROOT region to pass scanRegion.
- MetaRegion mr = new MetaRegion();
- mr.regionName = HGlobals.rootRegionInfo.regionName;
- mr.server = HMaster.this.rootRegionLocation;
- mr.startKey = null;
- if (scanRegion(mr)) {
- numMetaRegions += 1;
- }
- rootScanned = true;
+ scanRegion(new MetaRegion(HMaster.this.rootRegionLocation,
+ HGlobals.rootRegionInfo.regionName, null));
}
- tries = 0;
+ break;
} catch (IOException e) {
if (e instanceof RemoteException) {
try {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-
+
} catch (IOException ex) {
- LOG.warn(ex);
+ e = ex;
}
}
- tries++;
- if(tries < numRetries) {
- LOG.warn("ROOT scanner", e);
-
- } else {
- LOG.error("ROOT scanner", e);
- closed = true;
- break;
+ tries += 1;
+ if (tries == 1) {
+ LOG.warn(e);
+ continue; // retry
+
}
+ LOG.error(e);
+ continue;
}
- try {
- Thread.sleep(metaRescanInterval);
- } catch(InterruptedException e) {
- // Catch and go around again. If interrupt, its spurious or we're
- // being shutdown. Go back up to the while test.
- }
- }
- LOG.info("ROOT scanner exiting");
+ }
}
+
+ @Override
+ protected void initialScan() {
+ scanRoot();
+ rootScanned = true;
+ }
+
+ @Override
+ protected void maintenanceScan() {
+ scanRoot();
+ }
}
private RootScanner rootScanner;
@@ -360,6 +387,12 @@
HServerAddress server;
Text regionName;
Text startKey;
+
+ MetaRegion(HServerAddress server, Text regionName, Text startKey) {
+ this.server = server;
+ this.regionName = regionName;
+ this.startKey = startKey;
+ }
/**
* {@inheritDoc}
@@ -395,14 +428,19 @@
}
}
+
+ /** Set by root scanner to indicate the number of meta regions */
+ volatile AtomicInteger numberOfMetaRegions;
/** Work for the meta scanner is queued up here */
- Vector metaRegionsToScan;
+ BlockingQueue metaRegionsToScan;
- SortedMap knownMetaRegions;
+ /** These are the online meta regions */
+ SortedMap onlineMetaRegions;
+
+ /** Set by meta scanner after initial scan */
+ volatile boolean initialMetaScanComplete;
- boolean allMetaRegionsScanned;
-
/**
* MetaScanner META table.
*
@@ -413,120 +451,109 @@
* action would prevent other work from getting done.
*/
class MetaScanner extends BaseScanner {
- /**
- * {@inheritDoc}
- */
- @SuppressWarnings("null")
- public void run() {
- while (!closed) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Running META scanner");
- }
- MetaRegion region = null;
- while (region == null && !closed) {
- synchronized (metaRegionsToScan) {
- if (metaRegionsToScan.size() != 0) {
- region = metaRegionsToScan.remove(0);
- }
- if (region == null) {
- try {
- metaRegionsToScan.wait(threadWakeFrequency);
- } catch (InterruptedException e) {
- // Catch and go around again. We've been woken because there
- // are new meta regions available or because we are being
- // shut down.
- }
- }
+ /** Constructor */
+ public MetaScanner() {
+ rootRegion = false;
+ }
+
+ private void scanOneMetaRegion(MetaRegion region) {
+ int tries = 0;
+ while (waitForRootRegionOrClose()) {
+ if (tries > 0) {
+ try {
+ Thread.sleep(threadWakeFrequency);
+ } catch (InterruptedException e) {
+ continue;
}
}
- if (closed) {
- continue;
- }
try {
- synchronized(metaScannerLock) { // Don't interrupt us while we're working
+ synchronized (metaScannerLock) {
+ // Don't interrupt us while we're working
scanRegion(region);
- knownMetaRegions.put(region.startKey, region);
- if (rootScanned && knownMetaRegions.size() == numMetaRegions) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("all meta regions scanned");
- }
- allMetaRegionsScanned = true;
- metaRegionsScanned();
- }
+ onlineMetaRegions.put(region.startKey, region);
}
+ break;
- int tries = 0;
- do {
- try {
- Thread.sleep(metaRescanInterval);
- } catch(InterruptedException ex) {
- // Catch and go around again.
- }
- if(!allMetaRegionsScanned // A meta region must have split
- || closed) { // We're shutting down
- break;
- }
-
- try {
-
- // Rescan the known meta regions every so often
- synchronized(metaScannerLock) { // Don't interrupt us while we're working
- Vector v = new Vector();
- v.addAll(knownMetaRegions.values());
- for(Iterator i = v.iterator(); i.hasNext(); ) {
- scanRegion(i.next());
- }
- }
- tries = 0;
-
- } catch (IOException e) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
- }
- tries++;
- if(tries < numRetries) {
- LOG.warn("META scanner", e);
-
- } else {
- throw e;
- }
- }
- } while(true);
-
} catch (IOException e) {
if (e instanceof RemoteException) {
try {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-
+ e = RemoteExceptionHandler.decodeRemoteException(
+ (RemoteException) e);
+
} catch (IOException ex) {
- LOG.warn(ex);
+ e = ex;
}
}
- LOG.error("META scanner", e);
- closed = true;
+ tries += 1;
+ if (tries == 1) {
+ LOG.warn(e);
+ continue;
+
+ }
+ LOG.error(e);
+ continue;
}
}
- LOG.info("META scanner exiting");
}
+ @Override
+ protected void initialScan() {
+ MetaRegion region = null;
+ while (!closed && region == null && !metaRegionsScanned()) {
+ try {
+ region =
+ metaRegionsToScan.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // continue
+ }
+
+ if (region != null) {
+ scanOneMetaRegion(region);
+ }
+ }
+ initialMetaScanComplete = true;
+ }
+
+ @Override
+ protected void maintenanceScan() {
+ ArrayList regions = new ArrayList();
+ regions.addAll(onlineMetaRegions.values());
+ for (MetaRegion r: regions) {
+ scanOneMetaRegion(r);
+ }
+ metaRegionsScanned();
+ }
+
/**
* Called by the meta scanner when it has completed scanning all meta
* regions. This wakes up any threads that were waiting for this to happen.
*/
- private synchronized void metaRegionsScanned() {
+ private synchronized boolean metaRegionsScanned() {
+ if (!rootScanned || numberOfMetaRegions.get() != onlineMetaRegions.size()) {
+ return false;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("all meta regions scanned");
+ }
notifyAll();
+ return true;
}
/**
* Other threads call this method to wait until all the meta regions have
* been scanned.
*/
- synchronized boolean waitForMetaScanOrClose() {
- while(!closed && !allMetaRegionsScanned) {
+ synchronized boolean waitForMetaRegionsOrClose() {
+ while (!closed) {
+ if (rootScanned
+ && numberOfMetaRegions.get() == onlineMetaRegions.size()) {
+
+ break;
+ }
+
try {
wait(threadWakeFrequency);
- } catch(InterruptedException e) {
- // continue
+ } catch (InterruptedException e) {
}
}
return closed;
@@ -664,14 +691,14 @@
// Scans the meta table
- this.numMetaRegions = 0;
- this.metaRegionsToScan = new Vector();
+ this.numberOfMetaRegions = new AtomicInteger();
+ this.metaRegionsToScan = new LinkedBlockingQueue();
- this.knownMetaRegions =
+ this.onlineMetaRegions =
Collections.synchronizedSortedMap(new TreeMap());
- this.allMetaRegionsScanned = false;
-
+ this.initialMetaScanComplete = false;
+
this.metaScanner = new MetaScanner();
this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner");
@@ -750,7 +777,28 @@
if (LOG.isDebugEnabled()) {
LOG.debug("Processing " + op.toString());
}
- op.process();
+ if (!op.process()) {
+ // Operation would have blocked because not all meta regions are
+ // online. This could cause a deadlock, because this thread is waiting
+ // for the missing meta region(s) to come back online, but since it
+ // is waiting, it cannot process the meta region online operation it
+ // is waiting for. So put this operation back on the queue for now.
+
+ if (msgQueue.size() == 0) {
+ // The queue is currently empty so wait for a while to see if what
+ // we need comes in first
+
+ try {
+ Thread.sleep(threadWakeFrequency);
+ } catch (InterruptedException e) {
+ }
+ }
+ try {
+ msgQueue.put(op);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+ }
+ }
} catch (Exception ex) {
if (ex instanceof RemoteException) {
@@ -838,21 +886,18 @@
}
/**
- * Wait until rootRegionLocation has been set or until the
+ * Wait until rootScanned or rootRegionLocation has been set or until the
* closed flag has been set.
- * @return True if rootRegionLocation was populated.
+ * @return True if the master has not been closed
*/
synchronized boolean waitForRootRegionOrClose() {
- while (!closed && rootRegionLocation == null) {
+ while (!closed && !rootScanned && rootRegionLocation == null) {
try {
wait(threadWakeFrequency);
- } catch(InterruptedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Wake from wait for root region (or close) (IE)");
- }
+ } catch (InterruptedException e) {
}
}
- return this.rootRegionLocation == null;
+ return !closed;
}
//////////////////////////////////////////////////////////////////////////////
@@ -903,20 +948,26 @@
long serverLabel = getServerLabel(s);
if (msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
+
// HRegionServer is shutting down. Cancel the server's lease.
+
LOG.debug("Region server " + s + ": MSG_REPORT_EXITING");
cancelLease(s, serverLabel);
- // Get all the regions the server was serving reassigned (if we
- // are not shutting down).
+ // Get all the regions the server was serving reassigned
+ // (if we are not shutting down).
+
if (!closed) {
for (int i = 1; i < msgs.length; i++) {
HRegionInfo info = msgs[i].getRegionInfo();
+
if (info.tableDesc.getName().equals(ROOT_TABLE_NAME)) {
rootRegionLocation = null;
+
} else if (info.tableDesc.getName().equals(META_TABLE_NAME)) {
- allMetaRegionsScanned = false;
+ onlineMetaRegions.remove(info.getStartKey());
}
+
unassignedRegions.put(info.regionName, info);
assignAttempts.put(info.regionName, Long.valueOf(0L));
}
@@ -931,7 +982,7 @@
// Tell server to shut down if we are shutting down. This should
// happen after check of MSG_REPORT_EXITING above, since region server
// will send us one of these messages after it gets MSG_REGIONSERVER_STOP
- return HMsg.MSG_REGIONSERVER_STOP_IN_ARRAY;
+ return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)};
}
HServerInfo storedInfo = serversToServerInfo.get(s);
@@ -942,11 +993,11 @@
// The HBaseMaster may have been restarted.
// Tell the RegionServer to start over and call regionServerStartup()
- HMsg returnMsgs[] = new HMsg[1];
- returnMsgs[0] = new HMsg(HMsg.MSG_CALL_SERVER_STARTUP);
- return returnMsgs;
- } else if(storedInfo.getStartCode() != serverInfo.getStartCode()) {
+
+ return new HMsg[]{new HMsg(HMsg.MSG_CALL_SERVER_STARTUP)};
+ } else if (storedInfo.getStartCode() != serverInfo.getStartCode()) {
+
// This state is reachable if:
//
// 1) RegionServer A started
@@ -956,11 +1007,12 @@
//
// The answer is to ask A to shut down for good.
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("region server race condition detected: " + s);
}
- return HMsg.MSG_REGIONSERVER_STOP_IN_ARRAY;
+ return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)};
+
} else {
// All's well. Renew the server's lease.
@@ -990,7 +1042,7 @@
/** Process all the incoming messages from a server that's contacted us. */
private HMsg[] processMsgs(HServerInfo info, HMsg incomingMsgs[]) throws IOException {
- Vector returnMsgs = new Vector();
+ ArrayList returnMsgs = new ArrayList();
TreeMap regionsToKill =
killList.remove(info.getServerAddress().toString());
@@ -1026,11 +1078,6 @@
+ region.regionName);
}
- // Note that it has been assigned and is waiting for the meta table
- // to be updated.
-
- pendingRegions.add(region.regionName);
-
// Remove from unassigned list so we don't assign it to someone else
unassignedRegions.remove(region.regionName);
@@ -1046,22 +1093,13 @@
rootRegionIsAvailable();
break;
-
- } else if(region.tableDesc.getName().equals(META_TABLE_NAME)) {
-
- // It's a meta region. Put it on the queue to be scanned.
-
- MetaRegion r = new MetaRegion();
- r.server = info.getServerAddress();
- r.regionName = region.regionName;
- r.startKey = region.startKey;
-
- synchronized(metaRegionsToScan) {
- metaRegionsToScan.add(r);
- metaRegionsToScan.notifyAll();
- }
}
+ // Note that the region has been assigned and is waiting for the meta
+ // table to be updated.
+
+ pendingRegions.add(region.regionName);
+
// Queue up an update to note the region location.
try {
@@ -1111,17 +1149,29 @@
}
break;
- case HMsg.MSG_NEW_REGION:
+ case HMsg.MSG_REPORT_SPLIT:
+ // A region has split and the old server is serving the two new regions.
+
+ HRegionInfo newRegionA = incomingMsgs[++i].getRegionInfo();
+ HRegionInfo newRegionB = incomingMsgs[++i].getRegionInfo();
+
if(LOG.isDebugEnabled()) {
- LOG.debug("new region " + region.regionName);
+ LOG.debug("region " + region.regionName + " split. New regions are: "
+ + newRegionA.regionName + ", " + newRegionB.regionName);
}
- // A region has split and the old server is serving the two new regions.
-
if(region.tableDesc.getName().equals(META_TABLE_NAME)) {
// A meta region has split.
- allMetaRegionsScanned = false;
+ onlineMetaRegions.remove(region.getStartKey());
+ onlineMetaRegions.put(newRegionA.getStartKey(),
+ new MetaRegion(info.getServerAddress(), newRegionA.getRegionName(),
+ newRegionA.getStartKey()));
+ onlineMetaRegions.put(newRegionB.getStartKey(),
+ new MetaRegion(info.getServerAddress(), newRegionB.getRegionName(),
+ newRegionB.getStartKey()));
+
+ numberOfMetaRegions.incrementAndGet();
}
break;
@@ -1190,13 +1240,10 @@
COLUMN_FAMILY
};
protected final Text startRow = new Text();
- protected long clientId;
- PendingOperation() {
- this.clientId = rand.nextLong();
- }
+ PendingOperation() {}
- abstract void process() throws IOException;
+ abstract boolean process() throws IOException;
}
/**
@@ -1208,6 +1255,9 @@
private HServerAddress deadServer;
private String deadServerName;
private long oldStartCode;
+ private boolean logSplit;
+ private boolean rootChecked;
+ private boolean rootRescanned;
private class ToDoEntry {
boolean deleteRegion;
@@ -1228,13 +1278,16 @@
this.deadServer = serverInfo.getServerAddress();
this.deadServerName = this.deadServer.toString();
this.oldStartCode = serverInfo.getStartCode();
+ this.logSplit = false;
+ this.rootChecked = false;
+ this.rootRescanned = false;
}
/** Finds regions that the dead region server was serving */
private void scanMetaRegion(HRegionInterface server, long scannerId,
Text regionName) throws IOException {
- Vector toDoList = new Vector();
+ ArrayList toDoList = new ArrayList();
TreeMap regions = new TreeMap();
DataInputBuffer inbuf = new DataInputBuffer();
@@ -1335,7 +1388,7 @@
}
if(info.tableDesc.getName().equals(META_TABLE_NAME)) {
- allMetaRegionsScanned = false;
+ onlineMetaRegions.remove(info.getStartKey());
}
ToDoEntry todo = new ToDoEntry(row, info);
@@ -1383,8 +1436,8 @@
}
// Remove server from root/meta entries
- for(int i = 0; i < toDoList.size(); i++) {
- ToDoEntry e = toDoList.get(i);
+ long clientId = rand.nextLong();
+ for (ToDoEntry e: toDoList) {
long lockid = server.startUpdate(regionName, clientId, e.row);
if(e.deleteRegion) {
server.delete(regionName, clientId, lockid, COL_REGIONINFO);
@@ -1400,7 +1453,7 @@
server.delete(regionName, clientId, lockid, COL_STARTCODE);
server.commit(regionName, clientId, lockid, System.currentTimeMillis());
}
-
+
// Get regions reassigned
for(Map.Entry e: regions.entrySet()) {
@@ -1413,69 +1466,89 @@
}
@Override
- void process() throws IOException {
+ boolean process() throws IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("server shutdown: " + deadServerName);
}
-
- // Process the old log file
-
- HLog.splitLog(dir, new Path(dir, "log" + "_" + deadServer.getBindAddress()
- + "_" + deadServer.getPort()), fs, conf);
- if(rootRegionLocation != null
- && deadServerName.equals(rootRegionLocation.toString())) {
+ if(!logSplit) {
+ // Process the old log file
+
+ HLog.splitLog(dir, new Path(dir, "log" + "_" + deadServer.getBindAddress()
+ + "_" + deadServer.getPort()), fs, conf);
- rootRegionLocation = null;
- unassignedRegions.put(HGlobals.rootRegionInfo.regionName,
- HGlobals.rootRegionInfo);
- assignAttempts.put(HGlobals.rootRegionInfo.regionName,
- Long.valueOf(0L));
+ logSplit = true;
}
-
- // Scan the ROOT region
- HRegionInterface server = null;
- long scannerId = -1L;
- for(int tries = 0; tries < numRetries; tries ++) {
- if(waitForRootRegionOrClose()) {// Wait until the root region is available
- return; // We're shutting down. Forget it.
+ if(!rootChecked) {
+ if(rootRegionLocation != null
+ && deadServer.equals(rootRegionLocation)) {
+
+ rootRegionLocation = null;
+ unassignedRegions.put(HGlobals.rootRegionInfo.regionName,
+ HGlobals.rootRegionInfo);
+ assignAttempts.put(HGlobals.rootRegionInfo.regionName,
+ Long.valueOf(0L));
}
- server = client.getHRegionConnection(rootRegionLocation);
- scannerId = -1L;
-
- try {
- LOG.debug("scanning root region");
- scannerId = server.openScanner(HGlobals.rootRegionInfo.regionName,
- columns, startRow, System.currentTimeMillis(), null);
- scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName);
- break;
-
- } catch (IOException e) {
- if (tries == numRetries - 1) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ rootChecked = true;
+ }
+
+ if(!rootRescanned) {
+ // Scan the ROOT region
+
+ HRegionInterface server = null;
+ long scannerId = -1L;
+ for(int tries = 0; tries < numRetries; tries ++) {
+ if (closed) {
+ return true;
+ }
+ if (rootRegionLocation == null || !rootScanned) {
+ // We can't proceed until the root region is online and has been
+ // scanned
+ return false;
+ }
+ server = client.getHRegionConnection(rootRegionLocation);
+ scannerId = -1L;
+
+ try {
+ LOG.debug("scanning root region");
+ scannerId = server.openScanner(HGlobals.rootRegionInfo.regionName,
+ columns, startRow, System.currentTimeMillis(), null);
+ scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName);
+ break;
+
+ } catch (IOException e) {
+ if (tries == numRetries - 1) {
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ throw e;
}
- throw e;
}
}
+ rootRescanned = true;
}
- // We can not scan every meta region if they have not already been assigned
- // and scanned.
-
for(int tries = 0; tries < numRetries; tries ++) {
try {
- if(metaScanner.waitForMetaScanOrClose()) {
- return; // We're shutting down. Forget it.
+ if (closed) {
+ return true;
}
+ if (!rootScanned
+ || numberOfMetaRegions.get() != onlineMetaRegions.size()) {
+
+ // We can't proceed because not all of the meta regions are online.
+ // We can't block either because that would prevent the meta region
+ // online message from being processed. So return false to have this
+ // operation requeued.
+
+ return false;
+ }
- for(Iterator i = knownMetaRegions.values().iterator();
- i.hasNext(); ) {
+ for (MetaRegion r: onlineMetaRegions.values()) {
- server = null;
- scannerId = -1L;
- MetaRegion r = i.next();
+ HRegionInterface server = null;
+ long scannerId = -1L;
server = client.getHRegionConnection(r.server);
@@ -1495,6 +1568,7 @@
}
}
}
+ return true;
}
}
@@ -1529,16 +1603,9 @@
}
@Override
- void process() throws IOException {
+ boolean process() throws IOException {
for(int tries = 0; tries < numRetries; tries ++) {
- // We can not access any meta region if they have not already been assigned
- // and scanned.
-
- if(metaScanner.waitForMetaScanOrClose()) {
- return; // We're shutting down. Forget it.
- }
-
if(LOG.isDebugEnabled()) {
LOG.debug("region closed: " + regionInfo.regionName);
}
@@ -1547,28 +1614,49 @@
Text metaRegionName;
HRegionInterface server;
+
+ if (closed) {
+ return true;
+ }
if (rootRegion) {
+ if (rootRegionLocation == null || !rootScanned) {
+ // We can't proceed until the root region is online and has been
+ // scanned
+ return false;
+ }
metaRegionName = HGlobals.rootRegionInfo.regionName;
- if(waitForRootRegionOrClose()) {// Make sure root region available
- return; // We're shutting down. Forget it.
- }
server = client.getHRegionConnection(rootRegionLocation);
+ onlineMetaRegions.remove(regionInfo.getStartKey());
} else {
+ if (!rootScanned
+ || numberOfMetaRegions.get() != onlineMetaRegions.size()) {
+
+ // We can't proceed because not all of the meta regions are online.
+ // We can't block either because that would prevent the meta region
+ // online message from being processed. So return false to have this
+ // operation requeued.
+
+ return false;
+ }
+
MetaRegion r = null;
- if(knownMetaRegions.containsKey(regionInfo.regionName)) {
- r = knownMetaRegions.get(regionInfo.regionName);
+ if (onlineMetaRegions.containsKey(regionInfo.getRegionName())) {
+ r = onlineMetaRegions.get(regionInfo.getRegionName());
} else {
- r = knownMetaRegions.get(
- knownMetaRegions.headMap(regionInfo.regionName).lastKey());
+ r = onlineMetaRegions.get(
+ onlineMetaRegions.headMap(regionInfo.getRegionName()).lastKey());
}
metaRegionName = r.regionName;
server = client.getHRegionConnection(r.server);
}
+ long clientId = rand.nextLong();
try {
- long lockid = server.startUpdate(metaRegionName, clientId, regionInfo.regionName);
+ long lockid = server.startUpdate(metaRegionName, clientId,
+ regionInfo.regionName);
+
if(deleteRegion) {
server.delete(metaRegionName, clientId, lockid, COL_REGIONINFO);
@@ -1620,6 +1708,7 @@
throw e;
}
}
+ return true;
}
}
@@ -1630,8 +1719,8 @@
*/
private class PendingOpenReport extends PendingOperation {
private boolean rootRegion;
- private Text regionName;
- private byte [] serverAddress;
+ private HRegionInfo region;
+ private HServerAddress serverAddress;
private byte [] startCode;
PendingOpenReport(HServerInfo info, HRegionInfo region) {
@@ -1643,66 +1732,99 @@
// Just an ordinary region. Look for it in the META table.
this.rootRegion = false;
}
- this.regionName = region.regionName;
+ this.region = region;
+ this.serverAddress = info.getServerAddress();
try {
- this.serverAddress = info.getServerAddress().toString().
- getBytes(UTF8_ENCODING);
- this.startCode = String.valueOf(info.getStartCode()).
- getBytes(UTF8_ENCODING);
+ this.startCode =
+ String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING);
} catch(UnsupportedEncodingException e) {
LOG.error(e);
}
}
@Override
- void process() throws IOException {
+ boolean process() throws IOException {
for(int tries = 0; tries < numRetries; tries ++) {
- // We can not access any meta region if they have not already been assigned
- // and scanned.
-
- if(metaScanner.waitForMetaScanOrClose()) {
- return; // We're shutting down. Forget it.
- }
-
if(LOG.isDebugEnabled()) {
- LOG.debug(regionName + " open on "
- + new String(this.serverAddress, UTF8_ENCODING));
+ LOG.debug(region.getRegionName() + " open on "
+ + this.serverAddress.toString());
}
// Register the newly-available Region's location.
Text metaRegionName;
HRegionInterface server;
- if(rootRegion) {
+ if (closed) {
+ return true;
+ }
+ if (rootRegion) {
+ if (rootRegionLocation == null || !rootScanned) {
+ // We can't proceed until the root region is online and has been
+ // scanned
+ return false;
+ }
metaRegionName = HGlobals.rootRegionInfo.regionName;
- if(waitForRootRegionOrClose()) {// Make sure root region available
- return; // We're shutting down. Forget it.
- }
server = client.getHRegionConnection(rootRegionLocation);
} else {
+ if (!rootScanned
+ || numberOfMetaRegions.get() != onlineMetaRegions.size()) {
+
+ // We can't proceed because not all of the meta regions are online.
+ // We can't block either because that would prevent the meta region
+ // online message from being processed. So return false to have this
+ // operation requeued.
+
+ return false;
+ }
+
MetaRegion r = null;
- if(knownMetaRegions.containsKey(regionName)) {
- r = knownMetaRegions.get(regionName);
+ if (onlineMetaRegions.containsKey(region.getRegionName())) {
+ r = onlineMetaRegions.get(region.getRegionName());
} else {
- r = knownMetaRegions.get(
- knownMetaRegions.headMap(regionName).lastKey());
+ r = onlineMetaRegions.get(
+ onlineMetaRegions.headMap(region.getRegionName()).lastKey());
}
metaRegionName = r.regionName;
server = client.getHRegionConnection(r.server);
}
if(LOG.isDebugEnabled()) {
- LOG.debug("updating row " + regionName + " in table " + metaRegionName);
+ LOG.debug("updating row " + region.getRegionName() + " in table "
+ + metaRegionName);
}
+ long clientId = rand.nextLong();
try {
- long lockid = server.startUpdate(metaRegionName, clientId, regionName);
- server.put(metaRegionName, clientId, lockid, COL_SERVER, serverAddress);
+ long lockid = server.startUpdate(metaRegionName, clientId,
+ region.getRegionName());
+ server.put(metaRegionName, clientId, lockid, COL_SERVER,
+ serverAddress.toString().getBytes(UTF8_ENCODING));
server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode);
server.commit(metaRegionName, clientId, lockid,
System.currentTimeMillis());
+ if (region.tableDesc.getName().equals(META_TABLE_NAME)) {
+ // It's a meta region.
+
+ MetaRegion m =
+ new MetaRegion(serverAddress, region.regionName, region.startKey);
+
+ if (!initialMetaScanComplete) {
+ // Put it on the queue to be scanned for the first time.
+
+ try {
+ metaRegionsToScan.put(m);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(
+ "Putting into metaRegionsToScan was interrupted.", e);
+ }
+ } else {
+ // Add it to the online meta regions
+
+ onlineMetaRegions.put(region.startKey, m);
+ }
+ }
break;
} catch (IOException e) {
@@ -1713,8 +1835,9 @@
throw e;
}
}
- pendingRegions.remove(regionName);
+ pendingRegions.remove(region.getRegionName());
}
+ return true;
}
}
@@ -1761,10 +1884,13 @@
try {
// We can not access meta regions if they have not already been
// assigned and scanned. If we timeout waiting, just shutdown.
- if (metaScanner.waitForMetaScanOrClose()) {
- return;
+ if (metaScanner.waitForMetaRegionsOrClose()) {
+ break;
}
- createTable(newRegion);
+ createTable(newRegion);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("created table " + desc.getName());
+ }
break;
} catch (IOException e) {
if(tries == numRetries - 1) {
@@ -1775,10 +1901,6 @@
}
}
}
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("created table " + desc.getName());
- }
}
/*
@@ -1800,9 +1922,9 @@
// table would sit should it exist. Open scanner on it. If a region
// for the table we want to create already exists, then table already
// created. Throw already-exists exception.
- MetaRegion m = (knownMetaRegions.containsKey(newRegion.regionName))?
- knownMetaRegions.get(newRegion.regionName):
- knownMetaRegions.get(knownMetaRegions.
+ MetaRegion m = (onlineMetaRegions.containsKey(newRegion.regionName)) ?
+ onlineMetaRegions.get(newRegion.regionName) :
+ onlineMetaRegions.get(onlineMetaRegions.
headMap(newRegion.getTableDesc().getName()).lastKey());
Text metaRegionName = m.regionName;
HRegionInterface connection = client.getHRegionConnection(m.server);
@@ -1921,22 +2043,22 @@
// We can not access any meta region if they have not already been
// assigned and scanned.
- if(metaScanner.waitForMetaScanOrClose()) {
- return; // We're shutting down. Forget it.
+ if (metaScanner.waitForMetaRegionsOrClose()) {
+ throw new MasterNotRunningException(); // We're shutting down. Forget it.
}
Text firstMetaRegion = null;
- if(knownMetaRegions.size() == 1) {
- firstMetaRegion = knownMetaRegions.firstKey();
+ if (onlineMetaRegions.size() == 1) {
+ firstMetaRegion = onlineMetaRegions.firstKey();
- } else if(knownMetaRegions.containsKey(tableName)) {
+ } else if (onlineMetaRegions.containsKey(tableName)) {
firstMetaRegion = tableName;
} else {
- firstMetaRegion = knownMetaRegions.headMap(tableName).lastKey();
+ firstMetaRegion = onlineMetaRegions.headMap(tableName).lastKey();
}
- this.metaRegions.addAll(knownMetaRegions.tailMap(firstMetaRegion).values());
+ this.metaRegions.addAll(onlineMetaRegions.tailMap(firstMetaRegion).values());
}
void process() throws IOException {
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/TableNotFoundException.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/TableNotFoundException.java (revision 0)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/TableNotFoundException.java (working copy)
@@ -0,0 +1,37 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+/** Thrown when a table cannot be located. */
+public class TableNotFoundException extends IOException {
+ private static final long serialVersionUID = 993179627856392526L;
+
+ /** Constructs the exception with no detail message. */
+ public TableNotFoundException() {
+ super();
+ }
+
+ /** @param s detail message, passed through to {@link IOException} */
+ public TableNotFoundException(String s) {
+ super(s);
+ }
+}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (revision 558872)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (working copy)
@@ -42,10 +42,7 @@
/** Master tells region server to stop */
public static final byte MSG_REGIONSERVER_STOP = 5;
-
- public static final HMsg [] MSG_REGIONSERVER_STOP_IN_ARRAY =
- {new HMsg(HMsg.MSG_REGIONSERVER_STOP)};
-
+
/** Stop serving the specified region and don't report back that it's closed */
public static final byte MSG_REGION_CLOSE_WITHOUT_REPORT = 6;
@@ -57,10 +54,20 @@
/** region server is no longer serving the specified region */
public static final byte MSG_REPORT_CLOSE = 101;
- /** region server is now serving a region produced by a region split */
- public static final byte MSG_NEW_REGION = 103;
+ /**
+ * region server split the region associated with this message.
+ *
+ * note that this message is immediately followed by two MSG_REPORT_OPEN
+ * messages, one for each of the new regions resulting from the split
+ */
+ public static final byte MSG_REPORT_SPLIT = 103;
- /** region server is shutting down */
+ /**
+ * region server is shutting down
+ *
+ * note that this message is followed by MSG_REPORT_CLOSE messages for each
+ * region the region server was serving.
+ */
public static final byte MSG_REPORT_EXITING = 104;
byte msg;
@@ -108,6 +115,9 @@
return info;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public String toString() {
StringBuilder message = new StringBuilder();
@@ -140,8 +150,8 @@
message.append("MSG_REPORT_CLOSE : ");
break;
- case MSG_NEW_REGION:
- message.append("MSG_NEW_REGION : ");
+ case MSG_REPORT_SPLIT:
+ message.append("MSG_REPORT_SPLIT : "); // label mirrors the constant name
break;
case MSG_REPORT_EXITING:
@@ -162,16 +172,16 @@
// Writable
//////////////////////////////////////////////////////////////////////////////
- /* (non-Javadoc)
- * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+ /**
+ * {@inheritDoc}
*/
public void write(DataOutput out) throws IOException {
out.writeByte(msg);
info.write(out);
}
- /* (non-Javadoc)
- * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+ /**
+ * {@inheritDoc}
*/
public void readFields(DataInput in) throws IOException {
this.msg = in.readByte();
Index: src/contrib/build.xml
===================================================================
--- src/contrib/build.xml (revision 558872)
+++ src/contrib/build.xml (working copy)
@@ -12,7 +12,7 @@
-
+
@@ -21,7 +21,7 @@
-
+
@@ -31,7 +31,7 @@
-
+