e: storefiles.entrySet()) {
+ HStoreFile curHSF = e.getValue();
+ long size = curHSF.length();
+ if (maxSize == 0L || size > maxSize) {
+ // This is the largest one so far
+ maxSize = size;
+ mapIndex = e.getKey();
+ }
+ if (splitable) {
+ splitable = !curHSF.isReference();
+ }
}
- if (splitable) {
- splitable = !curHSF.isReference();
- }
}
+ if (!splitable) {
+ return null;
+ }
MapFile.Reader r = this.readers.get(mapIndex);
// seek back to the beginning of mapfile
@@ -2035,28 +2040,33 @@
HStoreKey firstKey = new HStoreKey();
HStoreKey lastKey = new HStoreKey();
Writable value = new ImmutableBytesWritable();
- r.next((WritableComparable)firstKey, value);
- r.finalKey((WritableComparable)lastKey);
+ r.next(firstKey, value);
+ r.finalKey(lastKey);
// get the midkey
HStoreKey midkey = (HStoreKey)r.midKey();
if (midkey != null) {
- midKey.set(((HStoreKey)midkey).getRow());
// if the midkey is the same as the first and last keys, then we cannot
// (ever) split this region.
if (midkey.getRow().equals(firstKey.getRow()) &&
- midkey.getRow().equals(lastKey.getRow())) {
- return new HStoreSize(aggregateSize, maxSize, false);
- }
+ midkey.getRow().equals(lastKey.getRow())) {
+ return null;
+ }
+ return midkey.getRow();
}
} catch(IOException e) {
LOG.warn("Failed getting store size for " + this.storeName, e);
} finally {
this.lock.readLock().unlock();
}
- return new HStoreSize(aggregateSize, maxSize, splitable);
+ return null;
}
+
+ /** @return aggregate size of HStore */
+ public long getSize() {
+ return storeSize;
+ }
//////////////////////////////////////////////////////////////////////////////
// File administration
@@ -2083,10 +2093,17 @@
}
}
+ /**
+ * @return the HColumnDescriptor for this store
+ */
+ public HColumnDescriptor getFamily() {
+ return family;
+ }
+
/** {@inheritDoc} */
@Override
public String toString() {
- return this.storeName;
+ return this.storeName.toString();
}
/*
@@ -2132,19 +2149,21 @@
throws IOException {
super(timestamp, targetCols);
try {
- this.readers = new MapFile.Reader[storefiles.size()];
-
- // Most recent map file should be first
- int i = readers.length - 1;
- for(HStoreFile curHSF: storefiles.values()) {
- readers[i--] = curHSF.getReader(fs, bloomFilter);
+ synchronized (storefiles) {
+ this.readers = new MapFile.Reader[storefiles.size()];
+
+ // Most recent map file should be first
+ int i = readers.length - 1;
+ for(HStoreFile curHSF: storefiles.values()) {
+ readers[i--] = curHSF.getReader(fs, bloomFilter);
+ }
}
this.keys = new HStoreKey[readers.length];
this.vals = new byte[readers.length][];
// Advance the readers to the first pos.
- for(i = 0; i < readers.length; i++) {
+ for(int i = 0; i < readers.length; i++) {
keys[i] = new HStoreKey();
if(firstRow.getLength() != 0) {
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (revision 617261)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (working copy)
@@ -138,17 +138,16 @@
long currentSize = 0;
HRegion nextRegion = null;
long nextSize = 0;
- Text midKey = new Text();
for (int i = 0; i < info.length - 1; i++) {
if (currentRegion == null) {
currentRegion =
new HRegion(tabledir, hlog, fs, conf, info[i], null, null);
- currentSize = currentRegion.largestHStore(midKey).getAggregate();
+ currentSize = currentRegion.getLargestHStoreSize();
}
nextRegion =
new HRegion(tabledir, hlog, fs, conf, info[i + 1], null, null);
- nextSize = nextRegion.largestHStore(midKey).getAggregate();
+ nextSize = nextRegion.getLargestHStoreSize();
if ((currentSize + nextSize) <= (maxFilesize / 2)) {
// We merge two adjacent regions if their total size is less than
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java (revision 617261)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java (working copy)
@@ -1,36 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase;
-
-/**
- * Implementors of this interface want to be notified when an HRegion
- * determines that a cache flush is needed. A CacheFlushListener (or null)
- * must be passed to the HRegion constructor.
- */
-public interface CacheFlushListener {
-
- /**
- * Tell the listener the cache needs to be flushed.
- *
- * @param region the HRegion requesting the cache flush
- */
- void flushRequested(HRegion region);
-}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (revision 617261)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (working copy)
@@ -37,6 +37,7 @@
private Text row;
private Text column;
private long timestamp;
+ private transient Text family;
/** Default constructor used in conjunction with Writable interface */
@@ -89,6 +90,7 @@
this.row = new Text(row);
this.column = new Text(column);
this.timestamp = timestamp;
+ this.family = null;
}
/** @return Approximate size in bytes of this key. */
@@ -120,8 +122,9 @@
*
* @param newcol new column key value
*/
- public void setColumn(Text newcol) {
+ public synchronized void setColumn(Text newcol) {
this.column.set(newcol);
+ this.family = null;
}
/**
@@ -154,6 +157,17 @@
return column;
}
+ /**
+ * @return the column family name (minus the trailing colon)
+ * @throws InvalidColumnNameException
+ */
+ public synchronized Text getFamily() throws InvalidColumnNameException {
+ if (family == null) {
+ family = extractFamily(column);
+ }
+ return family;
+ }
+
/** @return value of timestamp */
public long getTimestamp() {
return timestamp;
@@ -198,8 +212,7 @@
public boolean matchesRowFamily(HStoreKey that)
throws InvalidColumnNameException {
return this.row.compareTo(that.row) == 0 &&
- extractFamily(this.column).
- compareTo(extractFamily(that.getColumn())) == 0;
+ this.getFamily().equals(that.getFamily());
}
/** {@inheritDoc} */
@@ -225,6 +238,7 @@
// Comparable
+ /** {@inheritDoc} */
public int compareTo(Object o) {
HStoreKey other = (HStoreKey)o;
int result = this.row.compareTo(other.row);
@@ -275,7 +289,7 @@
* the result by calling {@link TextSequence#toText()}.
* @throws InvalidColumnNameException
*/
- public static TextSequence extractFamily(final Text col)
+ public static Text extractFamily(final Text col)
throws InvalidColumnNameException {
return extractFamily(col, false);
}
@@ -284,40 +298,41 @@
* Extracts the column family name from a column
* For example, returns 'info' if the specified column was 'info:server'
* @param col name of column
- * @return column famile as a TextSequence based on the passed
- * col. If col is reused, make a new Text of
- * the result by calling {@link TextSequence#toText()}.
+ * @param withColon if true, the trailing colon is included in the returned family name
+ * @return column family as a Text based on the passed col.
* @throws InvalidColumnNameException
*/
- public static TextSequence extractFamily(final Text col,
- final boolean withColon)
+ public static Text extractFamily(final Text col, final boolean withColon)
throws InvalidColumnNameException {
int offset = getColonOffset(col);
// Include ':' in copy?
- offset += (withColon)? 1: 0;
+ offset += (withColon)? 1 : 0;
if (offset == col.getLength()) {
- return new TextSequence(col);
+ return col;
}
- return new TextSequence(col, 0, offset);
+ Text family = new Text();
+ family.set(col.getBytes(), 0, offset);
+ return family;
}
/**
- * Extracts the column qualifier, the portion that follows the colon (':')
- * family/qualifier separator.
+ * Extracts the column member, the portion that follows the colon (':')
+ * family:member separator.
* For example, returns 'server' if the specified column was 'info:server'
* @param col name of column
- * @return column qualifier as a TextSequence based on the passed
- * col. If col is reused, make a new Text of
- * the result by calling {@link TextSequence#toText()}.
+ * @return column member as a Text based on the passed col.
* @throws InvalidColumnNameException
*/
- public static TextSequence extractQualifier(final Text col)
+ public static Text extractMember(final Text col)
throws InvalidColumnNameException {
- int offset = getColonOffset(col);
- if (offset + 1 == col.getLength()) {
+ int offset = getColonOffset(col) + 1;
+ if (offset == col.getLength()) {
return null;
}
- return new TextSequence(col, offset + 1);
+
+ Text member = new Text();
+ member.set(col.getBytes(), offset, col.getLength() - offset);
+ return member;
}
private static int getColonOffset(final Text col)
@@ -333,7 +348,7 @@
}
if(offset < 0) {
throw new InvalidColumnNameException(col + " is missing the colon " +
- "family/qualifier separator");
+ "family:member separator");
}
return offset;
}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (revision 617261)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (working copy)
@@ -50,24 +50,25 @@
* underlying file is being rolled.
*
*
- * A single HLog is used by several HRegions simultaneously.
+ * A single HLog is used by several HRegion/HStores simultaneously.
*
*
- * Each HRegion is identified by a unique long int. HRegions do
- * not need to declare themselves before using the HLog; they simply include
- * their HRegion-id in the append or
- * completeCacheFlush calls.
+ * Each HStore is identified by a unique string consisting of the encoded
+ * HRegion name and by the HStore family name.
+ * HStores do not need to declare themselves before using the HLog; they simply
+ * include their HStore.storeName in the append
+ * or completeCacheFlush calls.
*
*
* An HLog consists of multiple on-disk files, which have a chronological order.
* As data is flushed to other (better) on-disk structures, the log becomes
- * obsolete. We can destroy all the log messages for a given HRegion-id up to
- * the most-recent CACHEFLUSH message from that HRegion.
+ * obsolete. We can destroy all the log messages for a given HStore up to
+ * the most-recent CACHEFLUSH message from that HStore.
*
*
* It's only practical to delete entire files. Thus, we delete an entire on-disk
* file F when all of the messages in F have a log-sequence-id that's older
- * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has
+ * (smaller) than the most-recent CACHEFLUSH message for every HStore that has
* a message in F.
*
*
@@ -77,7 +78,7 @@
* separate reentrant lock is used.
*
*
- * TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs in
+ * TODO: Vuk Ercegovac also pointed out that keeping HBase edit logs in
* HDFS is currently flawed. HBase writes edits to logs and to a memcache. The
* 'atomic' write to the log is meant to serve as insurance against abnormal
* RegionServer exit: on startup, the log is rerun to reconstruct an HRegion's
@@ -109,7 +110,7 @@
Collections.synchronizedSortedMap(new TreeMap());
/*
- * Map of region to last sequence/edit id.
+ * Map of store to last sequence/edit id.
*/
final Map lastSeqWritten = new ConcurrentHashMap();
@@ -235,7 +236,7 @@
if (this.outputfiles.size() > 0) {
if (this.lastSeqWritten.size() <= 0) {
LOG.debug("Last sequence written is empty. Deleting all old hlogs");
- // If so, then no new writes have come in since all regions were
+ // If so, then no new writes have come in since all hstores were
// flushed (and removed from the lastSeqWritten map). Means can
// remove all but currently open log file.
for (Map.Entry e : this.outputfiles.entrySet()) {
@@ -248,23 +249,23 @@
Long oldestOutstandingSeqNum =
Collections.min(this.lastSeqWritten.values());
// Get the set of all log files whose final ID is older than or
- // equal to the oldest pending region operation
+ // equal to the oldest pending HStore operation
TreeSet sequenceNumbers =
new TreeSet(this.outputfiles.headMap(
(Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet());
// Now remove old log files (if any)
if (LOG.isDebugEnabled()) {
- // Find region associated with oldest key -- helps debugging.
- Text oldestRegion = null;
+ // Find HStore associated with oldest key -- helps debugging.
+ Text oldestStore = null;
for (Map.Entry e: this.lastSeqWritten.entrySet()) {
if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
- oldestRegion = e.getKey();
+ oldestStore = e.getKey();
break;
}
}
LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
"using oldest outstanding seqnum of " +
- oldestOutstandingSeqNum + " from region " + oldestRegion);
+ oldestOutstandingSeqNum + " from HStore " + oldestStore);
}
if (sequenceNumbers.size() > 0) {
for (Long seq : sequenceNumbers) {
@@ -326,7 +327,7 @@
}
/**
- * Append a set of edits to the log. Log edits are keyed by regionName,
+ * Append a set of edits to the log. Log edits are keyed by storeName,
* rowname, and log-sequence-id.
*
* Later, if we sort by these keys, we obtain all the relevant edits for a
@@ -342,38 +343,32 @@
* synchronized prevents appends during the completion of a cache flush or for
* the duration of a log roll.
*
- * @param regionName
+ * @param storeName
* @param tableName
- * @param row
- * @param columns
- * @param timestamp
+ * @param key
+ * @param value
* @throws IOException
*/
- void append(Text regionName, Text tableName,
- TreeMap edits) throws IOException {
+ void append(Text storeName, Text tableName, HStoreKey key, byte[] value)
+ throws IOException {
if (closed) {
throw new IOException("Cannot append; log is closed");
}
synchronized (updateLock) {
- long seqNum[] = obtainSeqNum(edits.size());
+ long seqNum = obtainSeqNum();
// The 'lastSeqWritten' map holds the sequence number of the oldest
- // write for each region. When the cache is flushed, the entry for the
- // region being flushed is removed if the sequence number of the flush
+ // write for each HStore. When the cache is flushed, the entry for the
+ // store being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
- if (!this.lastSeqWritten.containsKey(regionName)) {
- this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0]));
+ if (!this.lastSeqWritten.containsKey(storeName)) {
+ this.lastSeqWritten.put(storeName, Long.valueOf(seqNum));
}
- int counter = 0;
- for (Map.Entry es : edits.entrySet()) {
- HStoreKey key = es.getKey();
- HLogKey logKey =
- new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]);
- HLogEdit logEdit =
- new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp());
- this.writer.append(logKey, logEdit);
- this.numEntries++;
- }
+ HLogKey logKey = new HLogKey(storeName, tableName, key.getRow(), seqNum);
+ HLogEdit logEdit =
+ new HLogEdit(key.getColumn(), value, key.getTimestamp());
+ this.writer.append(logKey, logEdit);
+ this.numEntries++;
}
if (this.numEntries > this.maxlogentries) {
if (listener != null) {
@@ -404,22 +399,6 @@
}
/**
- * Obtain a specified number of sequence numbers
- *
- * @param num number of sequence numbers to obtain
- * @return array of sequence numbers
- */
- private long[] obtainSeqNum(int num) {
- long[] results = new long[num];
- synchronized (this.sequenceLock) {
- for (int i = 0; i < num; i++) {
- results[i] = this.logSeqNum++;
- }
- }
- return results;
- }
-
- /**
* By acquiring a log sequence ID, we can allow log messages to continue while
* we flush the cache.
*
@@ -441,12 +420,12 @@
*
* Protected by cacheFlushLock
*
- * @param regionName
+ * @param storeName
* @param tableName
* @param logSeqId
* @throws IOException
*/
- void completeCacheFlush(final Text regionName, final Text tableName,
+ void completeCacheFlush(final Text storeName, final Text tableName,
final long logSeqId) throws IOException {
try {
@@ -454,13 +433,13 @@
return;
}
synchronized (updateLock) {
- this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
+ this.writer.append(new HLogKey(storeName, tableName, HLog.METAROW, logSeqId),
new HLogEdit(HLog.METACOLUMN, HLogEdit.completeCacheFlush.get(),
System.currentTimeMillis()));
this.numEntries++;
- Long seq = this.lastSeqWritten.get(regionName);
+ Long seq = this.lastSeqWritten.get(storeName);
if (seq != null && logSeqId >= seq.longValue()) {
- this.lastSeqWritten.remove(regionName);
+ this.lastSeqWritten.remove(storeName);
}
}
} finally {
@@ -489,6 +468,7 @@
* @param conf HBaseConfiguration
* @throws IOException
*/
+ @SuppressWarnings("deprecation")
static void splitLog(Path rootDir, Path srcDir, FileSystem fs,
Configuration conf) throws IOException {
Path logfiles[] = fs.listPaths(new Path[] { srcDir });
@@ -515,13 +495,15 @@
int count = 0;
for (; in.next(key, val); count++) {
Text tableName = key.getTablename();
- Text regionName = key.getRegionName();
+ Text storeName = key.getStoreName();
+ Text regionName = new Text();
+ regionName.set(storeName.getBytes(), 0, storeName.find("/"));
SequenceFile.Writer w = logWriters.get(regionName);
if (w == null) {
Path logfile = new Path(
HRegion.getRegionDir(
HTableDescriptor.getTableDir(rootDir, tableName),
- HRegionInfo.encodeRegionName(regionName)
+ regionName.toString()
),
HREGION_OLDLOGFILE_NAME
);
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (revision 617261)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (working copy)
@@ -356,6 +356,7 @@
* @return True if still has references to parent.
* @throws IOException
*/
+ @SuppressWarnings("deprecation")
protected boolean hasReferences(final Text metaRegionName,
final HRegionInterface srvr, final Text parent,
SortedMap rowContent, final Text splitColumn)
@@ -1306,8 +1307,7 @@
loadToServers.put(load, servers);
if (!closed.get()) {
- long serverLabel = getServerLabel(s);
- serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(s));
+ serverLeases.createLease(s, new ServerExpirer(s));
}
return createConfigurationSubset();
@@ -1327,15 +1327,10 @@
return mw;
}
- private long getServerLabel(final String s) {
- return s.hashCode();
- }
-
/** {@inheritDoc} */
public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[])
throws IOException {
String serverName = serverInfo.getServerAddress().toString().trim();
- long serverLabel = getServerLabel(serverName);
if (msgs.length > 0) {
if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
synchronized (serversToServerInfo) {
@@ -1348,7 +1343,7 @@
": MSG_REPORT_EXITING -- cancelling lease");
}
- if (cancelLease(serverName, serverLabel)) {
+ if (cancelLease(serverName)) {
// Only process the exit message if the server still has a lease.
// Otherwise we could end up processing the server exit twice.
LOG.info("Region server " + serverName +
@@ -1428,7 +1423,7 @@
}
synchronized (serversToServerInfo) {
- cancelLease(serverName, serverLabel);
+ cancelLease(serverName);
serversToServerInfo.notifyAll();
}
return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)};
@@ -1439,7 +1434,7 @@
// This will always succeed; otherwise, the fetch of serversToServerInfo
// would have failed above.
- serverLeases.renewLease(serverLabel, serverLabel);
+ serverLeases.renewLease(serverName);
// Refresh the info object and the load information
@@ -1476,7 +1471,7 @@
}
/** Cancel a server's lease and update its load information */
- private boolean cancelLease(final String serverName, final long serverLabel) {
+ private boolean cancelLease(final String serverName) {
boolean leaseCancelled = false;
HServerInfo info = serversToServerInfo.remove(serverName);
if (info != null) {
@@ -1487,7 +1482,7 @@
unassignRootRegion();
}
LOG.info("Cancelling lease for " + serverName);
- serverLeases.cancelLease(serverLabel, serverLabel);
+ serverLeases.cancelLease(serverName);
leaseCancelled = true;
// update load information
@@ -3120,20 +3115,20 @@
/*
* Data structure used to return results out of the toRowMap method.
*/
- private class RowMap {
+ class RowMap {
final Text row;
final SortedMap map;
- private RowMap(final Text r, final SortedMap m) {
+ RowMap(final Text r, final SortedMap m) {
this.row = r;
this.map = m;
}
- private Text getRow() {
+ Text getRow() {
return this.row;
}
- private SortedMap getMap() {
+ SortedMap getMap() {
return this.map;
}
}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreListener.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreListener.java (revision 0)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreListener.java (revision 0)
@@ -0,0 +1,55 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Implementors of this interface want to be notified when an HStore
+ * determines that a cache flush, a compaction, or a region split is
+ * needed. An HStoreListener (or null) must be passed to the HRegion
+ * constructor.
+ */
+public interface HStoreListener {
+
+ /**
+ * Tell the listener the cache needs to be flushed.
+ *
+ * @param store the HStore requesting the cache flush
+ * @param region the HRegion the HStore belongs to
+ */
+ void flushRequested(HStore store, HRegion region);
+
+ /**
+ * Tell the listener that the HStore needs compacting.
+ *
+ * @param store the HStore requesting the compaction.
+ * @param region the HRegion that the HStore belongs to
+ */
+ void compactionRequested(HStore store, HRegion region);
+
+ /**
+ * Tell the listener that the region needs to be split
+ *
+ * @param region the HRegion to be split
+ * @param midkey the middle key for the split
+ */
+ void splitRequested(HRegion region, Text midkey);
+}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java (revision 617261)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java (working copy)
@@ -69,13 +69,13 @@
private Text col;
ColumnMatcher(final Text col) throws IOException {
- Text qualifier = HStoreKey.extractQualifier(col);
+ Text member = HStoreKey.extractMember(col);
try {
- if(qualifier == null || qualifier.getLength() == 0) {
+ if(member == null || member.getLength() == 0) {
this.matchType = MATCH_TYPE.FAMILY_ONLY;
- this.family = HStoreKey.extractFamily(col).toText();
+ this.family = HStoreKey.extractFamily(col);
this.wildCardmatch = true;
- } else if(isRegexPattern.matcher(qualifier.toString()).matches()) {
+ } else if(isRegexPattern.matcher(member.toString()).matches()) {
this.matchType = MATCH_TYPE.REGEX;
this.columnMatcher = Pattern.compile(col.toString());
this.wildCardmatch = true;
@@ -127,7 +127,7 @@
this.multipleMatchers = false;
this.okCols = new TreeMap>();
for(int i = 0; i < targetCols.length; i++) {
- Text family = HStoreKey.extractFamily(targetCols[i]).toText();
+ Text family = HStoreKey.extractFamily(targetCols[i]);
Vector matchers = okCols.get(family);
if(matchers == null) {
matchers = new Vector();
@@ -158,7 +158,7 @@
boolean columnMatch(int i) throws IOException {
Text column = keys[i].getColumn();
Vector matchers =
- okCols.get(HStoreKey.extractFamily(column));
+ okCols.get(keys[i].getFamily());
if(matchers == null) {
return false;
}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 617261)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (working copy)
@@ -21,6 +21,8 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -32,7 +34,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
@@ -197,8 +198,6 @@
volatile Map> targetColumns =
new ConcurrentHashMap>();
- final AtomicLong memcacheSize = new AtomicLong(0);
-
final Path basedir;
final HLog log;
final FileSystem fs;
@@ -224,15 +223,11 @@
volatile WriteState writestate = new WriteState();
- final int memcacheFlushSize;
- private volatile long lastFlushTime;
- final CacheFlushListener flushListener;
- final int blockingMemcacheSize;
+ final HStoreListener listener;
protected final long threadWakeFrequency;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Integer updateLock = new Integer(0);
private final Integer splitLock = new Integer(0);
- private final long desiredMaxFileSize;
private final long minSequenceId;
final AtomicInteger activeScannerCount = new AtomicInteger(0);
@@ -262,7 +257,7 @@
* @throws IOException
*/
public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
- HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener)
+ HRegionInfo regionInfo, Path initialFiles, HStoreListener listener)
throws IOException {
this.basedir = basedir;
@@ -288,8 +283,8 @@
for(HColumnDescriptor c :
this.regionInfo.getTableDesc().families().values()) {
- HStore store = new HStore(this.basedir, this.regionInfo, c, this.fs,
- oldLogFile, this.conf);
+ HStore store =
+ new HStore(this.basedir, this, c, this.fs, oldLogFile, this.conf);
stores.put(c.getFamilyName(), store);
@@ -314,20 +309,10 @@
fs.delete(merges);
}
- // By default, we flush the cache when 64M.
- this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
- 1024*1024*64);
- this.flushListener = listener;
- this.blockingMemcacheSize = this.memcacheFlushSize *
- conf.getInt("hbase.hregion.memcache.block.multiplier", 2);
+ this.listener = listener;
- // By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
- this.desiredMaxFileSize =
- conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
-
// HRegion is ready to go!
this.writestate.compacting = false;
- this.lastFlushTime = System.currentTimeMillis();
LOG.info("region " + this.regionInfo.getRegionName() + " available");
}
@@ -344,6 +329,18 @@
return this.regionInfo;
}
+ /** @return size of largest HStore */
+ long getLargestHStoreSize() {
+ long size = 0;
+ for (HStore store: stores.values()) {
+ long storeSize = store.getSize();
+ if (storeSize > size) {
+ size = storeSize;
+ }
+ }
+ return size;
+ }
+
/** returns true if region is closed */
boolean isClosed() {
return this.closed.get();
@@ -449,13 +446,12 @@
listener.closing(getRegionName());
}
- // Don't flush the cache if we are aborting
- if (!abort) {
- internalFlushcache(snapshotMemcaches());
- }
-
List result = new ArrayList();
for (HStore store: stores.values()) {
+ if (!abort) { // Don't flush the cache if we are aborting
+ store.snapshotMemcache();
+ internalFlushcache(store);
+ }
result.addAll(store.close());
}
this.closed.set(true);
@@ -523,11 +519,6 @@
return this.fs;
}
- /** @return the last time the region was flushed */
- public long getLastFlushTime() {
- return this.lastFlushTime;
- }
-
//////////////////////////////////////////////////////////////////////////////
// HRegion maintenance.
//
@@ -535,35 +526,6 @@
// upkeep.
//////////////////////////////////////////////////////////////////////////////
- /**
- * @return returns size of largest HStore. Also returns whether store is
- * splitable or not (Its not splitable if region has a store that has a
- * reference store file).
- */
- HStore.HStoreSize largestHStore(Text midkey) {
- HStore.HStoreSize biggest = null;
- boolean splitable = true;
- for(HStore h: stores.values()) {
- HStore.HStoreSize size = h.size(midkey);
- // If we came across a reference down in the store, then propagate
- // fact that region is not splitable.
- if (splitable) {
- splitable = size.splitable;
- }
- if (biggest == null) {
- biggest = size;
- continue;
- }
- if(size.getAggregate() > biggest.getAggregate()) { // Largest so far
- biggest = size;
- }
- }
- if (biggest != null) {
- biggest.setSplitable(splitable);
- }
- return biggest;
- }
-
/*
* Split the HRegion to create two brand-new ones. This also closes
* current HRegion. Split should be fast since we don't rewrite store files
@@ -573,13 +535,13 @@
* @return two brand-new (and open) HRegions or null if a split is not needed
* @throws IOException
*/
- HRegion[] splitRegion(final RegionUnavailableListener listener)
+ HRegion[] splitRegion(final RegionUnavailableListener listener, Text midKey)
throws IOException {
synchronized (splitLock) {
- Text midKey = new Text();
- if (closed.get() || !needsSplit(midKey)) {
+ if (closed.get()) {
return null;
}
+ LOG.info("Starting split of region " + regionInfo.getRegionName());
Path splits = new Path(this.regiondir, SPLITDIR);
if(!this.fs.exists(splits)) {
this.fs.mkdirs(splits);
@@ -650,76 +612,6 @@
}
/*
- * Iterates through all the HStores and finds the one with the largest
- * MapFile size. If the size is greater than the (currently hard-coded)
- * threshold, returns true indicating that the region should be split. The
- * midKey for the largest MapFile is returned through the midKey parameter.
- * It is possible for us to rule the region non-splitable even in excess of
- * configured size. This happens if region contains a reference file. If
- * a reference file, the region can not be split.
- *
- * Note that there is no need to do locking in this method because it calls
- * largestHStore which does the necessary locking.
- *
- * @param midKey midKey of the largest MapFile
- * @return true if the region should be split. midKey is set by this method.
- * Check it for a midKey value on return.
- */
- boolean needsSplit(Text midKey) {
- HStore.HStoreSize biggest = largestHStore(midKey);
- if (biggest == null || midKey.getLength() == 0 ||
- (midKey.equals(getStartKey()) && midKey.equals(getEndKey())) ) {
- return false;
- }
- long triggerSize = this.desiredMaxFileSize + (this.desiredMaxFileSize / 2);
- boolean split = (biggest.getAggregate() >= triggerSize);
- if (split) {
- if (!biggest.isSplitable()) {
- LOG.warn("Region " + getRegionName().toString() +
- " is NOT splitable though its aggregate size is " +
- StringUtils.humanReadableInt(biggest.getAggregate()) +
- " and desired size is " +
- StringUtils.humanReadableInt(this.desiredMaxFileSize));
- split = false;
- } else {
- LOG.info("Splitting " + getRegionName().toString() +
- " because largest aggregate size is " +
- StringUtils.humanReadableInt(biggest.getAggregate()) +
- " and desired size is " +
- StringUtils.humanReadableInt(this.desiredMaxFileSize));
- }
- }
- return split;
- }
-
- /**
- * Only do a compaction if it is necessary
- *
- * @return
- * @throws IOException
- */
- boolean compactIfNeeded() throws IOException {
- boolean needsCompaction = false;
- for (HStore store: stores.values()) {
- if (store.needsCompaction()) {
- needsCompaction = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug(store.toString() + " needs compaction");
- }
- break;
- }
- }
- if (!needsCompaction) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("region " + regionInfo.getRegionName() +
- " does not need compaction");
- }
- return false;
- }
- return compactStores();
- }
-
- /*
* @param dir
* @return compaction directory for the passed in dir
*/
@@ -747,24 +639,20 @@
}
/**
- * Compact all the stores. This should be called periodically to make sure
- * the stores are kept manageable.
+ * Compact the specified HStore
*
* This operation could block for a long time, so don't call it from a
* time-sensitive thread.
*
- * @return Returns TRUE if the compaction has completed. FALSE, if the
- * compaction was not carried out, because the HRegion is busy doing
- * something else storage-intensive (like flushing the cache). The caller
- * should check back later.
+ * @param store the HStore to compact
*
* Note that no locking is necessary at this level because compaction only
* conflicts with a region split, and that cannot happen because the region
* server does them sequentially and not in parallel.
*/
- boolean compactStores() throws IOException {
+ void compactStore(HStore store) throws IOException {
if (this.closed.get()) {
- return false;
+ return;
}
try {
synchronized (writestate) {
@@ -773,28 +661,25 @@
writestate.compacting = true;
} else {
LOG.info("NOT compacting region " +
- this.regionInfo.getRegionName().toString() + ": compacting=" +
+ this.regionInfo.getRegionName().toString() + " store " +
+ store.toString() + ": compacting=" +
writestate.compacting + ", writesEnabled=" +
writestate.writesEnabled + ", writestate.disableCompactions=" +
this.writestate.disableCompactions);
- return false;
+ return;
}
}
long startTime = System.currentTimeMillis();
LOG.info("starting compaction on region " +
- this.regionInfo.getRegionName().toString());
- boolean status = true;
+ this.regionInfo.getRegionName().toString() + " store "
+ + store.toString());
doRegionCompactionPrep();
- for (HStore store : stores.values()) {
- if(!store.compact()) {
- status = false;
- }
- }
+ store.compact();
doRegionCompactionCleanup();
LOG.info("compaction completed on region " +
- this.regionInfo.getRegionName().toString() + ". Took " +
- StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
- return status;
+ this.regionInfo.getRegionName().toString() + " store " +
+ store.toString() + ". Took " +
+ StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
} finally {
synchronized (writestate) {
@@ -818,15 +703,13 @@
*
This method may block for some time, so it should not be called from a
* time-sensitive thread.
*
- * @return true if cache was flushed
- *
* @throws IOException
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
- boolean flushcache() throws IOException {
+ void flushcache(HStore store) throws IOException {
if (this.closed.get()) {
- return false;
+ return;
}
synchronized (writestate) {
if (!writestate.flushing && writestate.writesEnabled) {
@@ -838,17 +721,16 @@
writestate.flushing + ", writesEnabled=" +
writestate.writesEnabled);
}
- return false;
+ return;
}
}
try {
lock.readLock().lock(); // Prevent splits and closes
try {
- long startTime = -1;
- synchronized (updateLock) {// Stop updates while we snapshot the memcaches
- startTime = snapshotMemcaches();
+ synchronized (updateLock) {// Stop updates while we snapshot the memcache
+ store.snapshotMemcache();
}
- return internalFlushcache(startTime);
+ internalFlushcache(store);
} finally {
lock.readLock().unlock();
}
@@ -860,32 +742,6 @@
}
}
- /*
- * It is assumed that updates are blocked for the duration of this method
- */
- private long snapshotMemcaches() {
- if (this.memcacheSize.get() == 0) {
- return -1;
- }
- long startTime = System.currentTimeMillis();
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Started memcache flush for region " +
- this.regionInfo.getRegionName() + ". Size " +
- StringUtils.humanReadableInt(this.memcacheSize.get()));
- }
-
- // We reset the aggregate memcache size here so that subsequent updates
- // will add to the unflushed size
-
- this.memcacheSize.set(0L);
-
- for (HStore hstore: stores.values()) {
- hstore.snapshotMemcache();
- }
- return startTime;
- }
-
/**
* Flushing the cache is a little tricky. We have a lot of updates in the
* HMemcache, all of which have also been written to the log. We need to
@@ -912,18 +768,18 @@
*
*
This method may block for some time.
*
- * @param startTime the time the cache was snapshotted or -1 if a flush is
- * not needed
+ * @param store the HStore whose cache should be flushed
*
- * @return true if the cache was flushed
*
* @throws IOException
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
- private boolean internalFlushcache(long startTime) throws IOException {
- if (startTime == -1) {
- return false;
+ private void internalFlushcache(HStore store) throws IOException {
+ long startTime = System.currentTimeMillis();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Started memcache flush for region " +
+ this.regionInfo.getRegionName() + " store " + store.toString());
}
// We pass the log to the HMemcache, so we can lock down both
@@ -945,13 +801,9 @@
// be part of the current running servers state.
try {
- // A. Flush memcache to all the HStores.
- // Keep running vector of all store files that includes both old and the
- // just-made new flush store file.
+ // A. Flush memcache for specified HStore
- for (HStore hstore: stores.values()) {
- hstore.flushCache(sequenceId);
- }
+ store.flushCache(sequenceId);
} catch (IOException e) {
// An exception here means that the snapshot was not persisted.
// The hlog needs to be replayed so its content is restored to memcache.
@@ -967,8 +819,8 @@
// This tells future readers that the HStores were emitted correctly,
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
- this.log.completeCacheFlush(this.regionInfo.getRegionName(),
- regionInfo.getTableDesc().getName(), sequenceId);
+ this.log.completeCacheFlush(store.storeName, getTableDesc().getName(),
+ sequenceId);
// D. Finally notify anyone waiting on memcache to clear:
// e.g. checkResources().
@@ -976,12 +828,10 @@
notifyAll();
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Finished memcache flush for region " +
- this.regionInfo.getRegionName() + " in " +
- (System.currentTimeMillis() - startTime) + "ms, sequenceid=" +
- sequenceId);
+ LOG.debug("Finished memcache flush for store " + store.storeName +
+ " in " + (System.currentTimeMillis() - startTime) +
+ "ms, sequenceid=" + sequenceId);
}
- return true;
}
//////////////////////////////////////////////////////////////////////////////
@@ -1160,7 +1010,7 @@
throws IOException {
List keys = null;
- Text colFamily = HStoreKey.extractFamily(origin.getColumn());
+ Text colFamily = origin.getFamily();
HStore targetStore = stores.get(colFamily);
if (targetStore != null) {
// Pass versions without modification since in the store getKeys, it
@@ -1223,14 +1073,7 @@
* @param b
* @throws IOException
*/
- public void batchUpdate(long timestamp, BatchUpdate b)
- throws IOException {
- // Do a rough check that we have resources to accept a write. The check is
- // 'rough' in that between the resource check and the call to obtain a
- // read lock, resources may run out. For now, the thought is that this
- // will be extremely rare; we'll deal with it when it happens.
- checkResources();
-
+ public void batchUpdate(long timestamp, BatchUpdate b) throws IOException {
// We obtain a per-row lock, so other clients will block while one client
// performs an update. The read lock is released by the client calling
// #commit or #abort or if the HRegionServer lease on the lock expires.
@@ -1289,42 +1132,6 @@
}
}
- /*
- * Check if resources to support an update.
- *
- * For now, just checks memcache saturation.
- *
- * Here we synchronize on HRegion, a broad scoped lock. Its appropriate
- * given we're figuring in here whether this region is able to take on
- * writes. This is only method with a synchronize (at time of writing),
- * this and the synchronize on 'this' inside in internalFlushCache to send
- * the notify.
- */
- private synchronized void checkResources() {
- boolean blocked = false;
-
- while (this.memcacheSize.get() >= this.blockingMemcacheSize) {
- if (!blocked) {
- LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
- "': Memcache size " +
- StringUtils.humanReadableInt(this.memcacheSize.get()) +
- " is >= than blocking " +
- StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size");
- }
-
- blocked = true;
- try {
- wait(threadWakeFrequency);
- } catch (InterruptedException e) {
- // continue;
- }
- }
- if (blocked) {
- LOG.info("Unblocking updates for region " + getRegionName() + " '" +
- Thread.currentThread().getName() + "'");
- }
- }
-
/**
* Delete all cells of the same age as the passed timestamp or older.
* @param row
@@ -1454,8 +1261,6 @@
/*
* Add updates first to the hlog and then add values to memcache.
* Warning: Assumption is caller has lock on passed in row.
- * @param row Row to update.
- * @param timestamp Timestamp to record the updates against
* @param updatesByColumn Cell updates by column
* @throws IOException
*/
@@ -1466,21 +1271,13 @@
return;
}
synchronized (updateLock) { // prevent a cache flush
- this.log.append(regionInfo.getRegionName(),
- regionInfo.getTableDesc().getName(), updatesByColumn);
-
- long size = 0;
for (Map.Entry e: updatesByColumn.entrySet()) {
HStoreKey key = e.getKey();
byte[] val = e.getValue();
- size = this.memcacheSize.addAndGet(key.getSize() +
- (val == null ? 0 : val.length));
- stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val);
+ HStore store = stores.get(key.getFamily());
+ this.log.append(store.storeName, getTableDesc().getName(), key, val);
+ store.add(key, val);
}
- if (this.flushListener != null && size > this.memcacheFlushSize) {
- // Request a cache flush
- this.flushListener.flushRequested(this);
- }
}
}
@@ -1607,6 +1404,13 @@
return regionInfo.getRegionName().toString();
}
+ /**
+ * @return Immutable collection of this region's HStores.
+ */
+ public Collection getStores() {
+ return Collections.unmodifiableCollection(this.stores.values());
+ }
+
private Path getBaseDir() {
return this.basedir;
}
@@ -1767,14 +1571,14 @@
}
} finally {
synchronized (activeScannerCount) {
- int count = activeScannerCount.decrementAndGet();
- if (count < 0) {
- LOG.error("active scanner count less than zero: " + count +
+ int scanners = activeScannerCount.decrementAndGet();
+ if (scanners < 0) {
+ LOG.error("active scanner count less than zero: " + scanners +
" resetting to zero");
activeScannerCount.set(0);
- count = 0;
+ scanners = 0;
}
- if (count == 0) {
+ if (scanners == 0) {
activeScannerCount.notifyAll();
}
}
@@ -1827,7 +1631,6 @@
* @see {@link #removeRegionFromMETA(HRegion, HRegion)}
*/
static void addRegionToMETA(HRegion meta, HRegion r) throws IOException {
- meta.checkResources();
// The row key is the region name
Text row = r.getRegionName();
meta.obtainRowLock(row);
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (revision 617261)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (working copy)
@@ -21,10 +21,14 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+
/**
* Leases
*
@@ -39,19 +43,19 @@
* An instance of the Leases class will create a thread to do its dirty work.
* You should close() the instance if you want to clean up the thread properly.
*/
-public class Leases {
- protected static final Log LOG = LogFactory.getLog(Leases.class.getName());
+public class Leases extends Thread {
+ private static final Log LOG = LogFactory.getLog(Leases.class.getName());
+ private final int leasePeriod;
+ private final int leaseCheckFrequency;
+ private volatile DelayQueue leaseQueue = new DelayQueue();
- protected final int leasePeriod;
- protected final int leaseCheckFrequency;
- private final Thread leaseMonitorThread;
- protected final Map leases =
- new HashMap();
- protected final TreeSet sortedLeases = new TreeSet();
- protected AtomicBoolean stop = new AtomicBoolean(false);
+ protected final Map leases = new HashMap();
+ protected final Map listeners =
+ new HashMap();
+ private volatile boolean stopRequested = false;
/**
- * Creates a lease
+ * Creates a lease monitor
*
* @param leasePeriod - length of time (milliseconds) that the lease is valid
* @param leaseCheckFrequency - how often the lease should be checked
@@ -60,22 +64,40 @@
public Leases(final int leasePeriod, final int leaseCheckFrequency) {
this.leasePeriod = leasePeriod;
this.leaseCheckFrequency = leaseCheckFrequency;
- this.leaseMonitorThread =
- new LeaseMonitor(this.leaseCheckFrequency, this.stop);
- this.leaseMonitorThread.setDaemon(true);
}
- /** Starts the lease monitor */
- public void start() {
- leaseMonitorThread.start();
+ /** {@inheritDoc} */
+ @Override
+ public void run() {
+ while (!stopRequested || (stopRequested && leaseQueue.size() > 0) ) {
+ Lease lease = null;
+ try {
+ lease = leaseQueue.poll(leaseCheckFrequency, TimeUnit.MILLISECONDS);
+
+ } catch (InterruptedException e) {
+ continue;
+
+ } catch (ConcurrentModificationException e) {
+ continue;
+ }
+ if (lease == null) {
+ continue;
+ }
+ // A lease expired
+ LeaseListener listener = null;
+ synchronized (leaseQueue) {
+ String leaseName = lease.getLeaseName();
+ leases.remove(leaseName);
+ listener = listeners.remove(leaseName);
+ if (listener == null) {
+ LOG.error("lease listener is null for lease " + leaseName);
+ continue;
+ }
+ }
+ listener.leaseExpired();
+ }
+ close();
}
-
- /**
- * @param name Set name on the lease checking daemon thread.
- */
- public void setName(final String name) {
- this.leaseMonitorThread.setName(name);
- }
/**
* Shuts down this lease instance when all outstanding leases expire.
@@ -85,20 +107,7 @@
* allocation of new leases.
*/
public void closeAfterLeasesExpire() {
- synchronized(this.leases) {
- while (this.leases.size() > 0) {
- LOG.info(Thread.currentThread().getName() + " " +
- Integer.toString(leases.size()) + " lease(s) " +
- "outstanding. Waiting for them to expire.");
- try {
- this.leases.wait(this.leaseCheckFrequency);
- } catch (InterruptedException e) {
- // continue
- }
- }
- }
- // Now call close since no leases outstanding.
- close();
+ this.stopRequested = true;
}
/**
@@ -107,271 +116,124 @@
*/
public void close() {
LOG.info(Thread.currentThread().getName() + " closing leases");
- this.stop.set(true);
- while (this.leaseMonitorThread.isAlive()) {
- try {
- this.leaseMonitorThread.interrupt();
- this.leaseMonitorThread.join();
- } catch (InterruptedException iex) {
- // Ignore
- }
+ this.stopRequested = true;
+ synchronized (leaseQueue) {
+ leaseQueue.clear();
+ leases.clear();
+ listeners.clear();
+ leaseQueue.notifyAll();
}
- synchronized(leases) {
- synchronized(sortedLeases) {
- leases.clear();
- sortedLeases.clear();
- }
- }
LOG.info(Thread.currentThread().getName() + " closed leases");
}
- /* A client obtains a lease... */
-
/**
* Obtain a lease
*
- * @param holderId id of lease holder
- * @param resourceId id of resource being leased
+ * @param leaseName name of the lease
* @param listener listener that will process lease expirations
*/
- public void createLease(final long holderId, final long resourceId,
- final LeaseListener listener) {
- LeaseName name = null;
- synchronized(leases) {
- synchronized(sortedLeases) {
- Lease lease = new Lease(holderId, resourceId, listener);
- name = lease.getLeaseName();
- if(leases.get(name) != null) {
- throw new AssertionError("Impossible state for createLease(): " +
- "Lease " + name + " is still held.");
- }
- leases.put(name, lease);
- sortedLeases.add(lease);
+ public void createLease(String leaseName, final LeaseListener listener) {
+ if (stopRequested) {
+ return;
+ }
+ Lease lease = new Lease(leaseName, System.currentTimeMillis() + leasePeriod);
+ synchronized (leaseQueue) {
+ if (leases.containsKey(leaseName)) {
+ throw new IllegalStateException("lease '" + leaseName +
+ "' already exists");
}
+ leases.put(leaseName, lease);
+ listeners.put(leaseName, listener);
+ leaseQueue.add(lease);
}
-// if (LOG.isDebugEnabled()) {
-// LOG.debug("Created lease " + name);
-// }
}
- /* A client renews a lease... */
/**
* Renew a lease
*
- * @param holderId id of lease holder
- * @param resourceId id of resource being leased
- * @throws IOException
+ * @param leaseName name of lease
*/
- public void renewLease(final long holderId, final long resourceId)
- throws IOException {
- LeaseName name = null;
- synchronized(leases) {
- synchronized(sortedLeases) {
- name = createLeaseName(holderId, resourceId);
- Lease lease = leases.get(name);
- if (lease == null) {
- // It's possible that someone tries to renew the lease, but
- // it just expired a moment ago. So fail.
- throw new IOException("Cannot renew lease that is not held: " +
- name);
- }
- sortedLeases.remove(lease);
- lease.renew();
- sortedLeases.add(lease);
+ public void renewLease(final String leaseName) {
+ synchronized (leaseQueue) {
+ Lease lease = leases.get(leaseName);
+ if (lease == null) {
+ throw new IllegalArgumentException("lease '" + leaseName +
+ "' does not exist");
}
+ leaseQueue.remove(lease);
+ lease.setExpirationTime(System.currentTimeMillis() + leasePeriod);
+ leaseQueue.add(lease);
}
-// if (LOG.isDebugEnabled()) {
-// LOG.debug("Renewed lease " + name);
-// }
}
/**
* Client explicitly cancels a lease.
*
- * @param holderId id of lease holder
- * @param resourceId id of resource being leased
+ * @param leaseName name of lease
*/
- public void cancelLease(final long holderId, final long resourceId) {
- LeaseName name = null;
- synchronized(leases) {
- synchronized(sortedLeases) {
- name = createLeaseName(holderId, resourceId);
- Lease lease = leases.get(name);
- if (lease == null) {
- // It's possible that someone tries to renew the lease, but
- // it just expired a moment ago. So just skip it.
- return;
- }
- sortedLeases.remove(lease);
- leases.remove(name);
+ public void cancelLease(final String leaseName) {
+ synchronized (leaseQueue) {
+ Lease lease = leases.remove(leaseName);
+ if (lease == null) {
+ throw new IllegalArgumentException("lease '" + leaseName +
+ "' does not exist");
}
+ leaseQueue.remove(lease);
+ listeners.remove(leaseName);
}
}
- /**
- * LeaseMonitor is a thread that expires Leases that go on too long.
- * Its a daemon thread.
- */
- class LeaseMonitor extends Chore {
- /**
- * @param p
- * @param s
- */
- public LeaseMonitor(int p, AtomicBoolean s) {
- super(p, s);
+ /** This class tracks a single Lease. */
+ private static class Lease implements Delayed {
+ private final String leaseName;
+ private long expirationTime;
+
+ Lease(final String leaseName, long expirationTime) {
+ this.leaseName = leaseName;
+ this.expirationTime = expirationTime;
}
- /** {@inheritDoc} */
- @Override
- protected void chore() {
- synchronized(leases) {
- synchronized(sortedLeases) {
- Lease top;
- while((sortedLeases.size() > 0)
- && ((top = sortedLeases.first()) != null)) {
- if(top.shouldExpire()) {
- leases.remove(top.getLeaseName());
- sortedLeases.remove(top);
- top.expired();
- } else {
- break;
- }
- }
- }
- }
+ /** @return the lease name */
+ public String getLeaseName() {
+ return leaseName;
}
- }
-
- /*
- * A Lease name.
- * More lightweight than String or Text.
- */
- @SuppressWarnings("unchecked")
- class LeaseName implements Comparable {
- private final long holderId;
- private final long resourceId;
-
- LeaseName(final long hid, final long rid) {
- this.holderId = hid;
- this.resourceId = rid;
- }
-
+
/** {@inheritDoc} */
@Override
public boolean equals(Object obj) {
- LeaseName other = (LeaseName)obj;
- return this.holderId == other.holderId &&
- this.resourceId == other.resourceId;
+ return this.hashCode() == ((Lease) obj).hashCode();
}
/** {@inheritDoc} */
@Override
public int hashCode() {
- // Copy OR'ing from javadoc for Long#hashCode.
- int result = (int)(this.holderId ^ (this.holderId >>> 32));
- result ^= (int)(this.resourceId ^ (this.resourceId >>> 32));
- return result;
+ return this.leaseName.hashCode();
}
-
+
/** {@inheritDoc} */
- @Override
- public String toString() {
- return Long.toString(this.holderId) + "/" +
- Long.toString(this.resourceId);
+ public long getDelay(TimeUnit unit) {
+ return unit.convert(this.expirationTime - System.currentTimeMillis(),
+ TimeUnit.MILLISECONDS);
}
/** {@inheritDoc} */
- public int compareTo(Object obj) {
- LeaseName other = (LeaseName)obj;
- if (this.holderId < other.holderId) {
- return -1;
- }
- if (this.holderId > other.holderId) {
- return 1;
- }
- // holderIds are equal
- if (this.resourceId < other.resourceId) {
- return -1;
- }
- if (this.resourceId > other.resourceId) {
- return 1;
- }
- // Objects are equal
- return 0;
- }
- }
-
- /** Create a lease id out of the holder and resource ids. */
- protected LeaseName createLeaseName(final long hid, final long rid) {
- return new LeaseName(hid, rid);
- }
+ public int compareTo(Delayed o) {
+ long delta = this.getDelay(TimeUnit.MILLISECONDS) -
+ o.getDelay(TimeUnit.MILLISECONDS);
- /** This class tracks a single Lease. */
- @SuppressWarnings("unchecked")
- private class Lease implements Comparable {
- final long holderId;
- final long resourceId;
- final LeaseListener listener;
- long lastUpdate;
- private LeaseName leaseId;
+ int value = 0;
+ if (delta > 0) {
+ value = 1;
- Lease(final long holderId, final long resourceId,
- final LeaseListener listener) {
- this.holderId = holderId;
- this.resourceId = resourceId;
- this.listener = listener;
- renew();
- }
-
- synchronized LeaseName getLeaseName() {
- if (this.leaseId == null) {
- this.leaseId = createLeaseName(holderId, resourceId);
+ } else if (delta < 0) {
+ value = -1;
}
- return this.leaseId;
+ return value;
}
-
- boolean shouldExpire() {
- return (System.currentTimeMillis() - lastUpdate > leasePeriod);
- }
-
- void renew() {
- this.lastUpdate = System.currentTimeMillis();
- }
-
- void expired() {
- LOG.info(Thread.currentThread().getName() + " lease expired " +
- getLeaseName());
- listener.leaseExpired();
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean equals(Object obj) {
- return compareTo(obj) == 0;
- }
-
- /** {@inheritDoc} */
- @Override
- public int hashCode() {
- int result = this.getLeaseName().hashCode();
- result ^= this.lastUpdate;
- return result;
- }
-
- //////////////////////////////////////////////////////////////////////////////
- // Comparable
- //////////////////////////////////////////////////////////////////////////////
- /** {@inheritDoc} */
- public int compareTo(Object o) {
- Lease other = (Lease) o;
- if(this.lastUpdate < other.lastUpdate) {
- return -1;
- } else if(this.lastUpdate > other.lastUpdate) {
- return 1;
- } else {
- return this.getLeaseName().compareTo(other.getLeaseName());
- }
+ /** @param expirationTime the expirationTime to set */
+ public void setExpirationTime(long expirationTime) {
+ this.expirationTime = expirationTime;
}
}
}
\ No newline at end of file
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 617261)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy)
@@ -38,8 +38,6 @@
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.DelayQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -71,7 +69,8 @@
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
*/
-public class HRegionServer implements HConstants, HRegionInterface, Runnable {
+public class HRegionServer implements HConstants, HRegionInterface, Runnable,
+HStoreListener {
static final Log LOG = LogFactory.getLog(HRegionServer.class);
// Set when a report to the master comes back with a message asking us to
@@ -140,7 +139,7 @@
* is registered as a shutdown hook in the HRegionServer constructor and is
* only called when the HRegionServer receives a kill signal.
*/
- class ShutdownThread extends Thread {
+ static class ShutdownThread extends Thread {
private final HRegionServer instance;
/**
@@ -163,61 +162,6 @@
}
- /** Queue entry passed to flusher, compactor and splitter threads */
- class QueueEntry implements Delayed {
- private final HRegion region;
- private long expirationTime;
-
- QueueEntry(HRegion region, long expirationTime) {
- this.region = region;
- this.expirationTime = expirationTime;
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean equals(Object o) {
- QueueEntry other = (QueueEntry) o;
- return this.hashCode() == other.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override
- public int hashCode() {
- return this.region.getRegionInfo().hashCode();
- }
-
- /** {@inheritDoc} */
- public long getDelay(TimeUnit unit) {
- return unit.convert(this.expirationTime - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- }
-
- /** {@inheritDoc} */
- public int compareTo(Delayed o) {
- long delta = this.getDelay(TimeUnit.MILLISECONDS) -
- o.getDelay(TimeUnit.MILLISECONDS);
-
- int value = 0;
- if (delta > 0) {
- value = 1;
-
- } else if (delta < 0) {
- value = -1;
- }
- return value;
- }
-
- /** @return the region */
- public HRegion getRegion() {
- return region;
- }
-
- /** @param expirationTime the expirationTime to set */
- public void setExpirationTime(long expirationTime) {
- this.expirationTime = expirationTime;
- }
- }
-
// Check to see if regions should be split
final Splitter splitter;
// Needed at shutdown. On way out, if can get this lock then we are not in
@@ -227,9 +171,44 @@
/** Split regions on request */
class Splitter extends Thread implements RegionUnavailableListener {
- private final BlockingQueue splitQueue =
- new LinkedBlockingQueue();
+ /** Queue entry for splitter thread */
+ private class SplitQueueEntry {
+ private final HRegion region;
+ private final Text midkey;
+ SplitQueueEntry(HRegion region, Text midkey) {
+ this.region = region;
+ this.midkey = midkey;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object o) {
+ SplitQueueEntry other = (SplitQueueEntry) o;
+ return this.hashCode() == other.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode() {
+ return this.region.getRegionInfo().hashCode();
+ }
+
+ /** @return the region */
+ public HRegion getRegion() {
+ return region;
+ }
+
+ /** @return the midkey */
+ public Text getMidKey() {
+ return midkey;
+ }
+ }
+
+ private final BlockingQueue splitQueue =
+ new LinkedBlockingQueue();
+
+ private volatile SplitQueueEntry splitInProgress = null;
private HTable root = null;
private HTable meta = null;
private long startTime;
@@ -241,7 +220,6 @@
/** {@inheritDoc} */
public void closing(final Text regionName) {
- startTime = System.currentTimeMillis();
lock.writeLock().lock();
try {
// Remove region from regions Map and add it to the Map of retiring
@@ -258,6 +236,7 @@
/** {@inheritDoc} */
public void closed(final Text regionName) {
+ startTime = System.currentTimeMillis();
lock.writeLock().lock();
try {
retiringRegions.remove(regionName);
@@ -275,14 +254,17 @@
@Override
public void run() {
while (!stopRequested.get()) {
- QueueEntry e = null;
+ SplitQueueEntry e = null;
try {
e = splitQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
if (e == null) {
continue;
}
+ synchronized (splitQueue) {
+ splitInProgress = e;
+ }
synchronized (splitterLock) { // Don't interrupt us while we're working
- split(e.getRegion());
+ split(e);
}
} catch (InterruptedException ex) {
continue;
@@ -301,21 +283,43 @@
if (!checkFileSystem()) {
break;
}
+ } finally {
+ synchronized (splitQueue) {
+ splitInProgress = null;
+ }
}
}
LOG.info(getName() + " exiting");
}
/**
- * @param e entry indicating which region needs to be split
+ * @param r HRegion to be split
+ * @param midkey midkey of region to be split
*/
- public void splitRequested(QueueEntry e) {
- splitQueue.add(e);
+ public void splitRequested(HRegion r, Text midkey) {
+ SplitQueueEntry e = new SplitQueueEntry(r, midkey);
+ synchronized (splitQueue) {
+ if (splitInProgress != null && splitInProgress.equals(e)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "not queuing split request because split is already in progress");
+ }
+ return;
+ }
+ if (splitQueue.contains(e)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "not queuing split request because one is already in the queue");
+ }
+ return;
+ }
+ splitQueue.add(e);
+ }
}
- private void split(final HRegion region) throws IOException {
- final HRegionInfo oldRegionInfo = region.getRegionInfo();
- final HRegion[] newRegions = region.splitRegion(this);
+ private void split(final SplitQueueEntry e) throws IOException {
+ final HRegionInfo oldRegionInfo = e.getRegion().getRegionInfo();
+ final HRegion[] newRegions = e.getRegion().splitRegion(this, e.getMidKey());
if (newRegions == null) {
return; // Didn't need to be split
@@ -325,7 +329,7 @@
// splitting a 'normal' region, and the ROOT table needs to be
// updated if we are splitting a META region.
HTable t = null;
- if (region.getRegionInfo().isMetaTable()) {
+ if (e.getRegion().getRegionInfo().isMetaTable()) {
// We need to update the root region
if (this.root == null) {
this.root = new HTable(conf, ROOT_TABLE_NAME);
@@ -376,6 +380,40 @@
}
}
+ /** Queue entry passed to flusher and compactor threads */
+ static class QueueEntry {
+ private final HStore store;
+ private final HRegion region;
+
+ QueueEntry(HStore store, HRegion region) {
+ this.store = store;
+ this.region = region;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof QueueEntry)) return false;
+ return this.hashCode() == ((QueueEntry) o).hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode() {
+ return this.store.getFamily().hashCode() ^ this.region.getRegionName().hashCode();
+ }
+
+ /** @return the store */
+ public HStore getStore() {
+ return store;
+ }
+
+ /** @return the region */
+ public HRegion getRegion() {
+ return region;
+ }
+ }
+
// Compactions
final Compactor compactor;
// Needed during shutdown so we send an interrupt after completion of a
@@ -386,6 +424,8 @@
class Compactor extends Thread {
private final BlockingQueue compactionQueue =
new LinkedBlockingQueue();
+
+ private volatile QueueEntry compactionInProgress = null;
/** constructor */
public Compactor() {
@@ -402,37 +442,71 @@
if (e == null) {
continue;
}
- if (e.getRegion().compactIfNeeded()) {
- splitter.splitRequested(e);
+ synchronized (compactionQueue) {
+ compactionInProgress = e;
}
+ synchronized (compactionLock) {
+ // Don't interrupt us while we're working
+ e.getRegion().compactStore(e.getStore());
+ }
} catch (InterruptedException ex) {
continue;
} catch (IOException ex) {
- LOG.error("Compaction failed" +
- (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
- RemoteExceptionHandler.checkIOException(ex));
+ ex = RemoteExceptionHandler.checkIOException(ex);
+ if (e != null) {
+ LOG.error("Compaction failed for region " +
+ e.getRegion().getRegionName() + " hstore " +
+ e.getStore().toString(), ex);
+ } else {
+ LOG.error("Compaction failed", ex);
+ }
if (!checkFileSystem()) {
break;
}
-
} catch (Exception ex) {
- LOG.error("Compaction failed" +
- (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
- ex);
+ if (e != null) {
+ LOG.error("Compaction failed for region " +
+ e.getRegion().getRegionName() + " hstore " +
+ e.getStore().toString(), ex);
+ } else {
+ LOG.error("Compaction failed", ex);
+ }
if (!checkFileSystem()) {
break;
}
+ } finally {
+ synchronized (compactionQueue) {
+ compactionInProgress = null;
+ }
}
}
LOG.info(getName() + " exiting");
}
/**
- * @param e QueueEntry for region to be compacted
+ * @param store HStore to compact
+ * @param region HRegion that HStore belongs to.
*/
- public void compactionRequested(QueueEntry e) {
- compactionQueue.add(e);
+ public void compactionRequested(HStore store, HRegion region) {
+ QueueEntry e = new QueueEntry(store, region);
+ synchronized (compactionQueue) {
+ if (compactionInProgress != null && compactionInProgress.equals(e)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "not queuing compaction request because one is already in progress");
+ }
+ return;
+ }
+ if (compactionQueue.contains(e)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "not queuing compaction because one is already in the queue");
+ }
+ return;
+ }
+ compactionQueue.add(e);
+ }
}
}
@@ -443,18 +517,15 @@
final Integer cacheFlusherLock = new Integer(0);
/** Flush cache upon request */
- class Flusher extends Thread implements CacheFlushListener {
- private final DelayQueue flushQueue =
- new DelayQueue();
+ class Flusher extends Thread {
+ private final BlockingQueue flushQueue =
+ new LinkedBlockingQueue();
+
+ private volatile QueueEntry flushInProgress = null;
- private final long optionalFlushPeriod;
-
/** constructor */
public Flusher() {
super();
- this.optionalFlushPeriod = conf.getLong(
- "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
-
}
/** {@inheritDoc} */
@@ -467,38 +538,13 @@
if (e == null) {
continue;
}
- synchronized(cacheFlusherLock) { // Don't interrupt while we're working
- if (e.getRegion().flushcache()) {
- compactor.compactionRequested(e);
- }
-
- e.setExpirationTime(System.currentTimeMillis() +
- optionalFlushPeriod);
- flushQueue.add(e);
+ synchronized (flushQueue) {
+ flushInProgress = e;
}
-
- // Now insure that all the active regions are in the queue
-
- Set regions = getRegionsToCheck();
- for (HRegion r: regions) {
- e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod);
- synchronized (flushQueue) {
- if (!flushQueue.contains(e)) {
- flushQueue.add(e);
- }
- }
+ synchronized (cacheFlusherLock) { // Don't interrupt while we're working
+ e.getRegion().flushcache(e.getStore());
}
- // Now make sure that the queue only contains active regions
-
- synchronized (flushQueue) {
- for (Iterator i = flushQueue.iterator(); i.hasNext(); ) {
- e = i.next();
- if (!regions.contains(e.getRegion())) {
- i.remove();
- }
- }
- }
} catch (InterruptedException ex) {
continue;
@@ -517,32 +563,62 @@
HRegionServer.this.stop();
} catch (IOException ex) {
- LOG.error("Cache flush failed" +
- (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
- RemoteExceptionHandler.checkIOException(ex));
+ ex = RemoteExceptionHandler.checkIOException(ex);
+ if (e != null) {
+ LOG.error("Cache flush failed for region " +
+ e.getRegion().getRegionName() + " hstore " +
+ e.getStore().toString(), ex);
+ } else {
+ LOG.error("Cache flush failed", ex);
+ }
if (!checkFileSystem()) {
break;
}
} catch (Exception ex) {
- LOG.error("Cache flush failed" +
- (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
- ex);
+ if (e != null) {
+ LOG.error("Cache flush failed for region " +
+ e.getRegion().getRegionName() + " hstore " +
+ e.getStore().toString(), ex);
+ } else {
+ LOG.error("Cache flush failed", ex);
+ }
if (!checkFileSystem()) {
break;
}
+ } finally {
+ synchronized (flushQueue) {
+ flushInProgress = null;
+ }
}
}
flushQueue.clear();
LOG.info(getName() + " exiting");
}
-
- /** {@inheritDoc} */
- public void flushRequested(HRegion region) {
- QueueEntry e = new QueueEntry(region, System.currentTimeMillis());
+
+ /**
+ * Request a cache flush for the specified HStore
+ *
+ * @param store HStore to flush
+ * @param region HRegion the store belongs to
+ */
+ public void flushRequested(HStore store, HRegion region) {
+ QueueEntry e = new QueueEntry(store, region);
+
synchronized (flushQueue) {
+ if (flushInProgress != null && flushInProgress.equals(e)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "not queuing flush request because one is already in progress");
+ }
+ return;
+ }
if (flushQueue.contains(e)) {
- flushQueue.remove(e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "not queuing flush request because one is already in queue");
+ }
+ return;
}
flushQueue.add(e);
}
@@ -697,7 +773,7 @@
// It has been way too long since we last reported to the master.
// Commit suicide.
LOG.fatal("unable to report to master for " + (now - lastMsg) +
- " milliseconds - aborting server");
+ " milliseconds - aborting server");
abort();
break;
}
@@ -716,22 +792,22 @@
HMsg msgs[] =
this.hbaseMaster.regionServerReport(serverInfo, outboundArray);
lastMsg = System.currentTimeMillis();
-
+
if (this.quiesced.get() && onlineRegions.size() == 0) {
// We've just told the master we're exiting because we aren't
// serving any regions. So set the stop bit and exit.
LOG.info("Server quiesced and not serving any regions. " +
- "Starting shutdown");
+ "Starting shutdown");
stopRequested.set(true);
continue;
}
-
+
// Queue up the HMaster's instruction stream for processing
boolean restart = false;
for(int i = 0; i < msgs.length && !stopRequested.get() &&
- !restart; i++) {
+ !restart; i++) {
switch(msgs[i].getMsg()) {
-
+
case HMsg.MSG_CALL_SERVER_STARTUP:
LOG.info("Got call server startup message");
// We the MSG_CALL_SERVER_STARTUP on startup but we can also
@@ -762,7 +838,7 @@
restart = true;
} else {
LOG.fatal("file system available check failed. " +
- "Shutting down server.");
+ "Shutting down server.");
}
break;
@@ -770,7 +846,7 @@
LOG.info("Got regionserver stop message");
stopRequested.set(true);
break;
-
+
case HMsg.MSG_REGIONSERVER_QUIESCE:
if (!quiesceRequested) {
LOG.info("Got quiesce server message");
@@ -778,7 +854,7 @@
toDo.put(new ToDoEntry(msgs[i]));
} catch (InterruptedException e) {
throw new RuntimeException("Putting into msgQueue was " +
- "interrupted.", e);
+ "interrupted.", e);
}
quiesceRequested = true;
}
@@ -790,7 +866,7 @@
toDo.put(new ToDoEntry(msgs[i]));
} catch (InterruptedException e) {
throw new RuntimeException("Putting into msgQueue was " +
- "interrupted.", e);
+ "interrupted.", e);
}
if (msgs[i].getMsg() == HMsg.MSG_REGION_OPEN) {
outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_PROCESS_OPEN,
@@ -869,7 +945,7 @@
closeAllRegions(); // Don't leave any open file handles
}
LOG.info("aborting server at: " +
- serverInfo.getServerAddress().toString());
+ serverInfo.getServerAddress().toString());
} else {
ArrayList closedRegions = closeAllRegions();
try {
@@ -896,13 +972,13 @@
RemoteExceptionHandler.checkIOException(e));
}
LOG.info("stopping server at: " +
- serverInfo.getServerAddress().toString());
+ serverInfo.getServerAddress().toString());
}
join();
LOG.info(Thread.currentThread().getName() + " exiting");
}
-
+
/*
* Run init. Sets up hlog and starts up all server threads.
* @param c Extra configuration.
@@ -976,10 +1052,7 @@
handler);
Threads.setDaemonThreadRunning(this.splitter, n + ".splitter", handler);
Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
- // Leases is not a Thread. Internally it runs a daemon thread. If it gets
- // an unhandled exception, it will just exit.
- this.leases.setName(n + ".leaseChecker");
- this.leases.start();
+ Threads.setDaemonThreadRunning(this.leases, n + ".leaseChecker", handler);
// Put up info server.
int port = this.conf.getInt("hbase.regionserver.info.port", 60030);
if (port >= 0) {
@@ -1053,6 +1126,21 @@
}
}
+ /** {@inheritDoc} */
+ public void flushRequested(HStore store, HRegion region) {
+ cacheFlusher.flushRequested(store, region);
+ }
+
+ /** {@inheritDoc} */
+ public void compactionRequested(HStore store, HRegion region) {
+ compactor.compactionRequested(store, region);
+ }
+
+ /** {@inheritDoc} */
+ public void splitRequested(HRegion region, Text midkey) {
+ splitter.splitRequested(region, midkey);
+ }
+
/*
* Let the master know we're here
* Run initialization using parameters passed us by the master.
@@ -1217,7 +1305,7 @@
HTableDescriptor.getTableDir(rootDir,
regionInfo.getTableDesc().getName()
),
- this.log, this.fs, conf, regionInfo, null, this.cacheFlusher
+ this.log, this.fs, conf, regionInfo, null, this
);
} catch (IOException e) {
@@ -1456,7 +1544,7 @@
if (s == null) {
throw new UnknownScannerException("Name: " + scannerName);
}
- this.leases.renewLease(scannerId, scannerId);
+ this.leases.renewLease(scannerName);
// Collect values to be returned here
HbaseMapWritable values = new HbaseMapWritable();
@@ -1518,8 +1606,7 @@
synchronized(scanners) {
scanners.put(scannerName, s);
}
- this.leases.
- createLease(scannerId, scannerId, new ScannerListener(scannerName));
+ this.leases.createLease(scannerName, new ScannerListener(scannerName));
return scannerId;
} catch (IOException e) {
LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
@@ -1543,7 +1630,7 @@
throw new UnknownScannerException(scannerName);
}
s.close();
- this.leases.cancelLease(scannerId, scannerId);
+ this.leases.cancelLease(scannerName);
} catch (IOException e) {
checkFileSystem();
throw e;
@@ -1627,11 +1714,6 @@
return this.requestCount;
}
- /** @return reference to CacheFlushListener */
- public CacheFlushListener getCacheFlushListener() {
- return this.cacheFlusher;
- }
-
/**
* Protected utility method for safely obtaining an HRegion handle.
* @param regionName Name of online {@link HRegion} to return