diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index 7589db3..260f557 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -144,44 +144,69 @@ class CompactSplitThread extends Thread {
}
}
- private void split(final HRegion region, final byte [] midKey)
+ private void split(final HRegion parent, final byte [] midKey)
throws IOException {
- final HRegionInfo oldRegionInfo = region.getRegionInfo();
+ final HRegionInfo parentRegionInfo = parent.getRegionInfo();
final long startTime = System.currentTimeMillis();
- final HRegion[] newRegions = region.splitRegion(midKey);
+
+
+
+
+
+ final HRegion [] newRegions = parent.splitRegion(midKey);
if (newRegions == null) {
// Didn't need to be split
return;
}
- // When a region is split, the META table needs to updated if we're
- // splitting a 'normal' region, and the ROOT table needs to be
- // updated if we are splitting a META region.
+ // Inform the HRegionServer that the parent HRegion is no-longer online.
+ this.server.removeFromOnlineRegions(parentRegionInfo);
+
HTable t = null;
- if (region.getRegionInfo().isMetaTable()) {
- // We need to update the root region
- if (this.root == null) {
- this.root = new HTable(conf, HConstants.ROOT_TABLE_NAME);
- }
- t = root;
- } else {
- // For normal regions we need to update the meta region
- if (meta == null) {
- meta = new HTable(conf, HConstants.META_TABLE_NAME);
+ try {
+ t = getTable(parent);
+ Put put = createOfflineParentPut(parentRegionInfo, newRegions);
+ t.put(put);
+      // TODO: to recover here we need to re-add the parent to online regions and reset its state.
+
+ // If we crash here, then the daughters will not be added and we'll have
+    // an offlined parent but no daughters to take up the slack. hbase-2244
+ // adds fixup to the metascanners.
+
+ // Add new regions to META
+ for (int i = 0; i < newRegions.length; i++) {
+ put = new Put(newRegions[i].getRegionName());
+ put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+ Writables.getBytes(newRegions[i].getRegionInfo()));
+ t.put(put);
}
- t = meta;
+ } catch (IOException e) {
+      // FIXME: IOException is swallowed here; parent is already offline and META may be inconsistent.
+ } finally{
+ t.close();
}
+ // Now tell the master about the new regions. If we fail here, its OK.
+ // Basescanner will do fix up. And reporting split to master is going away.
+ this.server.reportSplit(parentRegionInfo, newRegions[0].getRegionInfo(),
+ newRegions[1].getRegionInfo());
+ LOG.info("Region split, META updated, and report to master. Parent=" +
+ parentRegionInfo.toString() + ", new regions: " + newRegions[0].toString() +
+ ", " + newRegions[1].toString() + ". Split took " +
+ StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
+ }
+
+ private Put createOfflineParentPut(final HRegionInfo parentRegionInfo,
+ final HRegion [] newRegions) {
// Mark old region as offline and split in META.
// NOTE: there is no need for retry logic here. HTable does it for us.
- oldRegionInfo.setOffline(true);
- oldRegionInfo.setSplit(true);
- // Inform the HRegionServer that the parent HRegion is no-longer online.
- this.server.removeFromOnlineRegions(oldRegionInfo);
-
- Put put = new Put(oldRegionInfo.getRegionName());
+ HRegionInfo editedParentRegionInfo =
+ new HRegionInfo(parentRegionInfo);
+ editedParentRegionInfo.setOffline(true);
+ editedParentRegionInfo.setSplit(true);
+ Put put = new Put(editedParentRegionInfo.getRegionName());
put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
- Writables.getBytes(oldRegionInfo));
+ Writables.getBytes(editedParentRegionInfo));
put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
HConstants.EMPTY_BYTE_ARRAY);
put.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
@@ -190,33 +215,34 @@ class CompactSplitThread extends Thread {
Writables.getBytes(newRegions[0].getRegionInfo()));
put.add(HConstants.CATALOG_FAMILY, HConstants.SPLITB_QUALIFIER,
Writables.getBytes(newRegions[1].getRegionInfo()));
- t.put(put);
-
- // If we crash here, then the daughters will not be added and we'll have
- // and offlined parent but no daughters to take up the slack. hbase-2244
- // adds fixup to the metascanners.
+ return put;
+ }
- // Add new regions to META
- for (int i = 0; i < newRegions.length; i++) {
- put = new Put(newRegions[i].getRegionName());
- put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
- Writables.getBytes(newRegions[i].getRegionInfo()));
- t.put(put);
+ /*
+ * @param r Parent region we want to edit.
+ * @return An HTable instance against the meta table that holds passed
+ * r
+ * @throws IOException
+ */
+ private HTable getTable(final HRegion r) throws IOException {
+ // When a region is split, the META table needs to updated if we're
+ // splitting a 'normal' region, and the ROOT table needs to be
+ // updated if we are splitting a META region.
+ HTable t = null;
+ if (r.getRegionInfo().isMetaTable()) {
+ // We need to update the root region
+ if (this.root == null) {
+ this.root = new HTable(conf, HConstants.ROOT_TABLE_NAME);
+ }
+ t = root;
+ } else {
+ // For normal regions we need to update the meta region
+ if (meta == null) {
+ meta = new HTable(conf, HConstants.META_TABLE_NAME);
+ }
+ t = meta;
}
-
- // If we crash here, the master will not know of the new daughters and they
- // will not be assigned. The metascanner when it runs will notice and take
- // care of assigning the new daughters.
-
- // Now tell the master about the new regions
- server.reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
- newRegions[1].getRegionInfo());
-
- LOG.info("region split, META updated, and report to master all" +
- " successful. Old region=" + oldRegionInfo.toString() +
- ", new regions: " + newRegions[0].toString() + ", " +
- newRegions[1].toString() + ". Split took " +
- StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
+ return t;
}
/**
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6dc41a4..618d0ec 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -123,7 +123,6 @@ import com.google.common.collect.Lists;
*/
public class HRegion implements HeapSize { // , Writable{
public static final Log LOG = LogFactory.getLog(HRegion.class);
- static final String SPLITDIR = "splits";
static final String MERGEDIR = "merges";
final AtomicBoolean closed = new AtomicBoolean(false);
@@ -218,7 +217,7 @@ public class HRegion implements HeapSize { // , Writable{
private final long blockingMemStoreSize;
final long threadWakeFrequency;
// Used to guard splits and closes
- private final ReentrantReadWriteLock splitsAndClosesLock =
+ final ReentrantReadWriteLock splitsAndClosesLock =
new ReentrantReadWriteLock();
private final ReentrantReadWriteLock newScannerLock =
new ReentrantReadWriteLock();
@@ -226,7 +225,6 @@ public class HRegion implements HeapSize { // , Writable{
// Stop updates lock
private final ReentrantReadWriteLock updatesLock =
new ReentrantReadWriteLock();
- private final Object splitLock = new Object();
private boolean splitRequest;
private final ReadWriteConsistencyControl rwcc =
@@ -369,7 +367,7 @@ public class HRegion implements HeapSize { // , Writable{
* @param initialFiles
* @throws IOException
*/
- private static void moveInitialFilesIntoPlace(final FileSystem fs,
+ static void moveInitialFilesIntoPlace(final FileSystem fs,
final Path initialFiles, final Path regiondir)
throws IOException {
if (initialFiles != null && fs.exists(initialFiles)) {
@@ -468,70 +466,68 @@ public class HRegion implements HeapSize { // , Writable{
*
* @throws IOException e
*/
- public List close(final boolean abort) throws IOException {
+ public synchronized List close(final boolean abort)
+ throws IOException {
if (isClosed()) {
- LOG.warn("region " + this + " already closed");
+ LOG.warn("Region " + this + " already closed");
return null;
}
- synchronized (splitLock) {
- boolean wasFlushing = false;
- synchronized (writestate) {
- // Disable compacting and flushing by background threads for this
- // region.
- writestate.writesEnabled = false;
- wasFlushing = writestate.flushing;
- LOG.debug("Closing " + this + ": disabling compactions & flushes");
- while (writestate.compacting || writestate.flushing) {
- LOG.debug("waiting for" +
- (writestate.compacting ? " compaction" : "") +
- (writestate.flushing ?
- (writestate.compacting ? "," : "") + " cache flush" :
- "") + " to complete for region " + this);
- try {
- writestate.wait();
- } catch (InterruptedException iex) {
- // continue
- }
+ boolean wasFlushing = false;
+ synchronized (writestate) {
+ // Disable compacting and flushing by background threads for this
+ // region.
+ writestate.writesEnabled = false;
+ wasFlushing = writestate.flushing;
+ LOG.debug("Closing " + this + ": disabling compactions & flushes");
+ while (writestate.compacting || writestate.flushing) {
+ LOG.debug("waiting for" +
+ (writestate.compacting ? " compaction" : "") +
+ (writestate.flushing ?
+ (writestate.compacting ? "," : "") + " cache flush" :
+ "") + " to complete for region " + this);
+ try {
+ writestate.wait();
+ } catch (InterruptedException iex) {
+ // continue
}
}
- // If we were not just flushing, is it worth doing a preflush...one
- // that will clear out of the bulk of the memstore before we put up
- // the close flag?
- if (!abort && !wasFlushing && worthPreFlushing()) {
- LOG.info("Running close preflush of " + this.getRegionNameAsString());
- internalFlushcache();
- }
- newScannerLock.writeLock().lock();
- this.closing.set(true);
+ }
+ // If we were not just flushing, is it worth doing a preflush...one
+ // that will clear out of the bulk of the memstore before we put up
+ // the close flag?
+ if (!abort && !wasFlushing && worthPreFlushing()) {
+ LOG.info("Running close preflush of " + this.getRegionNameAsString());
+ internalFlushcache();
+ }
+ newScannerLock.writeLock().lock();
+ this.closing.set(true);
+ try {
+ splitsAndClosesLock.writeLock().lock();
+ LOG.debug("Updates disabled for region, no outstanding scanners on " + this);
try {
- splitsAndClosesLock.writeLock().lock();
- LOG.debug("Updates disabled for region, no outstanding scanners on " +
- this);
- try {
- // Write lock means no more row locks can be given out. Wait on
- // outstanding row locks to come in before we close so we do not drop
- // outstanding updates.
- waitOnRowLocks();
- LOG.debug("No more row locks outstanding on region " + this);
-
- // Don't flush the cache if we are aborting
- if (!abort) {
- internalFlushcache();
- }
+ // Write lock means no more row locks can be given out. Wait on
+ // outstanding row locks to come in before we close so we do not drop
+ // outstanding updates.
+ waitOnRowLocks();
+ LOG.debug("No more row locks outstanding on region " + this);
+
+ // Don't flush the cache if we are aborting
+ if (!abort) {
+ internalFlushcache();
+ }
- List result = new ArrayList();
- for (Store store: stores.values()) {
- result.addAll(store.close());
- }
- this.closed.set(true);
- LOG.info("Closed " + this);
- return result;
- } finally {
- splitsAndClosesLock.writeLock().unlock();
+ List result = new ArrayList();
+ for (Store store: stores.values()) {
+ result.addAll(store.close());
}
+ this.closed.set(true);
+ LOG.info("Closed " + this);
+ return result;
} finally {
- newScannerLock.writeLock().unlock();
+ splitsAndClosesLock.writeLock().unlock();
}
+ } finally {
+ newScannerLock.writeLock().unlock();
}
}
@@ -621,107 +617,137 @@ public class HRegion implements HeapSize { // , Writable{
return size;
}
- /*
- * Split the HRegion to create two brand-new ones. This also closes
- * current HRegion. Split should be fast since we don't rewrite store files
- * but instead create new 'reference' store files that read off the top and
- * bottom ranges of parent store files.
- * @param splitRow row on which to split region
- * @return two brand-new HRegions or null if a split is not needed
+ /**
+ * Prepare to run the split region "transaction". Runs checks that we can
+ * actually split.
+   * Caller must hold the {@link #splitsAndClosesLock}. If this method fails, no damage
+ * done, can continue.
+ * @param splitRow Row to split on.
+ * @return Prospective daughter HRegionInfos or null if setup failed (this
+ * region is closed, etc).
* @throws IOException
+ * @see #executeSplitRegionTransaction
+ * @see #rollbackSplitRegionTransaction
*/
- HRegion [] splitRegion(final byte [] splitRow) throws IOException {
+ Pair prepareSplitRegionTransaction(final byte [] splitRow)
+ throws IOException {
prepareToSplit();
- synchronized (splitLock) {
- if (closed.get()) {
- return null;
- }
- // Add start/end key checking: hbase-428.
- byte [] startKey = this.regionInfo.getStartKey();
- byte [] endKey = this.regionInfo.getEndKey();
- if (this.comparator.matchingRows(startKey, 0, startKey.length,
- splitRow, 0, splitRow.length)) {
- LOG.debug("Startkey and midkey are same, not splitting");
- return null;
- }
- if (this.comparator.matchingRows(splitRow, 0, splitRow.length,
- endKey, 0, endKey.length)) {
- LOG.debug("Endkey and midkey are same, not splitting");
- return null;
- }
- LOG.info("Starting split of region " + this);
- Path splits = new Path(this.regiondir, SPLITDIR);
- if(!this.fs.exists(splits)) {
- this.fs.mkdirs(splits);
- }
- // Calculate regionid to use. Can't be less than that of parent else
- // it'll insert into wrong location over in .META. table: HBASE-710.
- long rid = EnvironmentEdgeManager.currentTimeMillis();
- if (rid < this.regionInfo.getRegionId()) {
- LOG.warn("Clock skew; parent regions id is " +
- this.regionInfo.getRegionId() + " but current time here is " + rid);
- rid = this.regionInfo.getRegionId() + 1;
- }
- HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
- startKey, splitRow, false, rid);
- Path dirA = getSplitDirForDaughter(splits, regionAInfo);
- HRegionInfo regionBInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
- splitRow, endKey, false, rid);
- Path dirB = getSplitDirForDaughter(splits, regionBInfo);
-
- // Now close the HRegion. Close returns all store files or null if not
- // supposed to close (? What to do in this case? Implement abort of close?)
- // Close also does wait on outstanding rows and calls a flush just-in-case.
- List hstoreFilesToSplit = close(false);
- if (hstoreFilesToSplit == null) {
- LOG.warn("Close came back null (Implement abort of close?)");
- throw new RuntimeException("close returned empty vector of HStoreFiles");
- }
+ if (closed.get()) return null;
+ // Add start/end key checking: hbase-428.
+ byte [] startKey = this.regionInfo.getStartKey();
+ byte [] endKey = this.regionInfo.getEndKey();
+ if (this.comparator.matchingRows(startKey, 0, startKey.length,
+ splitRow, 0, splitRow.length)) {
+ LOG.debug("Startkey and midkey are same, not splitting");
+ return null;
+ }
+ if (this.comparator.matchingRows(splitRow, 0, splitRow.length,
+ endKey, 0, endKey.length)) {
+ LOG.debug("Endkey and midkey are same, not splitting");
+ return null;
+ }
+ long rid = getDaughterRegionIdTimestamp(this.regionInfo.getRegionId());
+ HRegionInfo a = new HRegionInfo(this.regionInfo.getTableDesc(),
+ startKey, splitRow, false, rid);
+ HRegionInfo b = new HRegionInfo(this.regionInfo.getTableDesc(),
+ splitRow, endKey, false, rid);
+ return new Pair(a, b);
+ }
+
+ /*
+ * Calculate daughter regionid to use. Regionid is timestamp. Can't be less
+ * than that of parent else will insert at wrong location in .META.
+ * (See HBASE-710).
+ * @param parentid
+ * @return Daughter region id timestamp.
+ */
+ private long getDaughterRegionIdTimestamp(final long parentid) {
+ long rid = EnvironmentEdgeManager.currentTimeMillis();
+ if (rid < this.regionInfo.getRegionId()) {
+ LOG.warn("Clock skew; parent regions id is " +
+ this.regionInfo.getRegionId() + " but current time here is " + rid);
+ rid = this.regionInfo.getRegionId() + 1;
+ }
+ return rid;
+ }
+
+ /**
+ * Run the split region transaction. If we fail this method, call the
+ * {@link #rollbackSplitRegionTransaction()}. Any changes done by this method
+ * must have a rollback equivalent registered in
+ * {@link #rollbackSplitRegionTransaction()}.
+   * Caller must hold the {@link #splitsAndClosesLock}.
+ * @param hris
+ * @param splitRow Row to split around.
+ * @return two brand-new HRegions or null if a split is not needed
+ * @throws IOException
+ */
+ HRegion [] executeSplitRegionTransaction(final Pair hris,
+ final byte [] splitRow)
+ throws IOException {
+ // 1. Create split dir
+ LOG.info("Starting split of region " + this);
+ Path splitdir = new Path(this.regiondir, SPLITDIR);
+ if(!this.fs.exists(splitdir)) {
+ this.fs.mkdirs(splitdir);
+ }
+ Path dirA = getSplitDirForDaughter(splitdir, hris.getFirst());
+ Path dirB = getSplitDirForDaughter(splitdir, hris.getSecond());
+
+ // 2. Now close parent region. Close returns all store files or null if
+ // not supposed to close.
+ List hstoreFilesToSplit = close(false);
+ if (hstoreFilesToSplit == null) {
+ // If close doesn't succeed -- for now consider it fatal. Throw RE.
+ throw new RuntimeException("Close returned empty list of StoreFiles");
+ }
// Split each store file.
- for(StoreFile h: hstoreFilesToSplit) {
- StoreFile.split(fs,
- Store.getStoreHomedir(splits, regionAInfo.getEncodedName(),
- h.getFamily()),
- h, splitRow, Range.bottom);
- StoreFile.split(fs,
- Store.getStoreHomedir(splits, regionBInfo.getEncodedName(),
- h.getFamily()),
- h, splitRow, Range.top);
+ for (StoreFile sf: hstoreFilesToSplit) {
+ splitStoreFile(sf, splitdir, hris, splitRow);
}
+ // 3. Create region dir for each daughter.
// Create a region instance and then move the splits into place under
// regionA and regionB.
- HRegion regionA =
- HRegion.newHRegion(tableDir, log, fs, conf, regionAInfo, null);
+ HRegion regionA = HRegion.newHRegion(tableDir, log, fs, conf,
+ hris.getFirst(), null);
moveInitialFilesIntoPlace(this.fs, dirA, regionA.getRegionDir());
- HRegion regionB =
- HRegion.newHRegion(tableDir, log, fs, conf, regionBInfo, null);
+ HRegion regionB = HRegion.newHRegion(tableDir, log, fs, conf,
+ hris.getSecond(), null);
moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir());
return new HRegion [] {regionA, regionB};
- }
- }
+ }
- /*
- * Get the daughter directories in the splits dir. The splits dir is under
- * the parent regions' directory.
- * @param splits
- * @param hri
- * @return Path to split dir.
+ private void splitStoreFile(final StoreFile sf, final Path splitdir,
+ final Pair hris, final byte [] splitRow)
+ throws IOException {
+ byte [] family = sf.getFamily();
+ String encoded = hris.getFirst().getEncodedName();
+ Path storedir = Store.getStoreHomedir(splitdir, encoded, family);
+ StoreFile.split(fs, storedir, sf, splitRow, Range.bottom);
+ encoded = hris.getSecond().getEncodedName();
+ storedir = Store.getStoreHomedir(splitdir, encoded, family);
+ StoreFile.split(fs, storedir, sf, splitRow, Range.top);
+ }
+
+ /**
+ * Undo failed {@link #executeSplitRegionTransaction()}.
+   * Caller must hold the {@link #splitsAndClosesLock}. If we fail, must shutdown the
+ * hosting regionserver. Otherwise, could be left with regions offlined or
+ * not correctly registered in .META.
+ * @param hris
* @throws IOException
*/
- private Path getSplitDirForDaughter(final Path splits, final HRegionInfo hri)
+ private void rollbackSplitRegionTransaction(final Pair hris)
throws IOException {
- Path d =
- new Path(splits, hri.getEncodedName());
- if (fs.exists(d)) {
- // This should never happen; the splits dir will be newly made when we
- // come in here. Even if we crashed midway through a split, the reopen
- // of the parent region clears out the dir in its initialize method.
- throw new IOException("Cannot split; target file collision at " + d);
- }
- return d;
+ // Rollback 3., creation of daughter regions. Delete anything in filesystem.
+    // TODO: delete anything left in the filesystem for each daughter region.
+
+ // Rollback 2., close of the parent region -- reopen.
+    // TODO: reopen the parent region.
+
}
protected void prepareToSplit() {
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6a54736..4b31aa4 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1464,12 +1464,7 @@ public class HRegionServer implements HRegionInterface,
}
return;
}
- this.lock.writeLock().lock();
- try {
- this.onlineRegions.put(mapKey, region);
- } finally {
- this.lock.writeLock().unlock();
- }
+ addToOnlineRegions(region);
}
try {
HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_OPEN, regionInfo);
@@ -2147,6 +2142,19 @@ public class HRegionServer implements HRegionInterface,
}
/**
+ * Add to online regions.
+ * @param r
+ */
+ void addToOnlineRegions(final HRegion r) {
+ this.lock.writeLock().lock();
+ try {
+ this.onlineRegions.put(Bytes.mapKey(r.getRegionInfo().getRegionName()), r);
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ }
+
+ /**
* This method removes HRegion corresponding to hri from the Map of onlineRegions.
*
* @param hri the HRegionInfo corresponding to the HRegion to-be-removed.
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java b/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
new file mode 100644
index 0000000..a06338e
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
@@ -0,0 +1,22 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+
+/**
+ * Add and remove online regions.
+ */
+public interface OnlineRegions {
+ /**
+ * Add to online regions.
+ * @param r
+ */
+ void addToOnlineRegions(final HRegion r);
+
+ /**
+ * This method removes HRegion corresponding to hri from the Map of onlineRegions.
+ *
+ * @param hri the HRegionInfo corresponding to the HRegion to-be-removed.
+ * @return the removed HRegion, or null if the HRegion was not in onlineRegions.
+ */
+ HRegion removeFromOnlineRegions(HRegionInfo hri);
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
new file mode 100644
index 0000000..382c056
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -0,0 +1,298 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.io.Reference.Range;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Executes region split as a "transaction". Call {@link #prepare()} to setup
+ * the transaction, {@link #execute()} to run the transaction and
+ * {@link #rollback()} to cleanup if execute fails.
+ */
+class SplitTransaction {
+ private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
+ private static final String SPLITDIR = "splits";
+
+ /*
+ * Region to split
+ */
+ private final HRegion parent;
+
+ private HRegionInfo hri_a;
+ private HRegionInfo hri_b;
+
+ private Path splitdir;
+
+ /*
+ * Row to split around
+ */
+ private final byte [] splitrow;
+
+ /**
+ * Types to add the journal
+ */
+ enum JournalEntry {
+ CREATE_SPLIT_DIR,
+ CLOSED_PARENT_REGION,
+ STARTED_REGION_A_CREATION,
+ STARTED_REGION_B_CREATION
+ }
+
+ /*
+ * Journal of how far the split transaction has progressed.
+ */
+ private final List journal = new ArrayList();
+
+ /**
+ * Constructor
+ * @param r
+ * @param splitrow
+ */
+ SplitTransaction(final HRegion r, final byte [] splitrow) {
+ this.parent = r;
+ this.splitrow = splitrow;
+ this.splitdir = getSplitDir();
+ }
+
+ /**
+ * Does checks on split inputs.
+ * @return true if the region is splitable else false if it is not -- e.g. its
+ * closed, etc. If we return 'true', we'll have taken out the regions
+ * splitsAndClosesLock and only way to unlock is successful transaction or
+ * rollback.
+ */
+ public boolean prepare() {
+ boolean prepared = false;
+ this.parent.splitsAndClosesLock.writeLock().lock();
+ try {
+ if (this.parent.isClosed() || this.parent.isClosing()) return prepared;
+ HRegionInfo hri = this.parent.getRegionInfo();
+ // Check splitrow.
+ byte [] startKey = hri.getStartKey();
+ byte [] endKey = hri.getEndKey();
+ KVComparator comparator = this.parent.comparator;
+ if (comparator.matchingRows(startKey, 0, startKey.length,
+ this.splitrow, 0, this.splitrow.length)) {
+ LOG.debug("Startkey and splitkey are same, not splitting: " +
+ Bytes.toString(this.splitrow));
+ return prepared;
+ }
+ if (comparator.matchingRows(this.splitrow, 0, this.splitrow.length,
+ endKey, 0, endKey.length)) {
+ LOG.debug("Endkey and splitkey are same, not splitting: " +
+ Bytes.toString(this.splitrow));
+ return prepared;
+ }
+ long rid = getDaughterRegionIdTimestamp(hri);
+ this.hri_a = new HRegionInfo(hri.getTableDesc(), startKey, this.splitrow,
+ false, rid);
+ this.hri_b = new HRegionInfo(hri.getTableDesc(), this.splitrow, endKey,
+ false, rid);
+ prepared = true;
+ } finally {
+ if (!prepared) this.parent.splitsAndClosesLock.writeLock().unlock();
+ }
+ return prepared;
+ }
+
+ /*
+ * Calculate daughter regionid to use.
+ * @param hri Parent hri
+ * @return Daughter region id timestamp.
+ */
+ private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
+ long rid = EnvironmentEdgeManager.currentTimeMillis();
+ // Regionid is timestamp. Can't be less than that of parent else will insert
+ // at wrong location in .META. (See HBASE-710).
+ if (rid < hri.getRegionId()) {
+ LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
+ " but current time here is " + rid);
+ rid = hri.getRegionId() + 1;
+ }
+ return rid;
+ }
+
+ /**
+ * Call {@link #rollback()} if we fail in here.
+ * @throws IOException If thrown, transaction failed -- call {@link #rollback()}
+ * @see #rollback()
+ */
+ public void execute(final OnlineRegions or) throws IOException {
+ if (!this.parent.splitsAndClosesLock.writeLock().isHeldByCurrentThread()) {
+ throw new SplitAndCloseWriteLockNotHeld();
+ }
+ LOG.info("Starting split of region " + this.parent);
+
+ createSplitDir(this.parent.getFilesystem(), this.splitdir);
+ this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
+
+ List hstoreFilesToSplit = this.parent.close(false);
+ this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
+
+ splitStoreFiles(this.splitdir, hstoreFilesToSplit);
+ // Nothing to unroll here -- clean up of CREATE_SPLIT_DIR will clean this up.
+
+ // Log to the journal that we are creating region A. We could fail halfway
+ // through. If we do, we'll have left stuff in fs that needs cleanup.
+ this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
+ createDaughterRegion(this.hri_a);
+
+ this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
+ createDaughterRegion(this.hri_b);
+ }
+
+ private Path getSplitDir() {
+ return new Path(this.parent.getRegionDir(), SPLITDIR);
+ }
+
+ /**
+ * @param fs
+ * @param splitdir
+ * @throws IOException
+ * @see #cleanupSplitDir()
+ */
+ private static void createSplitDir(final FileSystem fs, final Path splitdir)
+ throws IOException {
+ if(!fs.exists(splitdir)) {
+ fs.mkdirs(splitdir);
+ }
+ }
+
+ private static void cleanupSplitDir(final FileSystem fs, final Path splitdir)
+ throws IOException {
+ if (fs.exists(splitdir)) {
+ if (!fs.delete(splitdir, true)) {
+ throw new IOException("Failed delete of " + splitdir);
+ }
+ }
+ }
+
+ private void splitStoreFiles(final Path splitdir,
+ final List hstoreFilesToSplit)
+ throws IOException {
+ if (hstoreFilesToSplit == null) {
+ // Could be null because close didn't succeed -- for now consider it fatal
+ throw new RuntimeException("Close returned empty list of StoreFiles");
+ }
+
+ // Split each store file.
+ for (StoreFile sf: hstoreFilesToSplit) {
+ splitStoreFile(sf, splitdir);
+ }
+ }
+
+ private void splitStoreFile(final StoreFile sf, final Path splitdir)
+ throws IOException {
+ FileSystem fs = this.parent.getFilesystem();
+ byte [] family = sf.getFamily();
+ String encoded = this.hri_a.getEncodedName();
+ Path storedir = Store.getStoreHomedir(splitdir, encoded, family);
+ StoreFile.split(fs, storedir, sf, this.splitrow, Range.bottom);
+ encoded = this.hri_b.getEncodedName();
+ storedir = Store.getStoreHomedir(splitdir, encoded, family);
+ StoreFile.split(fs, storedir, sf, this.splitrow, Range.top);
+ }
+
+ private HRegion createDaughterRegion(final HRegionInfo hri)
+ throws IOException {
+ FileSystem fs = this.parent.getFilesystem();
+ Path regionDir = getSplitDirForDaughter(this.parent.getFilesystem(),
+ this.splitdir, hri);
+ HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
+ this.parent.getLog(), fs, this.parent.getConf(),
+ hri, null);
+ HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
+ return r;
+}
+ /*
+ * Get the daughter directories in the splits dir. The splits dir is under
+ * the parent regions' directory.
+ * @param fs
+ * @param splits
+ * @param hri
+ * @return Path to daughter split dir.
+ * @throws IOException
+ */
+ private static Path getSplitDirForDaughter(final FileSystem fs,
+ final Path splits, final HRegionInfo hri)
+ throws IOException {
+ Path d = new Path(splits, hri.getEncodedName());
+ if (fs.exists(d)) {
+ // This should never happen; the splits dir will be newly made when we
+ // come in here. Even if we crashed midway through a split, the reopen
+ // of the parent region clears out the dir in its initialize method.
+ throw new IOException("Cannot split; target file collision at " + d);
+ }
+ return d;
+ }
+
+ /**
+ * @throws IOException If thrown, rollback failed. Take drastic action.
+ */
+ public void rollback() throws IOException {
+ if (!this.parent.splitsAndClosesLock.writeLock().isHeldByCurrentThread()) {
+ throw new SplitAndCloseWriteLockNotHeld();
+ }
+ ListIterator iterator = this.journal.listIterator();
+ while (iterator.hasPrevious()) {
+ JournalEntry je = iterator.previous();
+ switch(je) {
+ case CREATE_SPLIT_DIR:
+ cleanupSplitDir(this.parent.getFilesystem(), this.splitdir);
+ break;
+
+ case CLOSED_PARENT_REGION:
+ // So, this returns a seqid but if we just closed and then reopen
+ this.parent.initialize();
+ break;
+
+ case STARTED_REGION_A_CREATION:
+ // TODO;
+ break;
+
+ case STARTED_REGION_B_CREATION:
+ // TODO;
+ break;
+
+ default:
+ throw new RuntimeException("Unhandled journal entry: " + je);
+ }
+ }
+ }
+
+ /**
+ * Thrown if lock not held.
+ */
+ @SuppressWarnings("serial")
+ public class SplitAndCloseWriteLockNotHeld extends IOException {}
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java b/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java
index 43fa6dd..77c4506 100644
--- a/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java
+++ b/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java
@@ -40,6 +40,13 @@ public class TestImmutableBytesWritable extends TestCase {
new ImmutableBytesWritable(Bytes.toBytes("xxabc"), 2, 2).hashCode());
}
+ public void testSpecificCompare() {
+ ImmutableBytesWritable ibw1 = new ImmutableBytesWritable(new byte[]{0x0f});
+ ImmutableBytesWritable ibw2 = new ImmutableBytesWritable(new byte[]{0x00, 0x00});
+ ImmutableBytesWritable.Comparator c = new ImmutableBytesWritable.Comparator();
+ assertFalse("ibw1 < ibw2", c.compare( ibw1, ibw2 ) < 0 );
+ }
+
public void testComparison() throws Exception {
runTests("aa", "b", -1);
runTests("aa", "aa", 0);