diff --git a/src/main/java/org/apache/hadoop/hbase/HMsg.java b/src/main/java/org/apache/hadoop/hbase/HMsg.java index c53460f..a07bef2 100644 --- a/src/main/java/org/apache/hadoop/hbase/HMsg.java +++ b/src/main/java/org/apache/hadoop/hbase/HMsg.java @@ -48,11 +48,6 @@ public class HMsg implements Writable { STOP_REGIONSERVER, /** - * Region server split the region associated with this message. - */ - REGION_SPLIT, - - /** * When RegionServer receives this message, it goes into a sleep that only * an exit will cure. This message is sent by unit tests simulating * pathological states. @@ -229,10 +224,6 @@ public class HMsg implements Writable { out.writeBoolean(true); Bytes.writeByteArray(out, this.message); } - if (this.type.equals(Type.REGION_SPLIT)) { - this.daughterA.write(out); - this.daughterB.write(out); - } } /** @@ -246,11 +237,5 @@ public class HMsg implements Writable { if (hasMessage) { this.message = Bytes.readByteArray(in); } - if (this.type.equals(Type.REGION_SPLIT)) { - this.daughterA = new HRegionInfo(); - this.daughterB = new HRegionInfo(); - this.daughterA.readFields(in); - this.daughterB.readFields(in); - } } } \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java b/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java index b48b390..de13e27 100644 --- a/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java @@ -104,6 +104,8 @@ public abstract class EventHandler implements Runnable, Comparable { RS_ZK_REGION_CLOSED (2), // RS has finished closing a region RS_ZK_REGION_OPENING (3), // RS is in process of opening a region RS_ZK_REGION_OPENED (4), // RS has finished opening a region + RS_ZK_REGION_SPLITTING (5), // RS has started a region split + RS_ZK_REGION_SPLIT (6), // RS split has completed. 
// Messages originating from Master to RS M_RS_OPEN_REGION (20), // Master asking RS to open a region diff --git a/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java index 6914c69..f5b20a2 100644 --- a/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java +++ b/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java @@ -112,6 +112,7 @@ public class ExecutorService { case RS_ZK_REGION_OPENED: return ExecutorType.MASTER_OPEN_REGION; + case RS_ZK_REGION_SPLIT: case M_SERVER_SHUTDOWN: return ExecutorType.MASTER_SERVER_OPERATIONS; diff --git a/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java b/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java index 5e3cc27..cdf5939 100644 --- a/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java +++ b/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java @@ -47,6 +47,8 @@ public class RegionTransitionData implements Writable { /** Time the event was created. Required but automatically set. */ private long stamp; + private byte [] payload; + /** * Writable constructor. Do not use directly. */ @@ -90,10 +92,31 @@ public class RegionTransitionData implements Writable { */ public RegionTransitionData(EventType eventType, byte [] regionName, String serverName) { + this(eventType, regionName, serverName, null); + } + + /** + * Construct data for a new region transition event with the specified event + * type, region name, and server name. + * + *

Used when the server name is known (a regionserver is setting it). + * + *

Valid types for this constructor are {@link EventType#RS_ZK_REGION_CLOSING}, + * {@link EventType#RS_ZK_REGION_CLOSED}, {@link EventType#RS_ZK_REGION_OPENING}, + * and {@link EventType#RS_ZK_REGION_OPENED}. + * + * @param eventType type of event + * @param regionName name of region as per HRegionInfo#getRegionName() + * @param serverName name of server setting data + * @param payload Optional extra payload. Can be null + */ + public RegionTransitionData(EventType eventType, byte [] regionName, + String serverName, final byte [] payload) { this.eventType = eventType; this.stamp = System.currentTimeMillis(); this.regionName = regionName; this.serverName = serverName; + this.payload = payload; } /** @@ -106,6 +129,8 @@ public class RegionTransitionData implements Writable { *

  • {@link EventType#RS_ZK_REGION_CLOSED} *
  • {@link EventType#RS_ZK_REGION_OPENING} *
  • {@link EventType#RS_ZK_REGION_OPENED} + *
  • {@link EventType#RS_ZK_REGION_SPLITTING} + *
  • {@link EventType#RS_ZK_REGION_SPLIT} * * @return type of region transition event */ @@ -142,6 +167,13 @@ public class RegionTransitionData implements Writable { return stamp; } + /** + * @return Payload if any. + */ + public byte [] getPayload() { + return this.payload; + } + @Override public void readFields(DataInput in) throws IOException { // the event type byte @@ -157,6 +189,9 @@ public class RegionTransitionData implements Writable { } else { serverName = null; } + // Writables get reused: clear any stale payload when none was serialized, + // mirroring how serverName is reset just above. + this.payload = in.readBoolean()? Bytes.readByteArray(in): null; } @Override @@ -169,6 +204,10 @@ public class RegionTransitionData implements Writable { if(serverName != null) { out.writeUTF(serverName); } + out.writeBoolean(this.payload != null); + if (this.payload != null) { + Bytes.writeByteArray(out, this.payload); + } } /** @@ -207,4 +246,4 @@ public class RegionTransitionData implements Writable { return "region=" + Bytes.toString(regionName) + ", server=" + serverName + ", state=" + eventType; } -} +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index f14a0ed..2694a42 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -53,22 +53,24 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.catalog.RootLocationEditor; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.RegionTransitionData; -import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler; import
org.apache.hadoop.hbase.master.handler.OpenedRegionHandler; import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler; +import org.apache.hadoop.hbase.master.handler.SplitRegionHandler; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZKTable; import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.AsyncCallback; @@ -365,6 +367,59 @@ public class AssignmentManager extends ZooKeeperListener { // Nothing to do. break; + case RS_ZK_REGION_SPLITTING: + if (regionState != null) { + // The SPLITTING state is being tickled; update timers and break + if (!regionState.isSplitting()) { + LOG.warn("Received SPLITTING for region " + + prettyPrintedRegionName + " from server " + + data.getServerName() + " but region was in state " + + regionState + " and not in expected SPLITTING state"); + return; + } + regionState.update(RegionState.State.SPLITTING); + break; + } + addSplittingToRIT(data.getServerName(), encodedName); + break; + + case RS_ZK_REGION_SPLIT: + if (regionState == null) { + LOG.warn("Received SPLIT for region " + prettyPrintedRegionName + + " from server " + data.getServerName() + + " but region was not first in SPLITTING state; continuing"); + // Keep the returned state (dereferenced below); null was already logged. + regionState = addSplittingToRIT(data.getServerName(), encodedName); + if (regionState == null) break; + } + // Check it has daughters. 
+ byte [] payload = data.getPayload(); + if (payload == null) { + LOG.warn("No split payload for " + prettyPrintedRegionName + + "; dropped split!"); + break; + } + List daughters = null; + try { + daughters = Writables.getHRegionInfos(payload, 0, payload.length); + } catch (IOException e) { + LOG.warn("Failed reading split payload for " + + prettyPrintedRegionName + "; dropped split!"); + break; + } + assert daughters.size() == 2; + // Assert that we can get a serverinfo for this server. + HServerInfo hsi = this.serverManager.getServerInfo(data.getServerName()); + if (hsi == null) { + LOG.warn("No serverinfo for " + data.getServerName() + + "; dropped split!"); + break; + } + // Run handler to do the rest of the SPLIT handling. + this.executorService.submit(new SplitRegionHandler(master, this, + regionState.getRegion(), hsi, daughters)); + break; + case RS_ZK_REGION_CLOSING: // Should see CLOSING after we have asked it to CLOSE or additional // times after already being in state of CLOSING @@ -436,6 +491,59 @@ public class AssignmentManager extends ZooKeeperListener { } /** + * @param serverName + * @param encodedName + * @return RegionState added to RIT in SPLITTING state, or null if region unknown + */ + private RegionState addSplittingToRIT(final String serverName, + final String encodedName) { + RegionState regionState = null; + synchronized (this.regions) { + regionState = findHRegionInfoThenAddToRIT(serverName, encodedName); + if (regionState != null) regionState.update(RegionState.State.SPLITTING); + } + return regionState; + } + + /** + * Caller must hold lock on this.regions. + * @param serverName + * @param encodedName + * @return The instance of RegionState that was added to RIT or null if error. 
+ */ + private RegionState findHRegionInfoThenAddToRIT(final String serverName, + final String encodedName) { + HRegionInfo hri = findHRegionInfo(serverName, encodedName); + if (hri == null) { + LOG.warn("Region " + encodedName + " not found on server " + serverName + + "; failed processing"); + return null; + } + // Add to regions in transition, then update state to SPLITTING. 
+ return addToRegionsInTransition(hri); + } + + /** + * Caller must hold lock on this.regions. + * @param serverName + * @param encodedName + * @return Found HRegionInfo or null. + */ + private HRegionInfo findHRegionInfo(final String serverName, + final String encodedName) { + List hris = this.servers.get(serverName); + if (hris == null) return null; + HRegionInfo foundHri = null; + for (HRegionInfo hri: hris) { + if (hri.getEncodedName().equals(encodedName)) { + foundHri = hri; + break; + } + } + return foundHri; + } + + /** + * Handle a ZK unassigned node transition triggered by HBCK repair tool. + *

    * This is handled in a separate code path because it breaks the normal rules. @@ -1747,24 +1846,6 @@ public class AssignmentManager extends ZooKeeperListener { public void handleSplitReport(final HServerInfo hsi, final HRegionInfo parent, final HRegionInfo a, final HRegionInfo b) { regionOffline(parent); - // Remove any CLOSING node, if exists, due to race between master & rs - // for close & split. Not putting into regionOffline method because it is - // called from various locations. - try { - RegionTransitionData node = ZKAssign.getDataNoWatch(this.watcher, - parent.getEncodedName(), null); - if (node != null) { - if (node.getEventType().equals(EventType.RS_ZK_REGION_CLOSING)) { - ZKAssign.deleteClosingNode(this.watcher, parent); - } else { - LOG.warn("Split report has RIT node (shouldnt have one): " + - parent + " node: " + node); - } - } - } catch (KeeperException e) { - LOG.warn("Exception while validating RIT during split report", e); - } - regionOnline(a, hsi); regionOnline(b, hsi); @@ -1844,7 +1925,9 @@ public class AssignmentManager extends ZooKeeperListener { OPEN, // server opened region and updated meta PENDING_CLOSE, // sent rpc to server to close but has not begun CLOSING, // server has begun to close but not yet done - CLOSED // server closed region and updated meta + CLOSED, // server closed region and updated meta + SPLITTING, + SPLIT } private State state; @@ -1912,6 +1995,10 @@ public class AssignmentManager extends ZooKeeperListener { return state == State.OFFLINE; } + public boolean isSplitting() { + return state == State.SPLITTING; + } + @Override public String toString() { return region.getRegionNameAsString() + " state=" + state + diff --git a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 4d921da..42c8bda 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ 
b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -277,11 +277,6 @@ public class ServerManager { for (HMsg msg: msgs) { LOG.info("Received " + msg + " from " + serverInfo.getServerName()); switch (msg.getType()) { - case REGION_SPLIT: - this.services.getAssignmentManager().handleSplitReport(serverInfo, - msg.getRegionInfo(), msg.getDaughterA(), msg.getDaughterB()); - break; - default: LOG.error("Unhandled msg type " + msg); } diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/SplitRegionHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/SplitRegionHandler.java new file mode 100644 index 0000000..a8072e0 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/SplitRegionHandler.java @@ -0,0 +1,79 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.handler; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.zookeeper.KeeperException; + +/** + * Handles SPLIT region event on Master. + */ +public class SplitRegionHandler extends EventHandler implements TotesHRegionInfo { + private static final Log LOG = LogFactory.getLog(SplitRegionHandler.class); + private final AssignmentManager assignmentManager; + private final HRegionInfo parent; + private final HServerInfo serverInfo; + private final List daughters; + + public SplitRegionHandler(Server server, + AssignmentManager assignmentManager, HRegionInfo regionInfo, + HServerInfo serverInfo, final List daughters) { + super(server, EventType.RS_ZK_REGION_SPLIT); + this.assignmentManager = assignmentManager; + this.parent = regionInfo; + this.serverInfo = serverInfo; + this.daughters = daughters; + } + + @Override + public HRegionInfo getHRegionInfo() { + return this.parent; + } + + @Override + public void process() { + LOG.debug("Handling SPLIT event for " + this.parent.getEncodedName() + + "; deleting node"); + // Remove region from ZK + try { + ZKAssign.deleteNode(this.server.getZooKeeper(), + this.parent.getEncodedName(), + EventHandler.EventType.RS_ZK_REGION_SPLIT); + } catch (KeeperException e) { + server.abort("Error deleting SPLIT node in ZK for transition ZK node (" + + parent.getEncodedName() + ")", e); + } + this.assignmentManager.handleSplitReport(this.serverInfo, this.parent, + this.daughters.get(0), this.daughters.get(1)); + LOG.info("Handled SPLIT report; parent=" + + this.parent.getRegionNameAsString() + + " daughter a=" + 
this.daughters.get(0).getRegionNameAsString() + + " daughter b=" + this.daughters.get(1).getRegionNameAsString()); + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 2eeb19f..48e7b00 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -154,27 +154,22 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { if (!st.prepare()) return; try { st.execute(this.server, this.server); - } catch (IOException ioe) { + } catch (Exception e) { try { LOG.info("Running rollback of failed split of " + - parent.getRegionNameAsString() + "; " + ioe.getMessage()); - st.rollback(this.server); + parent.getRegionNameAsString() + "; " + e.getMessage()); + st.rollback(this.server, this.server); LOG.info("Successful rollback of failed split of " + parent.getRegionNameAsString()); - } catch (RuntimeException e) { + } catch (RuntimeException ee) { // If failed rollback, kill this server to avoid having a hole in table. LOG.info("Failed rollback of failed split of " + - parent.getRegionNameAsString() + " -- aborting server", e); + parent.getRegionNameAsString() + " -- aborting server", ee); this.server.abort("Failed split"); } return; } - // Now tell the master about the new regions. If we fail here, its OK. - // Basescanner will do fix up. And reporting split to master is going away. - // TODO: Verify this still holds in new master rewrite. - this.server.reportSplit(parent.getRegionInfo(), st.getFirstDaughter(), - st.getSecondDaughter()); LOG.info("Region split, META updated, and report to master. 
Parent=" + parent.getRegionInfo().getRegionNameAsString() + ", new regions: " + st.getFirstDaughter().getRegionNameAsString() + ", " + diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 22c9b3c..3ba02df 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1535,25 +1535,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, } /** - * Add to the outbound message buffer - * - * When a region splits, we need to tell the master that there are two new - * regions that need to be assigned. - * - * We do not need to inform the master about the old region, because we've - * updated the meta or root regions, and the master will pick that up on its - * next rescan of the root or meta tables. - */ - void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA, - HRegionInfo newRegionB) { - this.outboundMsgs.add(new HMsg( - HMsg.Type.REGION_SPLIT, oldRegion, newRegionA, - newRegionB, Bytes.toBytes("Daughters; " - + newRegionA.getRegionNameAsString() + ", " - + newRegionB.getRegionNameAsString()))); - } - - /** * Closes all regions. Called on our way out. * Assumes that its not possible for new regions to be added to onlineRegions * while this method runs. 
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java index eccb4eb..24061b9 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java @@ -40,13 +40,20 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.catalog.MetaEditor; +import org.apache.hadoop.hbase.executor.RegionTransitionData; +import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.io.Reference.Range; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NodeExistsException; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -96,6 +103,10 @@ public class SplitTransaction { */ enum JournalEntry { /** + * Set region as in transition, set it into SPLITTING state. + */ + SET_SPLITTING_IN_ZK, + /** * We created the temporary split data directory. */ CREATE_SPLIT_DIR, @@ -185,6 +196,8 @@ public class SplitTransaction { * @param services Used to online/offline regions. * @throws IOException If thrown, transaction failed. 
Call {@link #rollback(OnlineRegions)} * @return Regions created + * @throws KeeperException + * @throws NodeExistsException * @see #rollback(OnlineRegions) */ public PairOfSameType execute(final Server server, @@ -202,12 +215,25 @@ public class SplitTransaction { this.parent.getCoprocessorHost().preSplit(); } - // If true, no cluster to write meta edits into. + // If true, no cluster to write meta edits to or to update znodes in. boolean testing = server == null? true: server.getConfiguration().getBoolean("hbase.testing.nocluster", false); this.fileSplitTimeout = testing ? this.fileSplitTimeout : - server.getConfiguration().getLong( - "hbase.regionserver.fileSplitTimeout", this.fileSplitTimeout); + server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout", + this.fileSplitTimeout); + + // Set ephemeral SPLITTING znode up in zk. Mocked servers sometimes don't + // have zookeeper so don't do zk stuff if zookeeper is null + if (server != null && server.getZooKeeper() != null) { + try { + createNodeSplitting(server.getZooKeeper(), + this.parent.getRegionInfo(), server.getServerName()); + } catch (KeeperException e) { + throw new IOException("Failed setting SPLITTING znode on " + + this.parent.getRegionNameAsString(), e); + } + } + this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK); createSplitDir(this.parent.getFilesystem(), this.splitdir); this.journal.add(JournalEntry.CREATE_SPLIT_DIR); @@ -254,14 +280,9 @@ public class SplitTransaction { this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo()); } - // The is the point of no return. We are committed to the split now. We + // This is the point of no return. We are committed to the split now. We // have still the daughter regions to open but meta has been changed. - // If we fail from here on out, we can not rollback so, we'll just abort. - // The meta has been changed though so there will need to be a fixup run - // during processing of the crashed server by master (TODO: Verify this in place). 
- - // TODO: Could we be smarter about the sequence in which we do these steps? - + // If we fail from here on out, we cannot rollback so, we'll just abort. if (!testing) { // Open daughters in parallel. DaughterOpener aOpener = new DaughterOpener(server, services, a); @@ -276,6 +297,16 @@ public class SplitTransaction { } } + // Tell master about split by updating zk. If we fail abort. + if (server != null && server.getZooKeeper() != null) { + try { + transitionNodeSplit(server.getZooKeeper(), parent.getRegionInfo(), + a.getRegionInfo(), b.getRegionInfo(), server.getServerName()); + } catch (Exception e) { + server.abort("Failed telling master about split", e); + } + } + // Coprocessor callback if (this.parent.getCoprocessorHost() != null) { this.parent.getCoprocessorHost().postSplit(a,b); @@ -543,13 +574,20 @@ public class SplitTransaction { * @return The region we were splitting * @throws IOException If thrown, rollback failed. Take drastic action. */ - public void rollback(final OnlineRegions or) throws IOException { + public void rollback(final Server server, final OnlineRegions or) + throws IOException { FileSystem fs = this.parent.getFilesystem(); ListIterator iterator = this.journal.listIterator(this.journal.size()); while (iterator.hasPrevious()) { JournalEntry je = iterator.previous(); switch(je) { + case SET_SPLITTING_IN_ZK: + if (server != null && server.getZooKeeper() != null) { + cleanZK(server, this.parent.getRegionInfo()); + } + break; + case CREATE_SPLIT_DIR: cleanupSplitDir(fs, this.splitdir); break; @@ -623,4 +661,90 @@ public class SplitTransaction { cleanupSplitDir(r.getFilesystem(), splitdir); LOG.info("Cleaned up old failed split transaction detritus: " + splitdir); } -} + + private static void cleanZK(final Server server, final HRegionInfo hri) { + try { + ZKAssign.deleteNodeFailSilent(server.getZooKeeper(), hri); + } catch (KeeperException e) { + server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e); + } + } + + /** + * 
Creates a new ephemeral node in the SPLITTING state for the specified region. + * Create it ephemeral in case regionserver dies mid-split. + * + *

    Does not transition nodes from other states. If a node already exists + * for this region, a {@link NodeExistsException} will be thrown. + * + * @param zkw zk reference + * @param region region being split; its znode is created as SPLITTING + * @param serverName server event originates from + * @throws KeeperException if unexpected zookeeper exception + * @throws IOException if the ephemeral znode could not be created + */ + private static void createNodeSplitting(final ZooKeeperWatcher zkw, + final HRegionInfo region, final String serverName) + throws KeeperException, IOException { + LOG.debug(zkw.prefix("Creating ephemeral node for " + + region.getEncodedName() + " in SPLITTING state")); + RegionTransitionData data = + new RegionTransitionData(EventType.RS_ZK_REGION_SPLITTING, + region.getRegionName(), serverName); + // This synchronization is copied from ZKAssign. + synchronized(zkw.getNodes()) { + String node = ZKAssign.getNodeName(zkw, region.getEncodedName()); + zkw.getNodes().add(node); + if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, data.getBytes())) { + throw new IOException("Failed create of ephemeral " + node); + } + } + } + + /** + * Transitions an existing node for the specified region which is + * currently in the SPLITTING state to be in the SPLIT state. Converts the + * ephemeral SPLITTING znode to a permanent SPLIT node. Master cleans up + * SPLIT znode when it reads it. + * + *

    Does not transition nodes from other states. If for some reason the + * node could not be transitioned, the method returns -1. If the transition + * is successful, the version of the node after transition is returned. + * + *

    This method can fail and return -1 for three different reasons: + *

    • Node for this region does not exist
    • + *
    • Node for this region is not in SPLITTING state
    • + *
    • After verifying SPLITTING state, update fails because of wrong version + * (this should never actually happen since an RS only does this transition + * following a transition to SPLITTING. if two RS are conflicting, one would + * fail the original transition to SPLITTING and not this transition)
    • + *
    + * + *

    Does not set any watches. + * + *

    This method should only be used by a RegionServer when completing the + * split of a region. + * + * @param zkw zk reference + * @param parent split parent whose znode goes from SPLITTING to SPLIT + * @param a Daughter a of split + * @param b Daughter b of split + * @param serverName server event originates from + * @return version of node after transition, -1 if unsuccessful transition + * @throws KeeperException if unexpected zookeeper exception + * @throws IOException if the daughter regions cannot be serialized + */ + private static int transitionNodeSplit(ZooKeeperWatcher zkw, + HRegionInfo parent, HRegionInfo a, HRegionInfo b, String serverName) + throws KeeperException, IOException { + // Just serialize the daughter hregioninfos as payload. + byte [] aBytes = Writables.getBytes(a); + byte [] bBytes = Writables.getBytes(b); + byte [] payload = new byte[aBytes.length + bBytes.length]; + System.arraycopy(aBytes, 0, payload, 0, aBytes.length); + System.arraycopy(bBytes, 0, payload, aBytes.length, bBytes.length); + return ZKAssign.transitionNode(zkw, parent, serverName, + EventType.RS_ZK_REGION_SPLITTING, EventType.RS_ZK_REGION_SPLIT, -1, + payload); + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/util/Writables.java b/src/main/java/org/apache/hadoop/hbase/util/Writables.java index 4bff615..4e872cc 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/Writables.java +++ b/src/main/java/org/apache/hadoop/hbase/util/Writables.java @@ -28,6 +28,8 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** * Utility class with methods for manipulating Writable objects @@ -121,6 +123,34 @@ public class Writables { /** * @param bytes serialized bytes + * @return All the hregioninfos that are in the byte array. Keeps reading + * till we hit the end. 
+ * @throws IOException e + */ + public static List getHRegionInfos(final byte [] bytes, + final int offset, final int length) + throws IOException { + if (bytes == null) { + throw new IllegalArgumentException("Can't build a writable with empty " + + "bytes array"); + } + DataInputBuffer in = new DataInputBuffer(); + List hris = new ArrayList(); + try { + in.reset(bytes, offset, length); + while (in.available() > 0) { + HRegionInfo hri = new HRegionInfo(); + hri.readFields(in); + hris.add(hri); + } + } finally { + in.close(); + } + return hris; + } + + /** + * @param bytes serialized bytes * @return A HRegionInfo instance built out of passed bytes * or null if passed bytes are null or an empty array. * @throws IOException e diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java index 1ac083d..f15152b 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java @@ -387,7 +387,7 @@ public class ZKAssign { * @throws KeeperException if unexpected zookeeper exception * @throws KeeperException.NoNodeException if node does not exist */ - private static boolean deleteNode(ZooKeeperWatcher zkw, String regionName, + public static boolean deleteNode(ZooKeeperWatcher zkw, String regionName, EventType expectedState) throws KeeperException, KeeperException.NoNodeException { LOG.debug(zkw.prefix("Deleting existing unassigned " + @@ -412,8 +412,7 @@ public class ZKAssign { if(!ZKUtil.deleteNode(zkw, node, stat.getVersion())) { LOG.warn(zkw.prefix("Attempting to delete " + "unassigned node in " + expectedState + - " state but " + - "after verifying it was in OPENED state, we got a version mismatch")); + " state but after verifying state, we got a version mismatch")); return false; } LOG.debug(zkw.prefix("Successfully deleted unassigned node for region " + @@ -624,7 +623,7 @@ public class ZKAssign { } /** - * Private 
method that actually performs unassigned node transitions. + * Method that actually performs unassigned node transitions. * *

    Attempts to transition the unassigned node for the specified region * from the expected state to the state in the specified transition data. @@ -655,6 +654,14 @@ public class ZKAssign { String serverName, EventType beginState, EventType endState, int expectedVersion) throws KeeperException { + return transitionNode(zkw, region, serverName, beginState, endState, + expectedVersion, null); + } + + public static int transitionNode(ZooKeeperWatcher zkw, HRegionInfo region, + String serverName, EventType beginState, EventType endState, + int expectedVersion, final byte [] payload) + throws KeeperException { String encoded = region.getEncodedName(); if(LOG.isDebugEnabled()) { LOG.debug(zkw.prefix("Attempting to transition node " + @@ -694,7 +701,7 @@ public class ZKAssign { // Write new data, ensuring data has not changed since we last read it try { RegionTransitionData data = new RegionTransitionData(endState, - region.getRegionName(), serverName); + region.getRegionName(), serverName, payload); if(!ZKUtil.setData(zkw, node, data.getBytes(), stat.getVersion())) { LOG.warn(zkw.prefix("Attempt to transition the " + "unassigned node for " + encoded + diff --git a/src/test/java/org/apache/hadoop/hbase/TestHMsg.java b/src/test/java/org/apache/hadoop/hbase/TestHMsg.java index b55956f..b26cee4 100644 --- a/src/test/java/org/apache/hadoop/hbase/TestHMsg.java +++ b/src/test/java/org/apache/hadoop/hbase/TestHMsg.java @@ -56,9 +56,6 @@ public class TestHMsg extends TestCase { } public void testSerialization() throws IOException { - // Check out new HMsg that carries two daughter split regions. 
- byte [] abytes = Bytes.toBytes("a"); - byte [] bbytes = Bytes.toBytes("b"); byte [] parentbytes = Bytes.toBytes("parent"); HRegionInfo parent = new HRegionInfo(new HTableDescriptor(Bytes.toBytes("parent")), @@ -68,15 +65,5 @@ public class TestHMsg extends TestCase { byte [] bytes = Writables.getBytes(hmsg); HMsg close = (HMsg)Writables.getWritable(bytes, new HMsg()); assertTrue(close.equals(hmsg)); - // Assert split serializes - HRegionInfo daughtera = - new HRegionInfo(new HTableDescriptor(Bytes.toBytes("a")), abytes, abytes); - HRegionInfo daughterb = - new HRegionInfo(new HTableDescriptor(Bytes.toBytes("b")), bbytes, bbytes); - HMsg splithmsg = new HMsg(HMsg.Type.REGION_SPLIT, - parent, daughtera, daughterb, Bytes.toBytes("REGION_SPLIT")); - bytes = Writables.getBytes(splithmsg); - hmsg = (HMsg)Writables.getWritable(bytes, new HMsg()); - assertTrue(splithmsg.equals(hmsg)); } -} +} \ No newline at end of file diff --git a/src/test/java/org/apache/hadoop/hbase/TestSerialization.java b/src/test/java/org/apache/hadoop/hbase/TestSerialization.java index befcdaf..8c39c35 100644 --- a/src/test/java/org/apache/hadoop/hbase/TestSerialization.java +++ b/src/test/java/org/apache/hadoop/hbase/TestSerialization.java @@ -113,14 +113,7 @@ public class TestSerialization { * @throws Exception */ @Test public void testRegionInfo() throws Exception { - final String name = "testRegionInfo"; - HTableDescriptor htd = new HTableDescriptor(name); - String [] families = new String [] {"info", "anchor"}; - for (int i = 0; i < families.length; i++) { - htd.addFamily(new HColumnDescriptor(families[i])); - } - HRegionInfo hri = new HRegionInfo(htd, - HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + HRegionInfo hri = createRandomRegion("testRegionInfo"); byte [] hrib = Writables.getBytes(hri); HRegionInfo deserializedHri = (HRegionInfo)Writables.getWritable(hrib, new HRegionInfo()); @@ -129,6 +122,29 @@ public class TestSerialization { 
deserializedHri.getTableDesc().getFamilies().size()); } + @Test public void testRegionInfos() throws Exception { + HRegionInfo hri = createRandomRegion("testRegionInfos"); + byte [] hrib = Writables.getBytes(hri); + byte [] triple = new byte [3 * hrib.length]; + System.arraycopy(hrib, 0, triple, 0, hrib.length); + System.arraycopy(hrib, 0, triple, hrib.length, hrib.length); + System.arraycopy(hrib, 0, triple, hrib.length * 2, hrib.length); + List regions = Writables.getHRegionInfos(triple, 0, triple.length); + assertTrue(regions.size() == 3); + assertTrue(regions.get(0).equals(regions.get(1))); + assertTrue(regions.get(0).equals(regions.get(2))); + } + + private HRegionInfo createRandomRegion(final String name) { + HTableDescriptor htd = new HTableDescriptor(name); + String [] families = new String [] {"info", "anchor"}; + for (int i = 0; i < families.length; i++) { + htd.addFamily(new HColumnDescriptor(families[i])); + } + return new HRegionInfo(htd, HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW); + } + /** * Test ServerInfo serialization * @throws Exception diff --git a/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index 1bca9a1..0db5001 100644 --- a/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ b/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -271,13 +271,11 @@ public class TestCoprocessorInterface extends HBaseTestCase { regions[i] = each_daughter; i++; } - } - catch (IOException ioe) { + } catch (IOException ioe) { LOG.info("Split transaction of " + r.getRegionNameAsString() + " failed:" + ioe.getMessage()); assertTrue(false); - } - catch (RuntimeException e) { + } catch (RuntimeException e) { LOG.info("Failed rollback of failed split of " + r.getRegionNameAsString() + e.getMessage()); } diff --git 
a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index f897b59..a4c73ac 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; @@ -1339,7 +1338,7 @@ public class TestHRegion extends HBaseTestCase { try { LOG.info("Running rollback of failed split of " + parent.getRegionNameAsString() + "; " + ioe.getMessage()); - st.rollback(null); + st.rollback(null, null); LOG.info("Successful rollback of failed split of " + parent.getRegionNameAsString()); return null; diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java index 67a7089..b85b912 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java @@ -190,7 +190,7 @@ public class TestSplitTransaction { } assertTrue(expectedException); // Run rollback - spiedUponSt.rollback(null); + spiedUponSt.rollback(null, null); // Assert I can scan parent. int parentRowCount2 = countRows(this.parent);