Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1290853) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -168,9 +168,19 @@ private List ignoreStatesRSOffline = Arrays.asList(new EventType[]{ EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED }); - /** + * Set when we are doing master failover processing; cleared when failover + * completes. + */ + private volatile boolean failover = false; + + // Set holding all the regions which got processed while RIT was not + // populated during master failover. + private Map failoverProcessedRegions = + new HashMap(); + + /** * Constructs a new assignment manager. * * @param master @@ -320,8 +330,7 @@ // Scan META to build list of existing regions, servers, and assignment // Returns servers who have not checked in (assumed dead) and their regions - Map>> deadServers = - rebuildUserRegions(); + Map>> deadServers = rebuildUserRegions(); processDeadServersAndRegionsInTransition(deadServers); @@ -361,29 +370,29 @@ watcher.assignmentZNode); // Run through all regions. If they are not assigned and not in RIT, then // its a clean cluster startup, else its a failover. - boolean regionsToProcess = false; for (Map.Entry e: this.regions.entrySet()) { if (!e.getKey().isMetaTable() && e.getValue() != null) { LOG.debug("Found " + e + " out on cluster"); - regionsToProcess = true; + this.failover = true; break; } if (nodes.contains(e.getKey().getEncodedName())) { LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs"); // Could be a meta region. - regionsToProcess = true; + this.failover = true; break; } } // If we found user regions out on cluster, its a failover. 
- if (regionsToProcess) { + if (this.failover) { LOG.info("Found regions out on cluster or in RIT; failover"); // Process list of dead servers and regions in RIT. // See HBASE-4580 for more information. processDeadServersAndRecoverLostRegions(deadServers, nodes); - + this.failover = false; + failoverProcessedRegions.clear(); } else { // Fresh cluster startup. LOG.info("Clean cluster startup. Assigning userregions"); @@ -438,10 +447,7 @@ if (data == null) return false; HRegionInfo hri = regionInfo; if (hri == null) { - Pair p = - MetaReader.getRegion(catalogTracker, data.getRegionName()); - if (p == null) return false; - hri = p.getFirst(); + if ((hri = getHRegionInfo(data)) == null) return false; } processRegionsInTransition(data, hri, deadServers, stat.getVersion()); return true; @@ -456,6 +462,12 @@ LOG.info("Processing region " + regionInfo.getRegionNameAsString() + " in state " + data.getEventType()); synchronized (regionsInTransition) { + RegionState regionState = regionsInTransition.get(encodedRegionName); + if (regionState != null || + failoverProcessedRegions.containsKey(encodedRegionName)) { + // Just return + return; + } switch (data.getEventType()) { case M_ZK_REGION_CLOSING: // If zk node of the region was updated by a live server skip this @@ -472,17 +484,35 @@ regionInfo, RegionState.State.CLOSING, data.getStamp(), data.getOrigin())); } + failoverProcessedRegions.put(encodedRegionName, regionInfo); break; case RS_ZK_REGION_CLOSED: case RS_ZK_REGION_FAILED_OPEN: // Region is closed, insert into RIT and handle it addToRITandCallClose(regionInfo, RegionState.State.CLOSED, data); + failoverProcessedRegions.put(encodedRegionName, regionInfo); break; case M_ZK_REGION_OFFLINE: - // Region is offline, insert into RIT and handle it like a closed - addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data); + // If zk node of the region was updated by a live server skip this + // region and just add it into RIT. 
+ if (isOnDeadServer(regionInfo, deadServers) && + (data.getOrigin() == null || + !serverManager.isServerOnline(data.getOrigin()))) { + // Region is offline, insert into RIT and handle it like a closed + addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data); + } else if (data.getOrigin() != null && + !serverManager.isServerOnline(data.getOrigin())) { + // to handle cases where offline node is created but sendRegionOpen + // RPC is not yet sent + addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data); + } else { + regionsInTransition.put(encodedRegionName, new RegionState( + regionInfo, RegionState.State.PENDING_OPEN, data.getStamp(), data + .getOrigin())); + } + failoverProcessedRegions.put(encodedRegionName, regionInfo); break; case RS_ZK_REGION_OPENING: @@ -505,6 +535,7 @@ } regionsInTransition.put(encodedRegionName, new RegionState(regionInfo, RegionState.State.OPENING, data.getStamp(), data.getOrigin())); + failoverProcessedRegions.put(encodedRegionName, regionInfo); break; case RS_ZK_REGION_OPENED: @@ -528,10 +559,12 @@ new OpenedRegionHandler(master, this, regionInfo, sn, expectedVersion) .process(); } + failoverProcessedRegions.put(encodedRegionName, regionInfo); break; } } } + /** * Put the region hri into an offline state up in zk. 
@@ -607,6 +640,7 @@ */ private void handleRegion(final RegionTransitionData data, int expectedVersion) { synchronized(regionsInTransition) { + HRegionInfo hri = null; if (data == null || data.getOrigin() == null) { LOG.warn("Unexpected NULL input " + data); return; @@ -684,6 +718,14 @@ break; case M_ZK_REGION_CLOSING: + hri = checkIfInFailover(regionState, encodedName, data); + if (hri != null) { + regionState = new RegionState(hri, RegionState.State.CLOSING, data + .getStamp(), data.getOrigin()); + regionsInTransition.put(encodedName, regionState); + failoverProcessedRegions.put(encodedName, hri); + break; + } // Should see CLOSING after we have asked it to CLOSE or additional // times after already being in state of CLOSING if (regionState == null || @@ -700,6 +742,17 @@ break; case RS_ZK_REGION_CLOSED: + hri = checkIfInFailover(regionState, encodedName, data); + if (hri != null) { + regionState = new RegionState(hri, RegionState.State.CLOSED, data + .getStamp(), data.getOrigin()); + regionsInTransition.put(encodedName, regionState); + removeClosedRegion(regionState.getRegion()); + new ClosedRegionHandler(master, this, regionState.getRegion()) + .process(); + failoverProcessedRegions.put(encodedName, hri); + break; + } // Should see CLOSED after CLOSING but possible after PENDING_CLOSE if (regionState == null || (!regionState.isPendingClose() && !regionState.isClosing())) { @@ -720,6 +773,16 @@ break; case RS_ZK_REGION_FAILED_OPEN: + hri = checkIfInFailover(regionState, encodedName, data); + if (hri != null) { + regionState = new RegionState(hri, RegionState.State.CLOSED, data + .getStamp(), data.getOrigin()); + regionsInTransition.put(encodedName, regionState); + new ClosedRegionHandler(master, this, regionState.getRegion()) + .process(); + failoverProcessedRegions.put(encodedName, hri); + break; + } if (regionState == null || (!regionState.isPendingOpen() && !regionState.isOpening())) { LOG.warn("Received FAILED_OPEN for region " + prettyPrintedRegionName + @@ 
-735,6 +798,14 @@ break; case RS_ZK_REGION_OPENING: + hri = checkIfInFailover(regionState, encodedName, data); + if (hri != null) { + regionState = new RegionState(hri, RegionState.State.OPENING, data + .getStamp(), data.getOrigin()); + regionsInTransition.put(encodedName, regionState); + failoverProcessedRegions.put(encodedName, hri); + break; + } // Should see OPENING after we have asked it to OPEN or additional // times after already being in state of OPENING if (regionState == null || @@ -752,6 +823,16 @@ break; case RS_ZK_REGION_OPENED: + hri = checkIfInFailover(regionState, encodedName, data); + if (hri != null) { + regionState = new RegionState(hri, RegionState.State.OPEN, data + .getStamp(), data.getOrigin()); + regionsInTransition.put(encodedName, regionState); + new OpenedRegionHandler(master, this, regionState.getRegion(), data + .getOrigin(), expectedVersion).process(); + failoverProcessedRegions.put(encodedName, hri); + break; + } // Should see OPENED after OPENING but possible after PENDING_OPEN if (regionState == null || (!regionState.isPendingOpen() && !regionState.isOpening())) { @@ -774,6 +855,44 @@ } /** + * Checks whether the callback came while RIT was not yet populated during + * master failover. 
+   * @param regionState caller's RegionState for the region; must be null to qualify
+   * @param encodedName encoded region name, the key into failoverProcessedRegions
+   * @param data transition data, used to read the region from META when needed
+   * @return region info read from META when failover handling applies, else null
+   */
+  private HRegionInfo checkIfInFailover(RegionState regionState,
+      String encodedName, RegionTransitionData data) {
+    // Only relevant while failover is running and the caller has no state yet.
+    if (regionState != null || !this.failover) {
+      return null;
+    }
+    // Map.get() is also null for an absent key, so a single lookup suffices.
+    if (failoverProcessedRegions.get(encodedName) != null) return null;
+    // Not recorded as processed yet: fetch the region info (may be null).
+    return getHRegionInfo(data);
+  }
+
+  /**
+   * Reads the HRegionInfo named by the transition data from the META table.
+   * @param data transition data carrying the region name to look up
+   * @return the HRegionInfo; null if META returned nothing or the read failed
+   */
+  private HRegionInfo getHRegionInfo(RegionTransitionData data) {
+    try {
+      Pair<HRegionInfo, ServerName> p =
+          MetaReader.getRegion(catalogTracker, data.getRegionName());
+      // No pair came back, so there is no region info to hand out.
+      return p == null ? null : p.getFirst();
+    } catch (IOException e) {
+      master.abort("Aborting because error occoured while reading " +
+          data.getRegionName() + " from .META.", e);
+      return null;
+    }
+  }
+
+  /**
    * @return Returns true if this RegionState is splittable; i.e. the
    * RegionState is currently in splitting state or pending_close or
    * null (Anything else will return false). (Anything else will return false).