diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 7589db3..260f557 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -144,44 +144,69 @@ class CompactSplitThread extends Thread { } } - private void split(final HRegion region, final byte [] midKey) + private void split(final HRegion parent, final byte [] midKey) throws IOException { - final HRegionInfo oldRegionInfo = region.getRegionInfo(); + final HRegionInfo parentRegionInfo = parent.getRegionInfo(); final long startTime = System.currentTimeMillis(); - final HRegion[] newRegions = region.splitRegion(midKey); + + + + + + final HRegion [] newRegions = parent.splitRegion(midKey); if (newRegions == null) { // Didn't need to be split return; } - // When a region is split, the META table needs to updated if we're - // splitting a 'normal' region, and the ROOT table needs to be - // updated if we are splitting a META region. + // Inform the HRegionServer that the parent HRegion is no-longer online. + this.server.removeFromOnlineRegions(parentRegionInfo); + HTable t = null; - if (region.getRegionInfo().isMetaTable()) { - // We need to update the root region - if (this.root == null) { - this.root = new HTable(conf, HConstants.ROOT_TABLE_NAME); - } - t = root; - } else { - // For normal regions we need to update the meta region - if (meta == null) { - meta = new HTable(conf, HConstants.META_TABLE_NAME); + try { + t = getTable(parent); + Put put = createOfflineParentPut(parentRegionInfo, newRegions); + t.put(put); + // TO RECOVER HERE... NEED TO ADD BACK TO ONLINE REGIONS AND RESET!!!! + + // If we crash here, then the daughters will not be added and we'll have + // an offlined parent but no daughters to take up the slack. hbase-2244 + // adds fixup to the metascanners. 
+ + // Add new regions to META + for (int i = 0; i < newRegions.length; i++) { + put = new Put(newRegions[i].getRegionName()); + put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, + Writables.getBytes(newRegions[i].getRegionInfo())); + t.put(put); } - t = meta; + } catch (IOException e) { + throw e; // TODO: recover -- reopen parent, re-add to online regions; meantime do not swallow + } finally { + // Do not close t here: it is the cached, shared this.root/this.meta HTable. + } + // Now tell the master about the new regions. If we fail here, it's OK. + // Basescanner will do fix up. And reporting split to master is going away. + this.server.reportSplit(parentRegionInfo, newRegions[0].getRegionInfo(), + newRegions[1].getRegionInfo()); + LOG.info("Region split, META updated, and report to master. Parent=" + + parentRegionInfo.toString() + ", new regions: " + newRegions[0].toString() + + ", " + newRegions[1].toString() + ". Split took " + + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); + } + + private Put createOfflineParentPut(final HRegionInfo parentRegionInfo, + final HRegion [] newRegions) { // Mark old region as offline and split in META. // NOTE: there is no need for retry logic here. HTable does it for us. - oldRegionInfo.setOffline(true); - oldRegionInfo.setSplit(true); - // Inform the HRegionServer that the parent HRegion is no-longer online. 
- this.server.removeFromOnlineRegions(oldRegionInfo); - - Put put = new Put(oldRegionInfo.getRegionName()); + HRegionInfo editedParentRegionInfo = + new HRegionInfo(parentRegionInfo); + editedParentRegionInfo.setOffline(true); + editedParentRegionInfo.setSplit(true); + Put put = new Put(editedParentRegionInfo.getRegionName()); put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, - Writables.getBytes(oldRegionInfo)); + Writables.getBytes(editedParentRegionInfo)); put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY); put.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, @@ -190,33 +215,34 @@ class CompactSplitThread extends Thread { Writables.getBytes(newRegions[0].getRegionInfo())); put.add(HConstants.CATALOG_FAMILY, HConstants.SPLITB_QUALIFIER, Writables.getBytes(newRegions[1].getRegionInfo())); - t.put(put); - - // If we crash here, then the daughters will not be added and we'll have - // and offlined parent but no daughters to take up the slack. hbase-2244 - // adds fixup to the metascanners. + return put; + } - // Add new regions to META - for (int i = 0; i < newRegions.length; i++) { - put = new Put(newRegions[i].getRegionName()); - put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, - Writables.getBytes(newRegions[i].getRegionInfo())); - t.put(put); + /* + * @param r Parent region we want to edit. + * @return An HTable instance against the meta table that holds passed + * r + * @throws IOException + */ + private HTable getTable(final HRegion r) throws IOException { + // When a region is split, the META table needs to updated if we're + // splitting a 'normal' region, and the ROOT table needs to be + // updated if we are splitting a META region. 
+ HTable t = null; + if (r.getRegionInfo().isMetaTable()) { + // We need to update the root region + if (this.root == null) { + this.root = new HTable(conf, HConstants.ROOT_TABLE_NAME); + } + t = root; + } else { + // For normal regions we need to update the meta region + if (meta == null) { + meta = new HTable(conf, HConstants.META_TABLE_NAME); + } + t = meta; } - - // If we crash here, the master will not know of the new daughters and they - // will not be assigned. The metascanner when it runs will notice and take - // care of assigning the new daughters. - - // Now tell the master about the new regions - server.reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(), - newRegions[1].getRegionInfo()); - - LOG.info("region split, META updated, and report to master all" + - " successful. Old region=" + oldRegionInfo.toString() + - ", new regions: " + newRegions[0].toString() + ", " + - newRegions[1].toString() + ". Split took " + - StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); + return t; } /** diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 6dc41a4..618d0ec 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -123,7 +123,6 @@ import com.google.common.collect.Lists; */ public class HRegion implements HeapSize { // , Writable{ public static final Log LOG = LogFactory.getLog(HRegion.class); - static final String SPLITDIR = "splits"; static final String MERGEDIR = "merges"; final AtomicBoolean closed = new AtomicBoolean(false); @@ -218,7 +217,7 @@ public class HRegion implements HeapSize { // , Writable{ private final long blockingMemStoreSize; final long threadWakeFrequency; // Used to guard splits and closes - private final ReentrantReadWriteLock splitsAndClosesLock = + final ReentrantReadWriteLock splitsAndClosesLock = new 
ReentrantReadWriteLock(); private final ReentrantReadWriteLock newScannerLock = new ReentrantReadWriteLock(); @@ -226,7 +225,6 @@ public class HRegion implements HeapSize { // , Writable{ // Stop updates lock private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock(); - private final Object splitLock = new Object(); private boolean splitRequest; private final ReadWriteConsistencyControl rwcc = @@ -369,7 +367,7 @@ public class HRegion implements HeapSize { // , Writable{ * @param initialFiles * @throws IOException */ - private static void moveInitialFilesIntoPlace(final FileSystem fs, + static void moveInitialFilesIntoPlace(final FileSystem fs, final Path initialFiles, final Path regiondir) throws IOException { if (initialFiles != null && fs.exists(initialFiles)) { @@ -468,70 +466,68 @@ public class HRegion implements HeapSize { // , Writable{ * * @throws IOException e */ - public List close(final boolean abort) throws IOException { + public synchronized List close(final boolean abort) + throws IOException { if (isClosed()) { - LOG.warn("region " + this + " already closed"); + LOG.warn("Region " + this + " already closed"); return null; } - synchronized (splitLock) { - boolean wasFlushing = false; - synchronized (writestate) { - // Disable compacting and flushing by background threads for this - // region. - writestate.writesEnabled = false; - wasFlushing = writestate.flushing; - LOG.debug("Closing " + this + ": disabling compactions & flushes"); - while (writestate.compacting || writestate.flushing) { - LOG.debug("waiting for" + - (writestate.compacting ? " compaction" : "") + - (writestate.flushing ? - (writestate.compacting ? "," : "") + " cache flush" : - "") + " to complete for region " + this); - try { - writestate.wait(); - } catch (InterruptedException iex) { - // continue - } + boolean wasFlushing = false; + synchronized (writestate) { + // Disable compacting and flushing by background threads for this + // region. 
+ writestate.writesEnabled = false; + wasFlushing = writestate.flushing; + LOG.debug("Closing " + this + ": disabling compactions & flushes"); + while (writestate.compacting || writestate.flushing) { + LOG.debug("waiting for" + + (writestate.compacting ? " compaction" : "") + + (writestate.flushing ? + (writestate.compacting ? "," : "") + " cache flush" : + "") + " to complete for region " + this); + try { + writestate.wait(); + } catch (InterruptedException iex) { + // continue } } - // If we were not just flushing, is it worth doing a preflush...one - // that will clear out of the bulk of the memstore before we put up - // the close flag? - if (!abort && !wasFlushing && worthPreFlushing()) { - LOG.info("Running close preflush of " + this.getRegionNameAsString()); - internalFlushcache(); - } - newScannerLock.writeLock().lock(); - this.closing.set(true); + } + // If we were not just flushing, is it worth doing a preflush...one + // that will clear out of the bulk of the memstore before we put up + // the close flag? + if (!abort && !wasFlushing && worthPreFlushing()) { + LOG.info("Running close preflush of " + this.getRegionNameAsString()); + internalFlushcache(); + } + newScannerLock.writeLock().lock(); + this.closing.set(true); + try { + splitsAndClosesLock.writeLock().lock(); + LOG.debug("Updates disabled for region, no outstanding scanners on " + this); try { - splitsAndClosesLock.writeLock().lock(); - LOG.debug("Updates disabled for region, no outstanding scanners on " + - this); - try { - // Write lock means no more row locks can be given out. Wait on - // outstanding row locks to come in before we close so we do not drop - // outstanding updates. - waitOnRowLocks(); - LOG.debug("No more row locks outstanding on region " + this); - - // Don't flush the cache if we are aborting - if (!abort) { - internalFlushcache(); - } + // Write lock means no more row locks can be given out. 
Wait on + // outstanding row locks to come in before we close so we do not drop + // outstanding updates. + waitOnRowLocks(); + LOG.debug("No more row locks outstanding on region " + this); + + // Don't flush the cache if we are aborting + if (!abort) { + internalFlushcache(); + } - List result = new ArrayList(); - for (Store store: stores.values()) { - result.addAll(store.close()); - } - this.closed.set(true); - LOG.info("Closed " + this); - return result; - } finally { - splitsAndClosesLock.writeLock().unlock(); + List result = new ArrayList(); + for (Store store: stores.values()) { + result.addAll(store.close()); } + this.closed.set(true); + LOG.info("Closed " + this); + return result; } finally { - newScannerLock.writeLock().unlock(); + splitsAndClosesLock.writeLock().unlock(); } + } finally { + newScannerLock.writeLock().unlock(); } } @@ -621,107 +617,137 @@ public class HRegion implements HeapSize { // , Writable{ return size; } - /* - * Split the HRegion to create two brand-new ones. This also closes - * current HRegion. Split should be fast since we don't rewrite store files - * but instead create new 'reference' store files that read off the top and - * bottom ranges of parent store files. - * @param splitRow row on which to split region - * @return two brand-new HRegions or null if a split is not needed + /** + * Prepare to run the split region "transaction". Runs checks that we can + * actually split. + * Caller must hold the {@link #splitLock}. If this method fails, no damage + * done, can continue. + * @param splitRow Row to split on. + * @return Prospective daughter HRegionInfos or null if setup failed (this + * region is closed, etc). 
* @throws IOException + * @see #executeSplitRegionTransaction + * @see #rollbackSplitRegionTransaction */ - HRegion [] splitRegion(final byte [] splitRow) throws IOException { + Pair prepareSplitRegionTransaction(final byte [] splitRow) + throws IOException { prepareToSplit(); - synchronized (splitLock) { - if (closed.get()) { - return null; - } - // Add start/end key checking: hbase-428. - byte [] startKey = this.regionInfo.getStartKey(); - byte [] endKey = this.regionInfo.getEndKey(); - if (this.comparator.matchingRows(startKey, 0, startKey.length, - splitRow, 0, splitRow.length)) { - LOG.debug("Startkey and midkey are same, not splitting"); - return null; - } - if (this.comparator.matchingRows(splitRow, 0, splitRow.length, - endKey, 0, endKey.length)) { - LOG.debug("Endkey and midkey are same, not splitting"); - return null; - } - LOG.info("Starting split of region " + this); - Path splits = new Path(this.regiondir, SPLITDIR); - if(!this.fs.exists(splits)) { - this.fs.mkdirs(splits); - } - // Calculate regionid to use. Can't be less than that of parent else - // it'll insert into wrong location over in .META. table: HBASE-710. - long rid = EnvironmentEdgeManager.currentTimeMillis(); - if (rid < this.regionInfo.getRegionId()) { - LOG.warn("Clock skew; parent regions id is " + - this.regionInfo.getRegionId() + " but current time here is " + rid); - rid = this.regionInfo.getRegionId() + 1; - } - HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(), - startKey, splitRow, false, rid); - Path dirA = getSplitDirForDaughter(splits, regionAInfo); - HRegionInfo regionBInfo = new HRegionInfo(this.regionInfo.getTableDesc(), - splitRow, endKey, false, rid); - Path dirB = getSplitDirForDaughter(splits, regionBInfo); - - // Now close the HRegion. Close returns all store files or null if not - // supposed to close (? What to do in this case? Implement abort of close?) - // Close also does wait on outstanding rows and calls a flush just-in-case. 
- List hstoreFilesToSplit = close(false); - if (hstoreFilesToSplit == null) { - LOG.warn("Close came back null (Implement abort of close?)"); - throw new RuntimeException("close returned empty vector of HStoreFiles"); - } + if (closed.get()) return null; + // Add start/end key checking: hbase-428. + byte [] startKey = this.regionInfo.getStartKey(); + byte [] endKey = this.regionInfo.getEndKey(); + if (this.comparator.matchingRows(startKey, 0, startKey.length, + splitRow, 0, splitRow.length)) { + LOG.debug("Startkey and midkey are same, not splitting"); + return null; + } + if (this.comparator.matchingRows(splitRow, 0, splitRow.length, + endKey, 0, endKey.length)) { + LOG.debug("Endkey and midkey are same, not splitting"); + return null; + } + long rid = getDaughterRegionIdTimestamp(this.regionInfo.getRegionId()); + HRegionInfo a = new HRegionInfo(this.regionInfo.getTableDesc(), + startKey, splitRow, false, rid); + HRegionInfo b = new HRegionInfo(this.regionInfo.getTableDesc(), + splitRow, endKey, false, rid); + return new Pair(a, b); + } + + /* + * Calculate daughter regionid to use. Regionid is timestamp. Can't be less + * than that of parent else will insert at wrong location in .META. + * (See HBASE-710). + * @param parentid + * @return Daughter region id timestamp. + */ + private long getDaughterRegionIdTimestamp(final long parentid) { + long rid = EnvironmentEdgeManager.currentTimeMillis(); + if (rid < this.regionInfo.getRegionId()) { + LOG.warn("Clock skew; parent regions id is " + + this.regionInfo.getRegionId() + " but current time here is " + rid); + rid = this.regionInfo.getRegionId() + 1; + } + return rid; + } + + /** + * Run the split region transaction. If we fail this method, call the + * {@link #rollbackSplitRegionTransaction()}. Any changes done by this method + * must have a rollback equivalent registered in + * {@link #rollbackSplitRegionTransaction()}. + * Caller must hold the {@link #splitLock}. 
+ * @param hris + * @param splitRow Row to split around. + * @return two brand-new HRegions or null if a split is not needed + * @throws IOException + */ + HRegion [] executeSplitRegionTransaction(final Pair hris, + final byte [] splitRow) + throws IOException { + // 1. Create split dir + LOG.info("Starting split of region " + this); + Path splitdir = new Path(this.regiondir, SPLITDIR); + if(!this.fs.exists(splitdir)) { + this.fs.mkdirs(splitdir); + } + Path dirA = getSplitDirForDaughter(splitdir, hris.getFirst()); + Path dirB = getSplitDirForDaughter(splitdir, hris.getSecond()); + + // 2. Now close parent region. Close returns all store files or null if + // not supposed to close. + List hstoreFilesToSplit = close(false); + if (hstoreFilesToSplit == null) { + // If close doesn't succeed -- for now consider it fatal. Throw RE. + throw new RuntimeException("Close returned empty list of StoreFiles"); + } // Split each store file. - for(StoreFile h: hstoreFilesToSplit) { - StoreFile.split(fs, - Store.getStoreHomedir(splits, regionAInfo.getEncodedName(), - h.getFamily()), - h, splitRow, Range.bottom); - StoreFile.split(fs, - Store.getStoreHomedir(splits, regionBInfo.getEncodedName(), - h.getFamily()), - h, splitRow, Range.top); + for (StoreFile sf: hstoreFilesToSplit) { + splitStoreFile(sf, splitdir, hris, splitRow); } + // 3. Create region dir for each daughter. // Create a region instance and then move the splits into place under // regionA and regionB. 
- HRegion regionA = - HRegion.newHRegion(tableDir, log, fs, conf, regionAInfo, null); + HRegion regionA = HRegion.newHRegion(tableDir, log, fs, conf, + hris.getFirst(), null); moveInitialFilesIntoPlace(this.fs, dirA, regionA.getRegionDir()); - HRegion regionB = - HRegion.newHRegion(tableDir, log, fs, conf, regionBInfo, null); + HRegion regionB = HRegion.newHRegion(tableDir, log, fs, conf, + hris.getSecond(), null); moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir()); return new HRegion [] {regionA, regionB}; - } - } + } - /* - * Get the daughter directories in the splits dir. The splits dir is under - * the parent regions' directory. - * @param splits - * @param hri - * @return Path to split dir. + private void splitStoreFile(final StoreFile sf, final Path splitdir, + final Pair hris, final byte [] splitRow) + throws IOException { + byte [] family = sf.getFamily(); + String encoded = hris.getFirst().getEncodedName(); + Path storedir = Store.getStoreHomedir(splitdir, encoded, family); + StoreFile.split(fs, storedir, sf, splitRow, Range.bottom); + encoded = hris.getSecond().getEncodedName(); + storedir = Store.getStoreHomedir(splitdir, encoded, family); + StoreFile.split(fs, storedir, sf, splitRow, Range.top); + } + + /** + * Undo failed {@link #executeSplitRegionTransaction()}. + * Caller must hold the {@link #splitLock}. If we fail, must shutdown the + * hosting regionserver. Otherwise, could be left with regions offlined or + * not correctly registered in .META. + * @param hris * @throws IOException */ - private Path getSplitDirForDaughter(final Path splits, final HRegionInfo hri) + private void rollbackSplitRegionTransaction(final Pair hris) throws IOException { - Path d = - new Path(splits, hri.getEncodedName()); - if (fs.exists(d)) { - // This should never happen; the splits dir will be newly made when we - // come in here. Even if we crashed midway through a split, the reopen - // of the parent region clears out the dir in its initialize method. 
- throw new IOException("Cannot split; target file collision at " + d); - } - return d; + // Rollback 3., creation of daughter regions. Delete anything in filesystem. + // TODO: delete from the filesystem whatever was created for each daughter. + + // Rollback 2., close of the parent region -- reopen. + // TODO: reopen the parent region. + } protected void prepareToSplit() { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 6a54736..4b31aa4 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1464,12 +1464,7 @@ public class HRegionServer implements HRegionInterface, } return; } - this.lock.writeLock().lock(); - try { - this.onlineRegions.put(mapKey, region); - } finally { - this.lock.writeLock().unlock(); - } + addToOnlineRegions(region); } try { HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_OPEN, regionInfo); @@ -2147,6 +2142,19 @@ public class HRegionServer implements HRegionInterface, } /** + * Add to online regions. + * @param r + */ + void addToOnlineRegions(final HRegion r) { + this.lock.writeLock().lock(); + try { + this.onlineRegions.put(Bytes.mapKey(r.getRegionInfo().getRegionName()), r); + } finally { + this.lock.writeLock().unlock(); + } + } + + /** * This method removes HRegion corresponding to hri from the Map of onlineRegions. * + * @param hri the HRegionInfo corresponding to the HRegion to-be-removed. diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java b/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java new file mode 100644 index 0000000..a06338e --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java @@ -0,0 +1,22 @@ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.HRegionInfo; + +/** + * Add and remove online regions. 
+ */ +public interface OnlineRegions { + /** + * Add to online regions. + * @param r + */ + void addToOnlineRegions(final HRegion r); + + /** + * This method removes HRegion corresponding to hri from the Map of onlineRegions. + * + * @param hri the HRegionInfo corresponding to the HRegion to-be-removed. + * @return the removed HRegion, or null if the HRegion was not in onlineRegions. + */ + HRegion removeFromOnlineRegions(HRegionInfo hri); +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java new file mode 100644 index 0000000..382c056 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java @@ -0,0 +1,298 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.io.Reference.Range; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * Executes region split as a "transaction". Call {@link #prepare()} to setup + * the transaction, {@link #execute()} to run the transaction and + * {@link #rollback()} to cleanup if execute fails. + */ +class SplitTransaction { + private static final Log LOG = LogFactory.getLog(SplitTransaction.class); + private static final String SPLITDIR = "splits"; + + /* + * Region to split + */ + private final HRegion parent; + + private HRegionInfo hri_a; + private HRegionInfo hri_b; + + private Path splitdir; + + /* + * Row to split around + */ + private final byte [] splitrow; + + /** + * Types to add the journal + */ + enum JournalEntry { + CREATE_SPLIT_DIR, + CLOSED_PARENT_REGION, + STARTED_REGION_A_CREATION, + STARTED_REGION_B_CREATION + } + + /* + * Journal of how far the split transaction has progressed. + */ + private final List journal = new ArrayList(); + + /** + * Constructor + * @param r + * @param splitrow + */ + SplitTransaction(final HRegion r, final byte [] splitrow) { + this.parent = r; + this.splitrow = splitrow; + this.splitdir = getSplitDir(); + } + + /** + * Does checks on split inputs. + * @return true if the region is splitable else false if it is not -- e.g. its + * closed, etc. If we return 'true', we'll have taken out the regions + * splitsAndClosesLock and only way to unlock is successful transaction or + * rollback. 
+ */ + public boolean prepare() { + boolean prepared = false; + this.parent.splitsAndClosesLock.writeLock().lock(); + try { + if (this.parent.isClosed() || this.parent.isClosing()) return prepared; + HRegionInfo hri = this.parent.getRegionInfo(); + // Check splitrow. + byte [] startKey = hri.getStartKey(); + byte [] endKey = hri.getEndKey(); + KVComparator comparator = this.parent.comparator; + if (comparator.matchingRows(startKey, 0, startKey.length, + this.splitrow, 0, this.splitrow.length)) { + LOG.debug("Startkey and splitkey are same, not splitting: " + + Bytes.toString(this.splitrow)); + return prepared; + } + if (comparator.matchingRows(this.splitrow, 0, this.splitrow.length, + endKey, 0, endKey.length)) { + LOG.debug("Endkey and splitkey are same, not splitting: " + + Bytes.toString(this.splitrow)); + return prepared; + } + long rid = getDaughterRegionIdTimestamp(hri); + this.hri_a = new HRegionInfo(hri.getTableDesc(), startKey, this.splitrow, + false, rid); + this.hri_b = new HRegionInfo(hri.getTableDesc(), this.splitrow, endKey, + false, rid); + prepared = true; + } finally { + if (!prepared) this.parent.splitsAndClosesLock.writeLock().unlock(); + } + return prepared; + } + + /* + * Calculate daughter regionid to use. + * @param hri Parent hri + * @return Daughter region id timestamp. + */ + private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) { + long rid = EnvironmentEdgeManager.currentTimeMillis(); + // Regionid is timestamp. Can't be less than that of parent else will insert + // at wrong location in .META. (See HBASE-710). + if (rid < hri.getRegionId()) { + LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() + + " but current time here is " + rid); + rid = hri.getRegionId() + 1; + } + return rid; + } + + /** + * Call {@link #rollback()} if we fail in here. 
+ * @throws IOException If thrown, transaction failed -- call {@link #rollback()} + * @see #rollback() + */ + public void execute(final OnlineRegions or) throws IOException { + if (!this.parent.splitsAndClosesLock.writeLock().isHeldByCurrentThread()) { + throw new SplitAndCloseWriteLockNotHeld(); + } + LOG.info("Starting split of region " + this.parent); + + createSplitDir(this.parent.getFilesystem(), this.splitdir); + this.journal.add(JournalEntry.CREATE_SPLIT_DIR); + + List hstoreFilesToSplit = this.parent.close(false); + this.journal.add(JournalEntry.CLOSED_PARENT_REGION); + + splitStoreFiles(this.splitdir, hstoreFilesToSplit); + // Nothing to unroll here -- clean up of CREATE_SPLIT_DIR will clean this up. + + // Log to the journal that we are creating region A. We could fail halfway + // through. If we do, we'll have left stuff in fs that needs cleanup. + this.journal.add(JournalEntry.STARTED_REGION_A_CREATION); + createDaughterRegion(this.hri_a); + + this.journal.add(JournalEntry.STARTED_REGION_B_CREATION); + createDaughterRegion(this.hri_b); + } + + private Path getSplitDir() { + return new Path(this.parent.getRegionDir(), SPLITDIR); + } + + /** + * @param fs + * @param splitdir + * @throws IOException + * @see #cleanupSplitDir() + */ + private static void createSplitDir(final FileSystem fs, final Path splitdir) + throws IOException { + if(!fs.exists(splitdir)) { + fs.mkdirs(splitdir); + } + } + + private static void cleanupSplitDir(final FileSystem fs, final Path splitdir) + throws IOException { + if (fs.exists(splitdir)) { + if (!fs.delete(splitdir, true)) { + throw new IOException("Failed delete of " + splitdir); + } + } + } + + private void splitStoreFiles(final Path splitdir, + final List hstoreFilesToSplit) + throws IOException { + if (hstoreFilesToSplit == null) { + // Could be null because close didn't succeed -- for now consider it fatal + throw new RuntimeException("Close returned empty list of StoreFiles"); + } + + // Split each store file. 
+ for (StoreFile sf: hstoreFilesToSplit) { + splitStoreFile(sf, splitdir); + } + } + + private void splitStoreFile(final StoreFile sf, final Path splitdir) + throws IOException { + FileSystem fs = this.parent.getFilesystem(); + byte [] family = sf.getFamily(); + String encoded = this.hri_a.getEncodedName(); + Path storedir = Store.getStoreHomedir(splitdir, encoded, family); + StoreFile.split(fs, storedir, sf, this.splitrow, Range.bottom); + encoded = this.hri_b.getEncodedName(); + storedir = Store.getStoreHomedir(splitdir, encoded, family); + StoreFile.split(fs, storedir, sf, this.splitrow, Range.top); + } + + private HRegion createDaughterRegion(final HRegionInfo hri) + throws IOException { + FileSystem fs = this.parent.getFilesystem(); + Path regionDir = getSplitDirForDaughter(this.parent.getFilesystem(), + this.splitdir, hri); + HRegion r = HRegion.newHRegion(this.parent.getTableDir(), + this.parent.getLog(), fs, this.parent.getConf(), + hri, null); + HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir()); + return r; +} + /* + * Get the daughter directories in the splits dir. The splits dir is under + * the parent regions' directory. + * @param fs + * @param splits + * @param hri + * @return Path to daughter split dir. + * @throws IOException + */ + private static Path getSplitDirForDaughter(final FileSystem fs, + final Path splits, final HRegionInfo hri) + throws IOException { + Path d = new Path(splits, hri.getEncodedName()); + if (fs.exists(d)) { + // This should never happen; the splits dir will be newly made when we + // come in here. Even if we crashed midway through a split, the reopen + // of the parent region clears out the dir in its initialize method. + throw new IOException("Cannot split; target file collision at " + d); + } + return d; + } + + /** + * @throws IOException If thrown, rollback failed. Take drastic action. 
+ */ + public void rollback() throws IOException { + if (!this.parent.splitsAndClosesLock.writeLock().isHeldByCurrentThread()) { + throw new SplitAndCloseWriteLockNotHeld(); + } + ListIterator iterator = this.journal.listIterator(this.journal.size()); + while (iterator.hasPrevious()) { + JournalEntry je = iterator.previous(); + switch(je) { + case CREATE_SPLIT_DIR: + cleanupSplitDir(this.parent.getFilesystem(), this.splitdir); + break; + + case CLOSED_PARENT_REGION: + // So, this returns a seqid but if we just closed and then reopen + this.parent.initialize(); + break; + + case STARTED_REGION_A_CREATION: + // TODO; + break; + + case STARTED_REGION_B_CREATION: + // TODO; + break; + + default: + throw new RuntimeException("Unhandled journal entry: " + je); + } + } + } + + /** + * Thrown if lock not held. + */ + @SuppressWarnings("serial") + public static class SplitAndCloseWriteLockNotHeld extends IOException {} \ No newline at end of file diff --git a/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java b/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java index 43fa6dd..77c4506 100644 --- a/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java +++ b/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java @@ -40,6 +40,13 @@ public class TestImmutableBytesWritable extends TestCase { new ImmutableBytesWritable(Bytes.toBytes("xxabc"), 2, 2).hashCode()); } + public void testSpecificCompare() { + ImmutableBytesWritable ibw1 = new ImmutableBytesWritable(new byte[]{0x0f}); + ImmutableBytesWritable ibw2 = new ImmutableBytesWritable(new byte[]{0x00, 0x00}); + ImmutableBytesWritable.Comparator c = new ImmutableBytesWritable.Comparator(); + assertFalse("ibw1 < ibw2", c.compare( ibw1, ibw2 ) < 0 ); + } + public void testComparison() throws Exception { runTests("aa", "b", -1); runTests("aa", "aa", 0);