s: servedRegions.values()) {
for (HRegionInfo i: s) {
- regionsToDelete.add(i.getRegionName());
+ synchronized (serversToServerInfo) {
+ regionsToDelete.add(i.getRegionName());
+ }
}
}
@@ -3005,6 +3055,10 @@
synchronized (serversToServerInfo) {
info = serversToServerInfo.remove(server);
if (info != null) {
+ HServerAddress root = rootRegionLocation.get();
+ if (root != null && root.equals(info.getServerAddress())) {
+ unassignRootRegion();
+ }
String serverName = info.getServerAddress().toString();
HServerLoad load = serversToLoad.remove(serverName);
if (load != null) {
@@ -3021,9 +3075,9 @@
// NOTE: If the server was serving the root region, we cannot reassign it
// here because the new server will start serving the root region before
- // the PendingServerShutdown operation has a chance to split the log file.
+ // the ProcessServerShutdown operation has a chance to split the log file.
if (info != null) {
- shutdownQueue.put(new PendingServerShutdown(info));
+ shutdownQueue.put(new ProcessServerShutdown(info));
}
}
}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (revision 597932)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (working copy)
@@ -78,7 +78,18 @@
private boolean split;
private Text startKey;
private HTableDescriptor tableDesc;
-
+ private int hashCode;
+
+ private void setHashCode() {
+ int result = this.regionName.hashCode();
+ result ^= Long.valueOf(this.regionId).hashCode();
+ result ^= this.startKey.hashCode();
+ result ^= this.endKey.hashCode();
+ result ^= Boolean.valueOf(this.offLine).hashCode();
+ result ^= this.tableDesc.hashCode();
+ this.hashCode = result;
+ }
+
/** Used to construct the HRegionInfo for the root and first meta regions */
private HRegionInfo(long regionId, HTableDescriptor tableDesc) {
this.regionId = regionId;
@@ -89,6 +100,7 @@
DELIMITER + regionId);
this.split = false;
this.startKey = new Text();
+ setHashCode();
}
/** Default constructor - creates empty object */
@@ -100,6 +112,7 @@
this.split = false;
this.startKey = new Text();
this.tableDesc = new HTableDescriptor();
+ this.hashCode = 0;
}
/**
@@ -152,6 +165,7 @@
}
this.tableDesc = tableDesc;
+ setHashCode();
}
/** @return the endKey */
@@ -232,13 +246,7 @@
*/
@Override
public int hashCode() {
- int result = this.regionName.hashCode();
- result ^= Long.valueOf(this.regionId).hashCode();
- result ^= this.startKey.hashCode();
- result ^= this.endKey.hashCode();
- result ^= Boolean.valueOf(this.offLine).hashCode();
- result ^= this.tableDesc.hashCode();
- return result;
+ return this.hashCode;
}
//
@@ -256,6 +264,7 @@
out.writeBoolean(split);
startKey.write(out);
tableDesc.write(out);
+ out.writeInt(hashCode);
}
/**
@@ -269,6 +278,7 @@
this.split = in.readBoolean();
this.startKey.readFields(in);
this.tableDesc.readFields(in);
+ this.hashCode = in.readInt();
}
//
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/LogRollListener.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/LogRollListener.java (revision 0)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/LogRollListener.java (revision 0)
@@ -0,0 +1,29 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+/**
+ * Mechanism by which the HLog requests a log roll
+ */
+public interface LogRollListener {
+ /** Request that the log be rolled */
+ public void logRollRequested();
+}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 597932)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (working copy)
@@ -90,7 +90,6 @@
static final Random rand = new Random();
static final Log LOG = LogFactory.getLog(HRegion.class);
final AtomicBoolean closed = new AtomicBoolean(false);
- private volatile long noFlushCount = 0;
/**
* Merge two HRegions. They must be available on the current
@@ -159,7 +158,7 @@
// Done
// Construction moves the merge files into place under region.
HRegion dstRegion = new HRegion(rootDir, log, fs, conf, newRegionInfo,
- newRegionDir);
+ newRegionDir, null);
// Get rid of merges directory
@@ -221,9 +220,10 @@
volatile WriteState writestate = new WriteState();
final int memcacheFlushSize;
+ private volatile long lastFlushTime;
+ final CacheFlushListener flushListener;
final int blockingMemcacheSize;
protected final long threadWakeFrequency;
- protected final int optionalFlushCount;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Integer updateLock = new Integer(0);
private final long desiredMaxFileSize;
@@ -251,11 +251,13 @@
* @param regionInfo - HRegionInfo that describes the region
* @param initialFiles If there are initial files (implying that the HRegion
* is new), then read them from the supplied path.
+ * @param listener an object that implements CacheFlushListener or null
*
* @throws IOException
*/
public HRegion(Path rootDir, HLog log, FileSystem fs, HBaseConfiguration conf,
- HRegionInfo regionInfo, Path initialFiles) throws IOException {
+ HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener)
+ throws IOException {
this.rootDir = rootDir;
this.log = log;
@@ -265,8 +267,6 @@
this.encodedRegionName =
HRegionInfo.encodeRegionName(this.regionInfo.getRegionName());
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
- this.optionalFlushCount =
- conf.getInt("hbase.hregion.memcache.optionalflushcount", 10);
// Declare the regionName. This is a unique string for the region, used to
// build a unique filename.
@@ -314,6 +314,7 @@
// By default, we flush the cache when 16M.
this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
1024*1024*16);
+ this.flushListener = listener;
this.blockingMemcacheSize = this.memcacheFlushSize *
conf.getInt("hbase.hregion.memcache.block.multiplier", 2);
@@ -323,6 +324,7 @@
// HRegion is ready to go!
this.writestate.compacting = false;
+ this.lastFlushTime = System.currentTimeMillis();
LOG.info("region " + this.regionInfo.getRegionName() + " available");
}
@@ -485,6 +487,11 @@
return this.fs;
}
+ /** @return the last time the region was flushed */
+ public long getLastFlushTime() {
+ return this.lastFlushTime;
+ }
+
//////////////////////////////////////////////////////////////////////////////
// HRegion maintenance.
//
@@ -598,8 +605,10 @@
// Done!
// Opening the region copies the splits files from the splits directory
// under each region.
- HRegion regionA = new HRegion(rootDir, log, fs, conf, regionAInfo, dirA);
- HRegion regionB = new HRegion(rootDir, log, fs, conf, regionBInfo, dirB);
+ HRegion regionA =
+ new HRegion(rootDir, log, fs, conf, regionAInfo, dirA, null);
+ HRegion regionB =
+ new HRegion(rootDir, log, fs, conf, regionBInfo, dirB, null);
// Cleanup
boolean deleted = fs.delete(splits); // Get rid of splits directory
@@ -751,55 +760,31 @@
}
/**
- * Flush the cache if necessary. This is called periodically to minimize the
- * amount of log processing needed upon startup.
+ * Flush the cache.
*
- * The returned Vector is a list of all the files used by the component
- * HStores. It is a list of HStoreFile objects. If the returned value is
- * NULL, then the flush could not be executed, because the HRegion is busy
- * doing something else storage-intensive. The caller should check back
- * later.
+ * When this method is called the cache will be flushed unless:
+ *
+ * - the cache is empty
+ * - the region is closed.
+ * - a flush is already in progress
+ * - writes are disabled
+ *
*
* This method may block for some time, so it should not be called from a
* time-sensitive thread.
*
- * @param disableFutureWrites indicates that the caller intends to
- * close() the HRegion shortly, so the HRegion should not take on any new and
- * potentially long-lasting disk operations. This flush() should be the final
- * pre-close() disk operation.
+ * @return true if cache was flushed
+ *
* @throws IOException
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
- void flushcache() throws IOException {
+ boolean flushcache() throws IOException {
lock.readLock().lock(); // Prevent splits and closes
try {
if (this.closed.get()) {
- return;
+ return false;
}
- boolean needFlush = false;
- long memcacheSize = this.memcacheSize.get();
- if(memcacheSize > this.memcacheFlushSize) {
- needFlush = true;
- } else if (memcacheSize > 0) {
- if (this.noFlushCount >= this.optionalFlushCount) {
- LOG.info("Optional flush called " + this.noFlushCount +
- " times when data present without flushing. Forcing one.");
- needFlush = true;
- } else {
- // Only increment if something in the cache.
- // Gets zero'd when a flushcache is called.
- this.noFlushCount++;
- }
- }
- if (!needFlush) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cache flush not needed for region " +
- regionInfo.getRegionName() + ". Cache size=" + memcacheSize +
- ", cache flush threshold=" + this.memcacheFlushSize);
- }
- return;
- }
synchronized (writestate) {
if ((!writestate.flushing) && writestate.writesEnabled) {
writestate.flushing = true;
@@ -811,16 +796,15 @@
writestate.flushing + ", writesEnabled=" +
writestate.writesEnabled);
}
- return;
+ return false;
}
}
- this.noFlushCount = 0;
long startTime = -1;
synchronized (updateLock) {// Stop updates while we snapshot the memcaches
startTime = snapshotMemcaches();
}
try {
- internalFlushcache(startTime);
+ return internalFlushcache(startTime);
} finally {
synchronized (writestate) {
writestate.flushing = false;
@@ -835,7 +819,7 @@
/*
* It is assumed that updates are blocked for the duration of this method
*/
- long snapshotMemcaches() {
+ private long snapshotMemcaches() {
if (this.memcacheSize.get() == 0) {
return -1;
}
@@ -883,17 +867,24 @@
* routes.
*
*
This method may block for some time.
+ *
+ * @param startTime the time the cache was snapshotted or -1 if a flush is
+ * not needed
+ *
+ * @return true if the cache was flushed
+ *
* @throws IOException
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
- void internalFlushcache(long startTime) throws IOException {
+ private boolean internalFlushcache(long startTime) throws IOException {
if (startTime == -1) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Not flushing cache: snapshotMemcaches() determined that " +
- "there was nothing to do");
+ LOG.debug("Not flushing cache for region " +
+ regionInfo.getRegionName() +
+ ": snapshotMemcaches() determined that there was nothing to do");
}
- return;
+ return false;
}
// We pass the log to the HMemcache, so we can lock down both
@@ -914,7 +905,6 @@
// Otherwise, the snapshot content while backed up in the hlog, it will not
// be part of the current running servers state.
- long logCacheFlushId = sequenceId;
try {
// A. Flush memcache to all the HStores.
// Keep running vector of all store files that includes both old and the
@@ -938,7 +928,7 @@
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
this.log.completeCacheFlush(this.regionInfo.getRegionName(),
- regionInfo.getTableDesc().getName(), logCacheFlushId);
+ regionInfo.getTableDesc().getName(), sequenceId);
// D. Finally notify anyone waiting on memcache to clear:
// e.g. checkResources().
@@ -948,8 +938,10 @@
if (LOG.isDebugEnabled()) {
LOG.debug("Finished memcache flush for region " +
this.regionInfo.getRegionName() + " in " +
- (System.currentTimeMillis() - startTime) + "ms");
+ (System.currentTimeMillis() - startTime) + "ms, sequenceid=" +
+ sequenceId);
}
+ return true;
}
//////////////////////////////////////////////////////////////////////////////
@@ -1309,13 +1301,18 @@
this.log.append(regionInfo.getRegionName(),
regionInfo.getTableDesc().getName(), updatesByColumn);
+ long memcacheSize = 0;
for (Map.Entry e: updatesByColumn.entrySet()) {
HStoreKey key = e.getKey();
byte[] val = e.getValue();
- this.memcacheSize.addAndGet(key.getSize() +
+ memcacheSize = this.memcacheSize.addAndGet(key.getSize() +
(val == null ? 0 : val.length));
stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val);
}
+ if (this.flushListener != null && memcacheSize > this.memcacheFlushSize) {
+ // Request a cache flush
+ this.flushListener.flushRequested(this);
+ }
}
}
@@ -1582,8 +1579,8 @@
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(regionDir);
return new HRegion(rootDir,
- new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf),
- fs, conf, info, initialFiles);
+ new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null),
+ fs, conf, info, initialFiles, null);
}
/**
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Sleeper.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Sleeper.java (revision 597932)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Sleeper.java (working copy)
@@ -31,6 +31,10 @@
private final int period;
private AtomicBoolean stop;
+ /**
+ * @param sleep
+ * @param stop
+ */
public Sleeper(final int sleep, final AtomicBoolean stop) {
this.period = sleep;
this.stop = stop;
@@ -40,7 +44,7 @@
* Sleep for period.
*/
public void sleep() {
- sleep(System.currentTimeMillis());
+ sleep(period);
}
/**