Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1061092)
+++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy)
@@ -24,6 +24,7 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.net.ConnectException;
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -46,6 +47,7 @@
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.IPCCallable;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.Stoppable;
@@ -53,9 +55,9 @@
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.catalog.RootLocationEditor;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.RegionTransitionData;
-import org.apache.hadoop.hbase.executor.EventHandler.EventType;
 import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
 import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
@@ -66,11 +68,10 @@
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZKTable;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -1049,7 +1050,7 @@
    * @param region server to be unassigned
    * @param force if region should be closed even if already closing
    */
-  public void unassign(HRegionInfo region, boolean force) {
+  public void unassign(final HRegionInfo region, boolean force) {
     LOG.debug("Starting unassignment of region " +
       region.getRegionNameAsString() + " (offlining)");
     synchronized (this.regions) {
@@ -1063,77 +1064,103 @@
     }
     String encodedName = region.getEncodedName();
     // Grab the state of this region and synchronize on it
-    RegionState state;
+    RegionState regionState;
     synchronized (regionsInTransition) {
-      state = regionsInTransition.get(encodedName);
-      if (state == null) {
-        state = new RegionState(region, RegionState.State.PENDING_CLOSE);
-        regionsInTransition.put(encodedName, state);
-      } else if (force && state.isPendingClose()) {
+      regionState = regionsInTransition.get(encodedName);
+      if (regionState == null) {
+        regionState = new RegionState(region, RegionState.State.PENDING_CLOSE);
+        regionsInTransition.put(encodedName, regionState);
+      } else if (force && regionState.isPendingClose()) {
         LOG.debug("Attempting to unassign region " +
             region.getRegionNameAsString() + " which is already pending close "
             + "but forcing an additional close");
-        state.update(RegionState.State.PENDING_CLOSE);
+        regionState.update(RegionState.State.PENDING_CLOSE);
       } else {
         LOG.debug("Attempting to unassign region " +
             region.getRegionNameAsString() + " but it is " +
-            "already in transition (" + state.getState() + ")");
+            "already in transition (" + regionState.getState() + ")");
         return;
       }
     }
     // Send CLOSE RPC
-    HServerInfo server = null;
+    HServerInfo hsi = null;
     synchronized (this.regions) {
-      server = regions.get(region);
+      hsi = regions.get(region);
     }
+    // TODO: We should consider making this look more like it does for the
+    // region open where we catch all throwables and never abort
     try {
-      // TODO: We should consider making this look more like it does for the
-      // region open where we catch all throwables and never abort
+      new SendRegionCloseCallable(this.serverManager, hsi,
+        regionState, region).call();
+    } catch (Exception e) {
+      this.master.abort("Failed sending region close", e);
+    }
+  }
+
+  /**
+   * Sends the CLOSE RPC for a region to the server hosting it.  The common
+   * connection exceptions are caught by {@link IPCCallable#call()} and are
+   * only logged by the handlers below; anything else reaches the caller.
+   */
+  static class SendRegionCloseCallable extends IPCCallable {
+    private final ServerManager serverManager;
+    private final HServerInfo server;
+    private final RegionState state;
+    private final HRegionInfo region;
+
+    SendRegionCloseCallable(final ServerManager serverManager, final HServerInfo server,
+        final RegionState state, final HRegionInfo region) {
+      this.serverManager = serverManager;
+      this.server = server;
+      this.state = state;
+      this.region = region;
+    }
+
+    /**
+     * @return always null; the outcome of the RPC is only logged
+     */
+    @Override
+    public Object doCall() throws Exception {
       if (serverManager.sendRegionClose(server, state.getRegion())) {
         LOG.debug("Sent CLOSE to " + server + " for region " +
           region.getRegionNameAsString());
-        return;
+        return null;
       }
       // This never happens. Currently regionserver close always return true.
       LOG.debug("Server " + server + " region CLOSE RPC returned false for " +
         region.getEncodedName());
-    } catch (NotServingRegionException nsre) {
-      LOG.info("Server " + server + " returned " + nsre + " for " +
-        region.getEncodedName());
-      // Presume that master has stale data.  Presume remote side just split.
-      // Presume that the split message when it comes in will fix up the master's
-      // in memory cluster state.
-      return;
-    } catch (ConnectException e) {
+      return null;
+    }
+
+    @Override
+    public void doNotServingRegionException(NotServingRegionException e) {
+      LOG.info("Server " + server + " returned " + e + " for " +
+        region.getEncodedName());
+      // Presume that master has stale data.  Presume remote side just split.
+      // Presume that the split message when it comes in will fix up the master's
+      // in memory cluster state.
+    }
+
+    @Override
+    public void doConnectException(ConnectException e) {
       LOG.info("Failed connect to " + server + ", message=" + e.getMessage() +
         ", region=" + region.getEncodedName());
-      // Presume that regionserver just failed and we haven't got expired
-      // server from zk yet.  Let expired server deal with clean up.
-    } catch (java.net.SocketTimeoutException e) {
+      // Presume that regionserver just failed and we haven't got expired
+      // server from zk yet.  Let expired server deal with clean up.
+    }
+
+    @Override
+    public void doSocketTimeoutException(SocketTimeoutException e) {
       LOG.info("Server " + server + " returned " + e.getMessage() + " for " +
         region.getEncodedName());
-      // Presume retry or server will expire.
-    } catch (EOFException e) {
+      // Presume retry or server will expire.
+    }
+
+    @Override
+    public void doEOFException(EOFException e) {
       LOG.info("Server " + server + " returned " + e.getMessage() + " for " +
         region.getEncodedName());
-      // Presume retry or server will expire.
-    } catch (RemoteException re) {
-      IOException ioe = re.unwrapRemoteException();
-      if (ioe instanceof NotServingRegionException) {
-        // Failed to close, so pass through and reassign
-        LOG.debug("Server " + server + " returned " + ioe + " for " +
-          region.getEncodedName());
-      } else if (ioe instanceof EOFException) {
-        // Failed to close, so pass through and reassign
-        LOG.debug("Server " + server + " returned " + ioe + " for " +
-          region.getEncodedName());
-      } else {
-        this.master.abort("Remote unexpected exception", ioe);
-      }
-    } catch (Throwable t) {
-      // For now call abort if unexpected exception -- radical, but will get
-      // fellas attention.  St.Ack 20101012
-      this.master.abort("Remote unexpected exception", t);
+      // Presume retry or server will expire.
     }
   }
 
Index: src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (revision 1061092)
+++ src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (working copy)
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.IPCCallable;
 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -403,6 +404,14 @@
     return protocol;
   }
 
+  static class GetConnectionCallable extends IPCCallable {
+    @Override
+    public HRegionInterface doCall() throws Exception {
+      // TODO Auto-generated method stub
+      return null;
+    }
+  }
+
   private boolean verifyRegionLocation(HRegionInterface metaServer,
     final HServerAddress address,
     byte [] regionName)
Index: src/main/java/org/apache/hadoop/hbase/IPCCallable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/IPCCallable.java (revision 
0)
+++ src/main/java/org/apache/hadoop/hbase/IPCCallable.java (revision 0)
@@ -0,0 +1,113 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * A specialization on {@link Callable} that will catch 'normal' IPC
+ * exceptions such as {@link ConnectException} or {@link EOFException},
+ * whether thrown directly or wrapped in a {@link RemoteException}.
+ * Implement {@link #doCall()}.  It's called out of the guts of
+ * {@link #call()}.  Optionally override the exception handlers; the
+ * default handlers only log the exception.
+ * @param <V> type of the result returned by {@link #doCall()}
+ */
+public abstract class IPCCallable<V> implements Callable<V> {
+  public static final Log LOG = LogFactory.getLog(IPCCallable.class);
+
+  /**
+   * Does the actual call.  Invoked by {@link #call()}.
+   * @return result of the call
+   * @throws Exception if the call fails
+   */
+  public abstract V doCall() throws Exception;
+
+  /** Called on a {@link NotServingRegionException}; by default just logs it. */
+  public void doNotServingRegionException(final NotServingRegionException e) {
+    logHandled(e);
+  }
+
+  /** Called on a {@link ConnectException}; by default just logs it. */
+  public void doConnectException(final ConnectException e) {
+    logHandled(e);
+  }
+
+  /** Called on a {@link SocketTimeoutException}; by default just logs it. */
+  public void doSocketTimeoutException(final SocketTimeoutException e) {
+    logHandled(e);
+  }
+
+  /** Called on an {@link EOFException}; by default just logs it. */
+  public void doEOFException(final EOFException e) {
+    logHandled(e);
+  }
+
+  /**
+   * Runs {@link #doCall()} and hands any of the expected IPC exceptions to
+   * its handler instead of rethrowing it.
+   * @return what {@link #doCall()} returned, or null if an exception was
+   * handled
+   * @throws Exception anything that is not one of the handled exceptions
+   */
+  @Override
+  public V call() throws Exception {
+    V result = null;
+    try {
+      result = doCall();
+    } catch (NotServingRegionException nsre) {
+      doNotServingRegionException(nsre);
+    } catch (ConnectException e) {
+      doConnectException(e);
+    } catch (SocketTimeoutException e) {
+      doSocketTimeoutException(e);
+    } catch (EOFException e) {
+      doEOFException(e);
+    } catch (RemoteException re) {
+      // Unwrap and dispatch on the wrapped exception the same way as above.
+      IOException ioe = re.unwrapRemoteException();
+      if (ioe instanceof NotServingRegionException) {
+        doNotServingRegionException((NotServingRegionException)ioe);
+      } else if (ioe instanceof ConnectException) {
+        doConnectException((ConnectException)ioe);
+      } else if (ioe instanceof SocketTimeoutException) {
+        doSocketTimeoutException((SocketTimeoutException)ioe);
+      } else if (ioe instanceof EOFException) {
+        doEOFException((EOFException)ioe);
+      } else {
+        throw ioe;
+      }
+    }
+    return result;
+  }
+
+  /** Default handling shared by the exception handlers: log and carry on. */
+  private static void logHandled(final IOException e) {
+    LOG.info("Handled IPC exception: " + e);
+  }
+}