diff --git a/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java b/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java index b48b390..de13e27 100644 --- a/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java @@ -104,6 +104,8 @@ public abstract class EventHandler implements Runnable, Comparable { RS_ZK_REGION_CLOSED (2), // RS has finished closing a region RS_ZK_REGION_OPENING (3), // RS is in process of opening a region RS_ZK_REGION_OPENED (4), // RS has finished opening a region + RS_ZK_REGION_SPLITTING (5), // RS has started a region split + RS_ZK_REGION_SPLIT (6), // RS split has completed. // Messages originating from Master to RS M_RS_OPEN_REGION (20), // Master asking RS to open a region diff --git a/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java b/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java index 5e3cc27..cdf5939 100644 --- a/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java +++ b/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java @@ -47,6 +47,8 @@ public class RegionTransitionData implements Writable { /** Time the event was created. Required but automatically set. */ private long stamp; + private byte [] payload; + /** * Writable constructor. Do not use directly. */ @@ -90,10 +92,31 @@ public class RegionTransitionData implements Writable { */ public RegionTransitionData(EventType eventType, byte [] regionName, String serverName) { + this(eventType, regionName, serverName, null); + } + + /** + * Construct data for a new region transition event with the specified event + * type, region name, and server name. + * + *

Used when the server name is known (a regionserver is setting it). + * + *

Valid types for this constructor are {@link EventType#RS_ZK_REGION_CLOSING}, + * {@link EventType#RS_ZK_REGION_CLOSED}, {@link EventType#RS_ZK_REGION_OPENING}, + * {@link EventType#RS_ZK_REGION_OPENED}, {@link EventType#RS_ZK_REGION_SPLITTING} and {@link EventType#RS_ZK_REGION_SPLIT}. + * + * @param eventType type of event + * @param regionName name of region as per HRegionInfo#getRegionName() + * @param serverName name of server setting data + * @param payload Optional extra payload, e.g. the serialized daughter regions of a split. Can be null + */ + public RegionTransitionData(EventType eventType, byte [] regionName, + String serverName, final byte [] payload) { this.eventType = eventType; this.stamp = System.currentTimeMillis(); this.regionName = regionName; this.serverName = serverName; + this.payload = payload; } /** @@ -106,6 +129,8 @@ public class RegionTransitionData implements Writable { *

  • {@link EventType#RS_ZK_REGION_CLOSED} *
  • {@link EventType#RS_ZK_REGION_OPENING} *
  • {@link EventType#RS_ZK_REGION_OPENED} + *
  • {@link EventType#RS_ZK_REGION_SPLITTING} + *
  • {@link EventType#RS_ZK_REGION_SPLIT} * * @return type of region transition event */ @@ -142,6 +167,13 @@ public class RegionTransitionData implements Writable { return stamp; } + /** + * @return Payload if any, else null. + */ + public byte [] getPayload() { + return this.payload; + } + @Override public void readFields(DataInput in) throws IOException { // the event type byte @@ -157,6 +189,11 @@ public class RegionTransitionData implements Writable { } else { serverName = null; } + if (in.readBoolean()) { + this.payload = Bytes.readByteArray(in); + } else { + this.payload = null; + } } @Override @@ -169,6 +206,10 @@ public class RegionTransitionData implements Writable { if(serverName != null) { out.writeUTF(serverName); } + out.writeBoolean(this.payload != null); + if (this.payload != null) { + Bytes.writeByteArray(out, this.payload); + } } /** diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index f14a0ed..5613177 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -53,9 +53,9 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.catalog.RootLocationEditor; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.RegionTransitionData; -import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler; import 
org.apache.hadoop.hbase.master.handler.OpenedRegionHandler; @@ -63,12 +63,13 @@ import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZKTable; import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.AsyncCallback; @@ -365,6 +366,67 @@ public class AssignmentManager extends ZooKeeperListener { // Nothing to do. break; + case RS_ZK_REGION_SPLITTING: + if (regionState != null) { + // The SPLITTING state is being tickled; update timers. + if (!regionState.isSplitting()) { + LOG.warn("Received SPLITTING for region " + + prettyPrintedRegionName + " from server " + + data.getServerName() + " but region was in state " + + regionState + " and not in expected SPLITTING state"); + return; + } + regionState.update(RegionState.State.SPLITTING); + } else { + // Add this region to RIT so region seen as in transition.
+ synchronized (this.regions) { + regionState = findHRegionInfoThenAddToRIT(data.getServerName(), + encodedName); + // Null if the region is not on record as hosted by the reporting server. + if (regionState == null) return; + regionState.update(RegionState.State.SPLITTING); + } + } + break; + + case RS_ZK_REGION_SPLIT: + if (regionState == null) { + LOG.warn("Received SPLIT for region " + prettyPrintedRegionName + + " from server " + data.getServerName() + + " but region was not first in SPLITTING state; continuing"); + synchronized (this.regions) { + regionState = findHRegionInfoThenAddToRIT(data.getServerName(), + encodedName); + if (regionState == null) return; + regionState.update(RegionState.State.SPLITTING); + } + } + // Payload is the daughter HRegionInfos, serialized back-to-back by the + // regionserver (see SplitTransaction#transitionNodeSplit). + byte [] payload = data.getPayload(); + if (payload == null) { + LOG.error("Received SPLIT for region " + prettyPrintedRegionName + + " from server " + data.getServerName() + " with no daughters"); + return; + } + List<HRegionInfo> daughters; + try { + daughters = Writables.getHRegionInfos(payload); + } catch (IOException e) { + LOG.error("Failed reading daughters of split region " + + prettyPrintedRegionName, e); + return; + } + if (daughters.size() != 2) { + LOG.error("Expected two daughters for split region " + + prettyPrintedRegionName + " but got " + daughters); + return; + } + LOG.info("Region " + prettyPrintedRegionName + " split on server " + + data.getServerName() + "; daughters=" + daughters); + regionState.update(RegionState.State.SPLIT); + break; + case RS_ZK_REGION_CLOSING: // Should see CLOSING after we have asked it to CLOSE or additional // times after already being in state of CLOSING @@ -436,6 +498,43 @@ public class AssignmentManager extends ZooKeeperListener { } /** + * Caller must hold lock on this.regions. + * @param serverName Server that reported the transition + * @param encodedName Encoded name of the region + * @return The instance of RegionState that was added to RIT or null if error. + */ + private RegionState findHRegionInfoThenAddToRIT(final String serverName, + final String encodedName) { + HRegionInfo hri = findHRegionInfo(serverName, encodedName); + if (hri == null) { + LOG.warn("Region " + encodedName + " not found on server " + serverName + + "; failed processing"); + return null; + } + // Add to regions in transition; the caller sets the new state. + return addToRegionsInTransition(hri); + } + + /** + * Caller must hold lock on this.regions. + * @param serverName Server whose regions are searched + * @param encodedName Encoded name of the region to find + * @return Found HRegionInfo or null if the server or region is not known.
+ */ + private HRegionInfo findHRegionInfo(final String serverName, + final String encodedName) { + List<HRegionInfo> hris = this.servers.get(serverName); + // Server may not be in this.servers; avoid an NPE in the loop below. + if (hris == null) return null; + for (HRegionInfo hri: hris) { + if (hri.getEncodedName().equals(encodedName)) { + return hri; + } + } + return null; + } + + /** * Handle a ZK unassigned node transition triggered by HBCK repair tool. *

    * This is handled in a separate code path because it breaks the normal rules. @@ -1844,7 +1943,9 @@ public class AssignmentManager extends ZooKeeperListener { OPEN, // server opened region and updated meta PENDING_CLOSE, // sent rpc to server to close but has not begun CLOSING, // server has begun to close but not yet done - CLOSED // server closed region and updated meta + CLOSED, // server closed region and updated meta + SPLITTING, // server has begun to split the region + SPLIT // server has completed the split } private State state; @@ -1912,6 +2013,10 @@ public class AssignmentManager extends ZooKeeperListener { return state == State.OFFLINE; } + public boolean isSplitting() { + return state == State.SPLITTING; + } + @Override public String toString() { return region.getRegionNameAsString() + " state=" + state + diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 2eeb19f..d90a68e 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -158,7 +158,7 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { try { LOG.info("Running rollback of failed split of " + parent.getRegionNameAsString() + "; " + ioe.getMessage()); - st.rollback(this.server); + st.rollback(this.server, this.server); LOG.info("Successful rollback of failed split of " + parent.getRegionNameAsString()); } catch (RuntimeException e) { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java index eccb4eb..ba2c070 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java @@ -40,13 +40,20 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import
org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.catalog.MetaEditor; +import org.apache.hadoop.hbase.executor.RegionTransitionData; +import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.io.Reference.Range; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NodeExistsException; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -96,6 +103,10 @@ public class SplitTransaction { */ enum JournalEntry { /** + * Created the ephemeral SPLITTING znode for the parent region in zk. + */ + SET_SPLITTING_IN_ZK, + /** * We created the temporary split data directory. */ CREATE_SPLIT_DIR, @@ -185,11 +196,11 @@ public class SplitTransaction { * @param services Used to online/offline regions. - * @throws IOException If thrown, transaction failed.
Call {@link #rollback(OnlineRegions)} + * @throws IOException If thrown, transaction failed. Call {@link #rollback(Server, OnlineRegions)} * @return Regions created - * @see #rollback(OnlineRegions) + * @see #rollback(Server, OnlineRegions) */ public PairOfSameType<HRegion> execute(final Server server, final RegionServerServices services) throws IOException { LOG.info("Starting split of region " + this.parent); if ((server != null && server.isStopped()) || (services != null && services.isStopping())) { @@ -202,12 +213,26 @@ public class SplitTransaction { this.parent.getCoprocessorHost().preSplit(); } - // If true, no cluster to write meta edits into. + // If true, no cluster to write meta edits to or to update znodes in. boolean testing = server == null? true: server.getConfiguration().getBoolean("hbase.testing.nocluster", false); this.fileSplitTimeout = testing ? this.fileSplitTimeout : - server.getConfiguration().getLong( - "hbase.regionserver.fileSplitTimeout", this.fileSplitTimeout); + server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout", + this.fileSplitTimeout); + + // Set ephemeral SPLITTING znode up in zk. Zk failures are rethrown as + // IOExceptions so the caller sees a failed split it can roll back. + if (!testing) { + try { + createNodeSplitting(server.getZooKeeper(), + this.parent.getRegionInfo(), server.getServerName()); + } catch (KeeperException e) { + throw new IOException("Failed setting SPLITTING znode for " + + this.parent.getRegionNameAsString(), e); + } + // Journal only once the znode exists; rollback deletes it. + this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK); + } createSplitDir(this.parent.getFilesystem(), this.splitdir); this.journal.add(JournalEntry.CREATE_SPLIT_DIR); @@ -254,14 +279,9 @@ public class SplitTransaction { this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo()); } - // The is the point of no return. We are committed to the split now. We + // This is the point of no return. We are committed to the split now. We // have still the daughter regions to open but meta has been changed. - // If we fail from here on out, we can not rollback so, we'll just abort. - // The meta has been changed though so there will need to be a fixup run - // during processing of the crashed server by master (TODO: Verify this in place).
- - // TODO: Could we be smarter about the sequence in which we do these steps? - + // If we fail from here on out, we cannot rollback so, we'll just abort. if (!testing) { // Open daughters in parallel. DaughterOpener aOpener = new DaughterOpener(server, services, a); @@ -276,6 +296,20 @@ public class SplitTransaction { } } + // Tell master about split by updating zk. If we fail, abort: meta has + // already been changed so the split cannot be rolled back at this point. + if (!testing) { + try { + if (transitionNodeSplit(server.getZooKeeper(), parent.getRegionInfo(), + a.getRegionInfo(), b.getRegionInfo(), server.getServerName()) == -1) { + server.abort("Failed telling master about split; could not " + + "transition SPLITTING znode to SPLIT", null); + } + } catch (Exception e) { + server.abort("Failed telling master about split", e); + } + } + // Coprocessor callback if (this.parent.getCoprocessorHost() != null) { this.parent.getCoprocessorHost().postSplit(a,b); @@ -543,13 +577,20 @@ public class SplitTransaction { + * @param server Server hosting the region; its zookeeper connection is used + * to delete the SPLITTING znode created by execute. Can be null. * @return The region we were splitting * @throws IOException If thrown, rollback failed. Take drastic action. */ - public void rollback(final OnlineRegions or) throws IOException { + public void rollback(final Server server, final OnlineRegions or) + throws IOException { FileSystem fs = this.parent.getFilesystem(); ListIterator<JournalEntry> iterator = this.journal.listIterator(this.journal.size()); while (iterator.hasPrevious()) { JournalEntry je = iterator.previous(); switch(je) { + case SET_SPLITTING_IN_ZK: + cleanZK(server, this.parent.getRegionInfo()); + break; + case CREATE_SPLIT_DIR: cleanupSplitDir(fs, this.splitdir); break; @@ -623,4 +664,97 @@ public class SplitTransaction { cleanupSplitDir(r.getFilesystem(), splitdir); LOG.info("Cleaned up old failed split transaction detritus: " + splitdir); } -} + + /** + * Deletes the unassigned znode of the passed region via + * {@link ZKAssign#deleteNodeFailSilent}; aborts the server on zookeeper error. + * @param server Server whose zk connection is used; if null, does nothing + * @param hri Region whose znode is deleted + */ + private static void cleanZK(final Server server, final HRegionInfo hri) { + if (server == null) return; + try { + ZKAssign.deleteNodeFailSilent(server.getZooKeeper(), hri); + } catch (KeeperException e) { + server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e); + } + } + + /** + * Creates a new ephemeral node in the SPLITTING state for the specified region.
+ * Create it ephemeral in case regionserver dies mid-split. + * + *

    Does not transition nodes from other states. If the node could not be + * created (presumably because one already exists), an IOException is thrown. + * + * @param zkw zk reference + * @param region region whose znode is created in SPLITTING state + * @param serverName server event originates from + * @throws KeeperException if unexpected zookeeper exception + * @throws IOException if the znode could not be created + */ + private static void createNodeSplitting(final ZooKeeperWatcher zkw, + final HRegionInfo region, final String serverName) + throws KeeperException, IOException { + LOG.debug(zkw.prefix("Creating ephemeral node for " + + region.getEncodedName() + " in SPLITTING state")); + RegionTransitionData data = + new RegionTransitionData(EventType.RS_ZK_REGION_SPLITTING, + region.getRegionName(), serverName); + // This synchronization is copied from ZKAssign. + synchronized(zkw.getNodes()) { + String node = ZKAssign.getNodeName(zkw, region.getEncodedName()); + zkw.getNodes().add(node); + if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, data.getBytes())) { + throw new IOException("Failed create of ephemeral " + node); + } + } + } + + /** + * Transitions an existing node for the specified region which is + * currently in the SPLITTING state to be in the SPLIT state. Note the znode + * stays ephemeral (setting data does not change a znode's type) so it is + * removed if this server's session ends; master should delete it on read. + * + *

    Does not transition nodes from other states. If for some reason the + * node could not be transitioned, the method returns -1. If the transition + * is successful, the version of the node after transition is returned. + * + *

    This method can fail and return -1 for three different reasons: + *

    • Node for this region does not exist
    • + *
    • Node for this region is not in SPLITTING state
    • + *
    • After verifying SPLITTING state, update fails because of wrong version + * (this should never actually happen since an RS only does this transition + * following a transition to SPLITTING. if two RS are conflicting, one would + * fail the original transition to SPLITTING and not this transition)
    • + *
    + * + *

    Does not set any watches. + * + *

    This method should only be used by a RegionServer when completing the + * split of a region. + * + * @param zkw zk reference + * @param parent region to be transitioned to SPLIT + * @param a Daughter a of split + * @param b Daughter b of split + * @param serverName server event originates from + * @return version of node after transition, -1 if unsuccessful transition + * @throws KeeperException if unexpected zookeeper exception + * @throws IOException if the daughter HRegionInfos cannot be serialized + */ + private static int transitionNodeSplit(ZooKeeperWatcher zkw, + HRegionInfo parent, HRegionInfo a, HRegionInfo b, String serverName) + throws KeeperException, IOException { + // Serialize both daughter hregioninfos back-to-back as the payload. + byte [] aBytes = Writables.getBytes(a); + byte [] bBytes = Writables.getBytes(b); + byte [] payload = new byte[aBytes.length + bBytes.length]; + System.arraycopy(aBytes, 0, payload, 0, aBytes.length); + System.arraycopy(bBytes, 0, payload, aBytes.length, bBytes.length); + return ZKAssign.transitionNode(zkw, parent, serverName, + EventType.RS_ZK_REGION_SPLITTING, EventType.RS_ZK_REGION_SPLIT, -1, + payload); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/util/Writables.java b/src/main/java/org/apache/hadoop/hbase/util/Writables.java index 4bff615..829605c 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/Writables.java +++ b/src/main/java/org/apache/hadoop/hbase/util/Writables.java @@ -28,6 +28,8 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** * Utility class with methods for manipulating Writable objects @@ -121,6 +123,33 @@ public class Writables { /** * @param bytes serialized bytes + * @return All the hregioninfos that are in the byte array. Keeps reading + * till we hit the end.
+ * @throws IOException e + */ + public static List<HRegionInfo> getHRegionInfos(final byte [] bytes) + throws IOException { + if (bytes == null) { + throw new IllegalArgumentException("Can't build a writable with empty " + + "bytes array"); + } + DataInputBuffer in = new DataInputBuffer(); + List<HRegionInfo> hris = new ArrayList<HRegionInfo>(); + try { + in.reset(bytes, 0, bytes.length); + while (in.available() > 0) { + HRegionInfo hri = new HRegionInfo(); + hri.readFields(in); + hris.add(hri); + } + } finally { + in.close(); + } + return hris; + } + + /** + * @param bytes serialized bytes * @return A HRegionInfo instance built out of passed bytes * or null if passed bytes are null or an empty array. * @throws IOException e diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java index 1ac083d..f1ff071 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java @@ -624,7 +624,7 @@ public class ZKAssign { } /** - * Private method that actually performs unassigned node transitions. + * Method that actually performs unassigned node transitions. *

    Attempts to transition the unassigned node for the specified region * from the expected state to the state in the specified transition data. @@ -655,6 +655,21 @@ public class ZKAssign { String serverName, EventType beginState, EventType endState, int expectedVersion) throws KeeperException { + return transitionNode(zkw, region, serverName, beginState, endState, + expectedVersion, null); + } + + /** + * Same as {@link #transitionNode(ZooKeeperWatcher, HRegionInfo, String, + * EventType, EventType, int)} but also stores the passed payload in the + * new transition data written to the node. + * @param payload Optional extra data, e.g. the serialized daughters of a + * split. Can be null. + */ + public static int transitionNode(ZooKeeperWatcher zkw, HRegionInfo region, + String serverName, EventType beginState, EventType endState, + int expectedVersion, final byte [] payload) + throws KeeperException { String encoded = region.getEncodedName(); if(LOG.isDebugEnabled()) { LOG.debug(zkw.prefix("Attempting to transition node " + @@ -694,7 +709,7 @@ public class ZKAssign { // Write new data, ensuring data has not changed since we last read it try { RegionTransitionData data = new RegionTransitionData(endState, - region.getRegionName(), serverName); + region.getRegionName(), serverName, payload); if(!ZKUtil.setData(zkw, node, data.getBytes(), stat.getVersion())) { LOG.warn(zkw.prefix("Attempt to transition the " + "unassigned node for " + encoded +