Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1360540) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -1584,13 +1584,14 @@ private void assign(final HRegionInfo region, final RegionState state, final boolean setOfflineInZK, final boolean forceNewPlan, boolean hijack) { + boolean regionAlreadyInTransitionException = false; for (int i = 0; i < this.maximumAssignmentAttempts; i++) { int versionOfOfflineNode = -1; if (setOfflineInZK) { // get the version of the znode after setting it to OFFLINE. // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE - versionOfOfflineNode = setOfflineInZooKeeper(state, - hijack); + versionOfOfflineNode = setOfflineInZooKeeper(state, hijack, + regionAlreadyInTransitionException); if(versionOfOfflineNode != -1){ if (isDisabledorDisablingRegionInRIT(region)) { return; @@ -1665,22 +1666,51 @@ if (t instanceof RemoteException) { t = ((RemoteException) t).unwrapRemoteException(); if (t instanceof RegionAlreadyInTransitionException) { - String errorMsg = "Failed assignment in: " + plan.getDestination() - + " due to " + t.getMessage(); - LOG.error(errorMsg, t); + regionAlreadyInTransitionException = true; + if (LOG.isDebugEnabled()) { + LOG.debug(t.getMessage()); + } + } + } + if (t instanceof java.net.SocketTimeoutException) { + if (isRegionInTransition(region) == null + && plan.getDestination().equals(getRegionServerOfRegion(region))) { + LOG.warn( + "Call openRegion() to " + plan.getDestination() + + " has timed out when trying to assign " + region.getRegionNameAsString() + + ", but the region has already been opened successfully on " + + plan.getDestination() + ".", t); return; + } else { + // The destination region server is probably processing the region open, so it + // might be safer to try this region 
server again to avoid having two region + // servers open the same region. + LOG.warn("Call openRegion() to " + plan.getDestination() + + " has timed out when trying to assign " + region.getRegionNameAsString() + + ". Trying to assign to this region server again; retry=" + i, t); + state.update(RegionState.State.OFFLINE); + continue; } } - LOG.warn("Failed assignment of " + - state.getRegion().getRegionNameAsString() + " to " + - plan.getDestination() + ", trying to assign elsewhere instead; " + - "retry=" + i, t); + LOG.warn("Failed assignment of " + + state.getRegion().getRegionNameAsString() + + " to " + + plan.getDestination() + + ", trying to assign " + + (regionAlreadyInTransitionException ? "to the same region server" + + " because of RITException;" : "elsewhere instead; ") + "retry=" + i, t); // Clean out plan we failed execute and one that doesn't look like it'll // succeed anyways; we need a new plan! // Transition back to OFFLINE state.update(RegionState.State.OFFLINE); - // Force a new plan and reassign. Will return null if no servers. - if (getRegionPlan(state, plan.getDestination(), true) == null) { + // If the region is opened on the destination of the present plan, reassigning to a new RS may cause + // double assignments. In case of RITException, reassign to the same RS. + RegionPlan newPlan = plan; + if (!regionAlreadyInTransitionException) { + // Force a new plan and reassign. Will return null if no servers. + newPlan = getRegionPlan(state, plan.getDestination(), true); + } + if (newPlan == null) { this.timeoutMonitor.setAllRegionServersOffline(true); LOG.warn("Unable to find a viable location to assign region " + state.getRegion().getRegionNameAsString()); @@ -1708,17 +1738,23 @@ * @param state * @param hijack * - true if needs to be hijacked and reassigned, false otherwise. + * @param regionAlreadyInTransitionException + * - true if retrying assignment because of RITException; an unexpected state is then logged, not fatal. 
* @return the version of the offline node if setting of the OFFLINE node was * successful, -1 otherwise. */ - int setOfflineInZooKeeper(final RegionState state, - boolean hijack) { + int setOfflineInZooKeeper(final RegionState state, boolean hijack, + boolean regionAlreadyInTransitionException) { // In case of reassignment the current state in memory need not be // OFFLINE. if (!hijack && !state.isClosed() && !state.isOffline()) { - String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE."; - this.master.abort(msg, new IllegalStateException(msg)); - return -1; + if (!regionAlreadyInTransitionException ) { + String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE."; + this.master.abort(msg, new IllegalStateException(msg)); + return -1; + } else { + LOG.debug("Unexpected state : " + state + " but retrying to assign because RITException."); + } } boolean allowZNodeCreation = false; // Under reassignment if the current state is PENDING_OPEN Index: src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java (revision 1360540) +++ src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java (working copy) @@ -25,6 +25,7 @@ import static org.junit.Assert.fail; import java.io.IOException; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -884,6 +885,58 @@ } } + @Test + public void testAssignRetryInCaseOfSocketTimeOutException() throws IOException, KeeperException, + InterruptedException { + Mockito.when(this.serverManager.sendRegionOpen(SERVERNAME_A, REGIONINFO, 0)).thenThrow( + new SocketTimeoutException()); + testAssignRetryInCaseOfSocketTimeOutException(RegionOpeningState.OPENED); + testAssignRetryInCaseOfSocketTimeOutException(RegionOpeningState.ALREADY_OPENED); + } + + private void 
testAssignRetryInCaseOfSocketTimeOutException(RegionOpeningState state) + throws IOException, KeeperException, InterruptedException { + // Create and startup an executor. This is used by AssignmentManager + // handling zk callbacks. + ExecutorService executor = + startupMasterExecutor("testAssignRetryInCaseOfSocketTimeOutException"); + Mockito.when(this.serverManager.sendRegionOpen(SERVERNAME_A, REGIONINFO, 1)).thenReturn(state); + Mockito.when(this.serverManager.sendRegionOpen(SERVERNAME_B, REGIONINFO, 1)).thenReturn(state); + + List serverList = new ArrayList(2); + serverList.add(SERVERNAME_B); + serverList.add(SERVERNAME_A); + Mockito.when(this.serverManager.getOnlineServersList()).thenReturn(serverList); + + // We need a mocked catalog tracker. + CatalogTracker ct = Mockito.mock(CatalogTracker.class); + LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server.getConfiguration()); + // Create an AM. + AssignmentManager am = new AssignmentManager(this.server, this.serverManager, ct, balancer, + executor); + try { + // Make sure our new AM gets callbacks; once registered, can't unregister. + // That's ok because we make a new zk watcher for each test. + this.watcher.registerListenerFirst(am); + am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_A)); + am.assign(REGIONINFO, true); + ServerName regionServer = am.getRegionServerOfRegion(REGIONINFO); + if (state == RegionOpeningState.OPENED) { + // region plan should not change in case of SocketTimeoutException. + assertEquals(true, + am.regionPlans.get(REGIONINFO.getEncodedName()).getDestination().equals(SERVERNAME_A)); + } else { + // region assigned to the same region server after retry also. 
+ assertEquals(true, regionServer.equals(SERVERNAME_A)); + } + } finally { + executor.shutdown(); + am.shutdown(); + // Clean up all znodes + ZKAssign.deleteAllNodes(this.watcher); + } + } + /** * Mocked load balancer class used in the testcase to make sure that the testcase waits until * random assignment is called and the gate variable is set to true.