>();
final AtomicLong memcacheSize = new AtomicLong(0);
+ private volatile boolean flushRequested;
final Path basedir;
final HLog log;
@@ -348,7 +349,6 @@
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Integer updateLock = new Integer(0);
private final Integer splitLock = new Integer(0);
- private final long desiredMaxFileSize;
private final long minSequenceId;
final AtomicInteger activeScannerCount = new AtomicInteger(0);
@@ -359,6 +359,8 @@
/**
* HRegion constructor.
*
+ * @param basedir qualified path of directory where region should be located,
+ * usually the table directory.
* @param log The HLog is the outbound log for any updates to the HRegion
* (There's a single HLog for all the HRegions on a single HRegionServer.)
* The log file is a logfile from the previous execution that's
@@ -366,20 +368,19 @@
* appropriate log info for this HRegion. If there is a previous log file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
- * @param basedir qualified path of directory where region should be located,
- * usually the table directory.
* @param fs is the filesystem.
* @param conf is global configuration settings.
* @param regionInfo - HRegionInfo that describes the region
* @param initialFiles If there are initial files (implying that the HRegion
* is new), then read them from the supplied path.
- * @param listener an object that implements CacheFlushListener or null
+ * @param flushListener an object that implements CacheFlushListener,
+ * or null
* @throws IOException
*/
public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
- HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener)
- throws IOException {
- this(basedir, log, fs, conf, regionInfo, initialFiles, listener, null);
+ HRegionInfo regionInfo, Path initialFiles,
+ CacheFlushListener flushListener) throws IOException {
+ this(basedir, log, fs, conf, regionInfo, initialFiles, flushListener, null);
}
/**
@@ -399,15 +400,15 @@
* @param regionInfo - HRegionInfo that describes the region
* @param initialFiles If there are initial files (implying that the HRegion
* is new), then read them from the supplied path.
- * @param listener an object that implements CacheFlushListener or null
+ * @param flushListener an object that implements CacheFlushListener or null
* @param reporter Call on a period so hosting server can report we're
* making progress to master -- otherwise master might think region deploy
* failed. Can be null.
* @throws IOException
*/
public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
- HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener,
- final Progressable reporter)
+ HRegionInfo regionInfo, Path initialFiles,
+ CacheFlushListener flushListener, final Progressable reporter)
throws IOException {
this.basedir = basedir;
@@ -415,6 +416,8 @@
this.fs = fs;
this.conf = conf;
this.regionInfo = regionInfo;
+ this.flushListener = flushListener;
+ this.flushRequested = false;
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
this.regiondir = new Path(basedir, this.regionInfo.getEncodedName());
Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
@@ -466,20 +469,16 @@
// By default, we flush the cache when 64M.
this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
1024*1024*64);
- this.flushListener = listener;
+
this.blockingMemcacheSize = this.memcacheFlushSize *
conf.getInt("hbase.hregion.memcache.block.multiplier", 1);
- // By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
- this.desiredMaxFileSize =
- conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
-
// HRegion is ready to go!
this.writestate.compacting = false;
this.lastFlushTime = System.currentTimeMillis();
LOG.info("region " + this.regionInfo.getRegionName() + " available");
}
-
+
/**
* @return Updates to this region need to have a sequence id that is >= to
* the this number.
@@ -543,7 +542,7 @@
// region.
writestate.writesEnabled = false;
LOG.debug("compactions and cache flushes disabled for region " +
- regionName);
+ regionName);
while (writestate.compacting || writestate.flushing) {
LOG.debug("waiting for" +
(writestate.compacting ? " compaction" : "") +
@@ -617,7 +616,7 @@
}
}
}
-
+
//////////////////////////////////////////////////////////////////////////////
// HRegion accessors
//////////////////////////////////////////////////////////////////////////////
@@ -672,6 +671,11 @@
return this.lastFlushTime;
}
+ /** @param t new lastFlushTime value; other assignments in this class use System.currentTimeMillis() */
+ void setLastFlushTime(long t) {
+ this.lastFlushTime = t;
+ }
+
//////////////////////////////////////////////////////////////////////////////
// HRegion maintenance.
//
@@ -679,34 +683,16 @@
// upkeep.
//////////////////////////////////////////////////////////////////////////////
- /**
- * @param midkey
- * @return returns size of largest HStore. Also returns whether store is
- * splitable or not (Its not splitable if region has a store that has a
- * reference store file).
- */
- public HStoreSize largestHStore(Text midkey) {
- HStoreSize biggest = null;
- boolean splitable = true;
+ /** @return size of the largest HStore, or 0 if there are no stores. */
+ public long getLargestHStoreSize() {
+ long size = 0;
for (HStore h: stores.values()) {
- HStoreSize size = h.size(midkey);
- // If we came across a reference down in the store, then propagate
- // fact that region is not splitable.
- if (splitable) {
- splitable = size.splitable;
+ long storeSize = h.getSize();
+ if (storeSize > size) {
+ size = storeSize;
}
- if (biggest == null) {
- biggest = size;
- continue;
- }
- if(size.getAggregate() > biggest.getAggregate()) { // Largest so far
- biggest = size;
- }
}
- if (biggest != null) {
- biggest.setSplitable(splitable);
- }
- return biggest;
+ return size;
}
/*
@@ -715,21 +701,17 @@
* but instead create new 'reference' store files that read off the top and
* bottom ranges of parent store files.
* @param listener May be null.
+ * @param midKey key on which to split region
* @return two brand-new (and open) HRegions or null if a split is not needed
* @throws IOException
*/
- HRegion[] splitRegion(final RegionUnavailableListener listener)
- throws IOException {
+ HRegion[] splitRegion(final RegionUnavailableListener listener,
+ final Text midKey) throws IOException {
synchronized (splitLock) {
- Text midKey = new Text();
- if (closed.get() || !needsSplit(midKey)) {
+ if (closed.get()) {
return null;
}
- Path splits = new Path(this.regiondir, SPLITDIR);
- if(!this.fs.exists(splits)) {
- this.fs.mkdirs(splits);
- }
- // Make copies just in case and add start/end key checking: hbase-428.
+ // Add start/end key checking: hbase-428.
Text startKey = new Text(this.regionInfo.getStartKey());
Text endKey = new Text(this.regionInfo.getEndKey());
if (startKey.equals(midKey)) {
@@ -740,6 +722,11 @@
LOG.debug("Endkey and midkey are same, not splitting");
return null;
}
+ LOG.info("Starting split of region " + getRegionName());
+ Path splits = new Path(this.regiondir, SPLITDIR);
+ if(!this.fs.exists(splits)) {
+ this.fs.mkdirs(splits);
+ }
HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
startKey, midKey);
Path dirA = new Path(splits, regionAInfo.getEncodedName());
@@ -806,71 +793,6 @@
}
/*
- * Iterates through all the HStores and finds the one with the largest
- * MapFile size. If the size is greater than the (currently hard-coded)
- * threshold, returns true indicating that the region should be split. The
- * midKey for the largest MapFile is returned through the midKey parameter.
- * It is possible for us to rule the region non-splitable even in excess of
- * configured size. This happens if region contains a reference file. If
- * a reference file, the region can not be split.
- *
- * Note that there is no need to do locking in this method because it calls
- * largestHStore which does the necessary locking.
- *
- * @param midKey midKey of the largest MapFile
- * @return true if the region should be split. midKey is set by this method.
- * Check it for a midKey value on return.
- */
- boolean needsSplit(Text midKey) {
- HStoreSize biggest = largestHStore(midKey);
- if (biggest == null || midKey.getLength() == 0 ||
- (midKey.equals(getStartKey()) && midKey.equals(getEndKey())) ) {
- return false;
- }
- boolean split = (biggest.getAggregate() >= this.desiredMaxFileSize);
- if (split) {
- if (!biggest.isSplitable()) {
- LOG.warn("Region " + getRegionName().toString() +
- " is NOT splitable though its aggregate size is " +
- StringUtils.humanReadableInt(biggest.getAggregate()) +
- " and desired size is " +
- StringUtils.humanReadableInt(this.desiredMaxFileSize));
- split = false;
- } else {
- LOG.info("Splitting " + getRegionName().toString() +
- " because largest aggregate size is " +
- StringUtils.humanReadableInt(biggest.getAggregate()) +
- " and desired size is " +
- StringUtils.humanReadableInt(this.desiredMaxFileSize));
- }
- }
- return split;
- }
-
- /**
- * Only do a compaction if it is necessary
- *
- * @return whether or not there was a compaction
- * @throws IOException
- */
- public boolean compactIfNeeded() throws IOException {
- boolean needsCompaction = false;
- for (HStore store: stores.values()) {
- if (store.needsCompaction()) {
- needsCompaction = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug(store.toString() + " needs compaction");
- }
- break;
- }
- }
- if (!needsCompaction) {
- return false;
- }
- return compactStores();
- }
-
- /*
* @param dir
* @return compaction directory for the passed in dir
*/
@@ -893,59 +815,53 @@
*/
private void doRegionCompactionCleanup() throws IOException {
if (this.fs.exists(this.regionCompactionDir)) {
- this.fs.delete(this.regionCompactionDir);
+ FileUtil.fullyDelete(this.fs, this.regionCompactionDir);
}
}
-
+
/**
- * Compact all the stores. This should be called periodically to make sure
- * the stores are kept manageable.
+ * Called by compaction thread and after region is opened to compact the
+ * HStores if necessary.
*
* This operation could block for a long time, so don't call it from a
* time-sensitive thread.
*
- * @return Returns TRUE if the compaction has completed. FALSE, if the
- * compaction was not carried out, because the HRegion is busy doing
- * something else storage-intensive (like flushing the cache). The caller
- * should check back later.
- *
* Note that no locking is necessary at this level because compaction only
* conflicts with a region split, and that cannot happen because the region
* server does them sequentially and not in parallel.
*
+ * @return mid key if split is needed, otherwise null
* @throws IOException
*/
- public boolean compactStores() throws IOException {
+ public Text compactStores() throws IOException {
+ Text midKey = null;
if (this.closed.get()) {
- return false;
+ return midKey;
}
try {
synchronized (writestate) {
if (!writestate.compacting && writestate.writesEnabled) {
writestate.compacting = true;
} else {
- LOG.info("NOT compacting region " +
- this.regionInfo.getRegionName().toString() + ": compacting=" +
- writestate.compacting + ", writesEnabled=" +
+ LOG.info("NOT compacting region " + getRegionName() +
+ ": compacting=" + writestate.compacting + ", writesEnabled=" +
writestate.writesEnabled);
- return false;
+ return midKey;
}
}
+ LOG.info("starting compaction on region " + getRegionName());
long startTime = System.currentTimeMillis();
- LOG.info("starting compaction on region " +
- this.regionInfo.getRegionName().toString());
- boolean status = true;
doRegionCompactionPrep();
- for (HStore store : stores.values()) {
- if(!store.compact()) {
- status = false;
+ for (HStore store: stores.values()) {
+ Text key = store.compact();
+ if (key != null && midKey == null) {
+ midKey = key;
}
}
doRegionCompactionCleanup();
- LOG.info("compaction completed on region " +
- this.regionInfo.getRegionName().toString() + ". Took " +
- StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
- return status;
+ LOG.info("compaction completed on region " + getRegionName() +
+ ". Took " +
+ StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
} finally {
synchronized (writestate) {
@@ -953,6 +869,7 @@
writestate.notifyAll();
}
}
+ return midKey;
}
/**
@@ -1030,7 +947,11 @@
// will add to the unflushed size
this.memcacheSize.set(0L);
+ this.flushRequested = false;
+ // Record latest flush time
+ this.lastFlushTime = System.currentTimeMillis();
+
for (HStore hstore: stores.values()) {
hstore.snapshotMemcache();
}
@@ -1121,11 +1042,12 @@
this.log.completeCacheFlush(this.regionInfo.getRegionName(),
regionInfo.getTableDesc().getName(), sequenceId);
- // D. Finally notify anyone waiting on memcache to clear:
+ // C. Finally notify anyone waiting on memcache to clear:
// e.g. checkResources().
synchronized (this) {
notifyAll();
}
+
if (LOG.isDebugEnabled()) {
LOG.debug("Finished memcache flush for region " +
this.regionInfo.getRegionName() + " in " +
@@ -1374,8 +1296,8 @@
Text row = b.getRow();
long lockid = obtainRowLock(row);
- long commitTime =
- (b.getTimestamp() == LATEST_TIMESTAMP) ? System.currentTimeMillis() : b.getTimestamp();
+ long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ?
+ System.currentTimeMillis() : b.getTimestamp();
try {
List deletes = null;
@@ -1612,9 +1534,11 @@
(val == null ? 0 : val.length));
stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val);
}
- if (this.flushListener != null && size > this.memcacheFlushSize) {
+ if (this.flushListener != null && !this.flushRequested &&
+ size > this.memcacheFlushSize) {
// Request a cache flush
this.flushListener.flushRequested(this);
+ this.flushRequested = true;
}
}
}
@@ -1729,9 +1653,21 @@
}
}
}
+
+ /** Regions are equal iff both are HRegions with equal region names (consistent with hashCode); null is never equal. */
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof HRegion && getRegionName().equals(((HRegion)o).getRegionName());
+ }
/** {@inheritDoc} */
@Override
+ public int hashCode() {
+ return this.regionInfo.getRegionName().hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override
public String toString() {
return regionInfo.getRegionName().toString();
}
@@ -2011,8 +1947,7 @@
* @throws IOException
*/
public static void removeRegionFromMETA(final HRegionInterface srvr,
- final Text metaRegionName, final Text regionName)
- throws IOException {
+ final Text metaRegionName, final Text regionName) throws IOException {
srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP);
}
@@ -2025,8 +1960,7 @@
* @throws IOException
*/
public static void offlineRegionInMETA(final HRegionInterface srvr,
- final Text metaRegionName, final HRegionInfo info)
- throws IOException {
+ final Text metaRegionName, final HRegionInfo info) throws IOException {
BatchUpdate b = new BatchUpdate(info.getRegionName());
info.setOffline(true);
b.put(COL_REGIONINFO, Writables.getBytes(info));
Index: src/java/org/apache/hadoop/hbase/master/ServerManager.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 643696)
+++ src/java/org/apache/hadoop/hbase/master/ServerManager.java (working copy)
@@ -128,6 +128,9 @@
}
/**
+ * Called to process the messages sent from the region server to the master
+ * along with the heart beat.
+ *
* @param serverInfo
* @param msgs
* @return messages from master to region server indicating what region
@@ -142,7 +145,7 @@
if (msgs.length > 0) {
if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
processRegionServerExit(serverName, msgs);
- return new HMsg[]{msgs[0]};
+ return new HMsg[0];
} else if (msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) {
LOG.info("Region server " + serverName + " quiesced");
master.quiescedMetaServers.incrementAndGet();
@@ -157,6 +160,11 @@
}
if (master.shutdownRequested && !master.closed.get()) {
+ if (msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) {
+ // Server is already quiesced, but we aren't ready to shut down yet,
+ // so return an empty response.
+ return new HMsg[0];
+ }
// Tell the server to stop serving any user regions
return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE)};
}
@@ -522,7 +530,7 @@
public int averageLoad() {
return 0;
}
-
+
/** @return the number of active servers */
public int numServers() {
return serversToServerInfo.size();
Index: src/java/org/apache/hadoop/hbase/util/MetaUtils.java
===================================================================
--- src/java/org/apache/hadoop/hbase/util/MetaUtils.java (revision 643696)
+++ src/java/org/apache/hadoop/hbase/util/MetaUtils.java (working copy)
@@ -35,12 +35,13 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.regionserver.HLog;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HScannerInterface;
import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.client.HTable;
/**
@@ -316,13 +317,16 @@
throws IOException {
HTable t = new HTable(c, HConstants.META_TABLE_NAME);
Cell cell = t.get(row, HConstants.COL_REGIONINFO);
+ if (cell == null) {
+ throw new IOException("no information for row " + row);
+ }
// Throws exception if null.
HRegionInfo info = Writables.getHRegionInfo(cell);
- long id = t.startUpdate(row);
+ BatchUpdate b = new BatchUpdate(row);
info.setOffline(onlineOffline);
- t.put(id, HConstants.COL_REGIONINFO, Writables.getBytes(info));
- t.delete(id, HConstants.COL_SERVER);
- t.delete(id, HConstants.COL_STARTCODE);
- t.commit(id);
+ b.put(HConstants.COL_REGIONINFO, Writables.getBytes(info));
+ b.delete(HConstants.COL_SERVER);
+ b.delete(HConstants.COL_STARTCODE);
+ t.commit(b);
}
}