diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/AdminServiceSecurityInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/AdminServiceSecurityInfo.java new file mode 100644 index 0000000..2f1af49 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/AdminServiceSecurityInfo.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.security.KerberosInfo; +import org.apache.hadoop.hbase.security.TokenInfo; + +/** + * Interface to carry annotations used by security. This class goes with AdminProtos.AdminService + */ +@KerberosInfo(serverPrincipal = "hbase.regionserver.kerberos.principal") +@TokenInfo("HBASE_AUTH_TOKEN") +@InterfaceAudience.Private +public interface AdminServiceSecurityInfo {} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientServiceSecurityInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientServiceSecurityInfo.java new file mode 100644 index 0000000..03c8c30 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientServiceSecurityInfo.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.security.KerberosInfo; +import org.apache.hadoop.hbase.security.TokenInfo; + +/** + * Interface with annotations used by security. This class goes with + * ClientProtos.ClientService. + */ +@KerberosInfo(serverPrincipal = "hbase.regionserver.kerberos.principal") +// TODO: It is always HBASE_AUTH_TOKEN. Is that right? Whether client or admin operation. 
+@TokenInfo("HBASE_AUTH_TOKEN") +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface ClientServiceSecurityInfo {} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/IpcProtocol.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/IpcProtocol.java deleted file mode 100644 index 3b0c535..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/IpcProtocol.java +++ /dev/null @@ -1,32 +0,0 @@ -package org.apache.hadoop.hbase; -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Marker Interface used by ipc. We need a means of referring to - * ipc "protocols" generically. For example, we need to tell an rpc - * server the "protocols" it implements and it helps if all protocols - * implement a common 'type'. That is what this Interface is used for. - */ -// This Interface replaces the old VersionedProtocol Interface. Rather -// than redo a bunch of code its removal, instead we put in place this -// Interface and change all VP references to Protocol references. - -// It is moved up here to top-level because it is ugly having members -// of super packages reach down into subpackages. -public interface IpcProtocol {} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MasterAdminProtocol.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MasterAdminProtocol.java deleted file mode 100644 index 2e4b76c..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MasterAdminProtocol.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
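The two security-info marker interfaces added above carry no methods; they exist only so the RPC layer can discover Kerberos and token metadata by reading class-level annotations. A minimal sketch of that lookup, assuming runtime retention on the annotations (the helper class and method names here are illustrative, not part of this patch):

import org.apache.hadoop.hbase.AdminServiceSecurityInfo;
import org.apache.hadoop.hbase.security.KerberosInfo;
import org.apache.hadoop.hbase.security.TokenInfo;

final class SecurityInfoLookup {
  // Returns {configKeyOfServerPrincipal, tokenKind}, with null for any
  // annotation that is absent from the marker interface.
  static String[] securityInfoOf(Class<?> marker) {
    KerberosInfo k = marker.getAnnotation(KerberosInfo.class);
    TokenInfo t = marker.getAnnotation(TokenInfo.class);
    return new String[] {
        k == null ? null : k.serverPrincipal(), // e.g. "hbase.regionserver.kerberos.principal"
        t == null ? null : t.value()            // e.g. "HBASE_AUTH_TOKEN"
    };
  }

  public static void main(String[] args) {
    String[] info = securityInfoOf(AdminServiceSecurityInfo.class);
    System.out.println(info[0] + " / " + info[1]);
  }
}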
- */ - -package org.apache.hadoop.hbase; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService; -import org.apache.hadoop.hbase.security.KerberosInfo; -import org.apache.hadoop.hbase.security.TokenInfo; - -/** - * Protocol that a client uses to communicate with the Master (for admin purposes). - */ -@KerberosInfo( - serverPrincipal = "hbase.master.kerberos.principal") -@TokenInfo("HBASE_AUTH_TOKEN") -@InterfaceAudience.Private -@InterfaceStability.Evolving -public interface MasterAdminProtocol -extends MasterAdminService.BlockingInterface, MasterProtocol {} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MasterAdminServiceSecurityInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MasterAdminServiceSecurityInfo.java new file mode 100644 index 0000000..c73126f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MasterAdminServiceSecurityInfo.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.security.KerberosInfo; +import org.apache.hadoop.hbase.security.TokenInfo; + +/** + * Interface that has security info for MasterAdminProtos.MasterAdminService + */ +@KerberosInfo(serverPrincipal = "hbase.master.kerberos.principal") +@TokenInfo("HBASE_AUTH_TOKEN") +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface MasterAdminServiceSecurityInfo {} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MasterMonitorProtocol.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MasterMonitorProtocol.java deleted file mode 100644 index b8c3dff..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MasterMonitorProtocol.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMonitorService; -import org.apache.hadoop.hbase.security.KerberosInfo; -import org.apache.hadoop.hbase.security.TokenInfo; - -/** - * Protocol that a client uses to communicate with the Master (for monitoring purposes). - */ -@KerberosInfo( - serverPrincipal = "hbase.master.kerberos.principal") -@TokenInfo("HBASE_AUTH_TOKEN") -@InterfaceAudience.Public -@InterfaceStability.Evolving -public interface MasterMonitorProtocol -extends MasterMonitorService.BlockingInterface, MasterProtocol {} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MasterMonitorServiceSecurityInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MasterMonitorServiceSecurityInfo.java new file mode 100644 index 0000000..c898f7d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MasterMonitorServiceSecurityInfo.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.security.KerberosInfo; +import org.apache.hadoop.hbase.security.TokenInfo; + +/** + * Interface of SecurityInfo for MasterMonitorProtos.MasterMonitorService + */ +@KerberosInfo(serverPrincipal = "hbase.master.kerberos.principal") +@TokenInfo("HBASE_AUTH_TOKEN") +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface MasterMonitorServiceSecurityInfo {} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MasterProtocol.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MasterProtocol.java deleted file mode 100644 index 17632bd..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MasterProtocol.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
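Each deleted *Protocol interface was a thin shim: a generated BlockingInterface plus IpcProtocol plus the security annotations that now live on the *ServiceSecurityInfo markers. The caller-visible change is only the declared type; the RPC itself is unchanged. A sketch of the new shape, reusing the balance() call that appears later in this patch:

import java.io.IOException;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService;

final class BalanceSketch {
  // Before: MasterAdminProtocol masterAdmin = connection.getMasterAdmin();
  // After:  the generated protobuf stub type is used directly.
  static boolean runBalancer(HConnection connection)
      throws IOException, ServiceException {
    MasterAdminService.BlockingInterface masterAdmin = connection.getMasterAdmin();
    return masterAdmin.balance(null, RequestConverter.buildBalanceRequest())
        .getBalancerRan();
  }
}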
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase; - -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; - -/** - * Functions implemented by all the master protocols: e.g. {@link MasterAdminProtocol} - * and {@link MasterMonitorProtocol}. Currently, the only shared method - * {@link #isMasterRunning(com.google.protobuf.RpcController, org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest)} - * which is used on connection setup to check if the master has been stopped. - */ -public interface MasterProtocol extends IpcProtocol, MasterService.BlockingInterface {} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RemoteExceptionHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RemoteExceptionHandler.java index 288068e..d9027ed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RemoteExceptionHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RemoteExceptionHandler.java @@ -98,7 +98,6 @@ public class RemoteExceptionHandler { if (t instanceof IOException) { i = (IOException) t; - } else { i = new IOException("server error"); i.initCause(t); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java index 46b85f1..4b48562 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java @@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.AdminProtocol; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTable; @@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.exceptions.NotAllMetaRegionsOnlineException; import org.apache.hadoop.hbase.exceptions.ServerNotRunningYetException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -265,7 +265,7 @@ public class CatalogTracker { * @throws IOException * @deprecated Use #getMetaServerConnection(long) */ - public AdminProtocol waitForMetaServerConnection(long timeout) + public AdminService.BlockingInterface waitForMetaServerConnection(long timeout) throws InterruptedException, NotAllMetaRegionsOnlineException, IOException { return getMetaServerConnection(timeout); } @@ -281,7 +281,7 @@ public class CatalogTracker { * @throws NotAllMetaRegionsOnlineException if timed out waiting * @throws IOException */ - AdminProtocol getMetaServerConnection(long timeout) + AdminService.BlockingInterface getMetaServerConnection(long timeout) 
throws InterruptedException, NotAllMetaRegionsOnlineException, IOException { return getCachedConnection(waitForMeta(timeout)); } @@ -313,14 +313,14 @@ public class CatalogTracker { * invocation, or may be null. * @throws IOException */ - private AdminProtocol getCachedConnection(ServerName sn) + private AdminService.BlockingInterface getCachedConnection(ServerName sn) throws IOException { if (sn == null) { return null; } - AdminProtocol protocol = null; + AdminService.BlockingInterface service = null; try { - protocol = connection.getAdmin(sn); + service = connection.getAdmin(sn); } catch (RetriesExhaustedException e) { if (e.getCause() != null && e.getCause() instanceof ConnectException) { // Catch this; presume it means the cached connection has gone bad. @@ -349,7 +349,7 @@ public class CatalogTracker { } } - return protocol; + return service; } /** @@ -367,7 +367,7 @@ public class CatalogTracker { // rather than have to pass it in. Its made awkward by the fact that the // HRI is likely a proxy against remote server so the getServerName needs // to be fixed to go to a local method or to a cache before we can do this. - private boolean verifyRegionLocation(AdminProtocol hostingServer, + private boolean verifyRegionLocation(AdminService.BlockingInterface hostingServer, final ServerName address, final byte [] regionName) throws IOException { if (hostingServer == null) { @@ -411,9 +411,9 @@ public class CatalogTracker { */ public boolean verifyMetaRegionLocation(final long timeout) throws InterruptedException, IOException { - AdminProtocol connection = null; + AdminService.BlockingInterface service = null; try { - connection = waitForMetaServerConnection(timeout); + service = waitForMetaServerConnection(timeout); } catch (NotAllMetaRegionsOnlineException e) { // Pass } catch (ServerNotRunningYetException e) { @@ -421,8 +421,8 @@ public class CatalogTracker { } catch (UnknownHostException e) { // Pass -- server name doesn't resolve so it can't be assigned anything. } - return (connection == null)? false: - verifyRegionLocation(connection, + return (service == null)? false: + verifyRegionLocation(service, this.metaRegionTracker.getMetaRegionLocation(), META_REGION_NAME); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java deleted file mode 100644 index 7a43451..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
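CatalogTracker above now verifies a region location by asking the hosting server directly through the generated admin stub. Roughly (a sketch assuming GetRegionInfoResponse exposes the usual proto2 hasRegionInfo() accessor; the wrapper class name is illustrative):

import java.io.IOException;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;

final class RegionLocationCheck {
  // Ask the region server at sn whether it is actually serving regionName.
  static boolean isServing(HConnection connection, ServerName sn, byte[] regionName)
      throws IOException {
    AdminService.BlockingInterface admin = connection.getAdmin(sn);
    GetRegionInfoRequest request =
        RequestConverter.buildGetRegionInfoRequest(regionName, false);
    try {
      return admin.getRegionInfo(null, request).hasRegionInfo();
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }
}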
- */ - -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.IpcProtocol; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; -import org.apache.hadoop.hbase.security.KerberosInfo; -import org.apache.hadoop.hbase.security.TokenInfo; - -/** - * Protocol that a HBase client uses to communicate with a region server. - */ -@KerberosInfo( - serverPrincipal = "hbase.regionserver.kerberos.principal") -@TokenInfo("HBASE_AUTH_TOKEN") -@InterfaceAudience.Private -public interface AdminProtocol -extends AdminService.BlockingInterface, IpcProtocol {} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java deleted file mode 100644 index 16ae40c..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hbase.IpcProtocol; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.security.KerberosInfo; -import org.apache.hadoop.hbase.security.TokenInfo; - -/** - * Protocol that a HBase client uses to communicate with a region server. 
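ClientProtocol disappears the same way: callers hold ClientService.BlockingInterface and drive scans with the request/response converters, as HBaseAdmin.tableExists() does further down in this patch. A sketch of that idiom (the wrapper class is illustrative):

import com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;

final class SingleBatchScan {
  // Fetch at most one row from the given region, closing the scanner afterwards.
  static Result[] scanOneRow(HConnection connection, ServerName sn,
      byte[] regionName, Scan scan) throws Exception {
    ClientService.BlockingInterface client = connection.getClient(sn);
    ScanRequest request = RequestConverter.buildScanRequest(regionName, scan, 1, true);
    try {
      ScanResponse response = client.scan(null, request);
      return ResponseConverter.getResults(response);
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }
}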
- */ -@KerberosInfo( - serverPrincipal = "hbase.regionserver.kerberos.principal") -@TokenInfo("HBASE_AUTH_TOKEN") -@InterfaceAudience.Public -@InterfaceStability.Evolving -public interface ClientProtocol -extends ClientService.BlockingInterface, IpcProtocol {} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index eb7f0c3..160cba7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -318,8 +318,8 @@ public class ClientScanner extends AbstractClientScanner { if (retryAfterOutOfOrderException) { retryAfterOutOfOrderException = false; } else { - throw new DoNotRetryIOException("Failed after retry" - + ", it could be cause by rpc timeout", e); + throw new DoNotRetryIOException("Failed after retry of " + + "OutOfOrderScannerNextException: was there a rpc timeout?", e); } } // Clear region diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index f8c3ec6..1e19a45 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; @@ -78,6 +79,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRespo import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; @@ -570,7 +572,7 @@ public class HBaseAdmin implements Abortable, Closeable { firstMetaServer.getRegionInfo().getRegionName(), scan, 1, true); Result[] values = null; // Get a batch at a time. 
- ClientProtocol server = connection.getClient(firstMetaServer.getServerName()); + ClientService.BlockingInterface server = connection.getClient(firstMetaServer.getServerName()); try { ScanResponse response = server.scan(null, request); values = ResponseConverter.getResults(response); @@ -583,7 +585,7 @@ public class HBaseAdmin implements Abortable, Closeable { if (values == null || values.length == 0) { tableExists = false; GetTableDescriptorsResponse htds; - MasterMonitorKeepAliveConnection master = connection.getKeepAliveMasterMonitor(); + MasterMonitorKeepAliveConnection master = connection.getKeepAliveMasterMonitorService(); try { GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(null); @@ -607,7 +609,7 @@ public class HBaseAdmin implements Abortable, Closeable { if(tries == numRetries - 1) { // no more tries left if (ex instanceof RemoteException) { throw ((RemoteException) ex).unwrapRemoteException(); - }else { + } else { throw ex; } } @@ -1221,7 +1223,7 @@ public class HBaseAdmin implements Abortable, Closeable { "The servername cannot be null or empty."); } ServerName sn = new ServerName(serverName); - AdminProtocol admin = this.connection.getAdmin(sn); + AdminService.BlockingInterface admin = this.connection.getAdmin(sn); // Close the region without updating zk state. CloseRegionRequest request = RequestConverter.buildCloseRegionRequest(encodedRegionName, false); @@ -1246,8 +1248,7 @@ public class HBaseAdmin implements Abortable, Closeable { */ public void closeRegion(final ServerName sn, final HRegionInfo hri) throws IOException { - AdminProtocol admin = - this.connection.getAdmin(sn); + AdminService.BlockingInterface admin = this.connection.getAdmin(sn); // Close the region without updating zk state. ProtobufUtil.closeRegion(admin, hri.getRegionName(), false); } @@ -1257,8 +1258,7 @@ public class HBaseAdmin implements Abortable, Closeable { */ public List getOnlineRegions( final ServerName sn) throws IOException { - AdminProtocol admin = - this.connection.getAdmin(sn); + AdminService.BlockingInterface admin = this.connection.getAdmin(sn); return ProtobufUtil.getOnlineRegions(admin); } @@ -1320,8 +1320,7 @@ public class HBaseAdmin implements Abortable, Closeable { private void flush(final ServerName sn, final HRegionInfo hri) throws IOException { - AdminProtocol admin = - this.connection.getAdmin(sn); + AdminService.BlockingInterface admin = this.connection.getAdmin(sn); FlushRegionRequest request = RequestConverter.buildFlushRegionRequest(hri.getRegionName()); try { @@ -1490,8 +1489,7 @@ public class HBaseAdmin implements Abortable, Closeable { private void compact(final ServerName sn, final HRegionInfo hri, final boolean major, final byte [] family) throws IOException { - AdminProtocol admin = - this.connection.getAdmin(sn); + AdminService.BlockingInterface admin = this.connection.getAdmin(sn); CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family); try { @@ -1518,10 +1516,11 @@ public class HBaseAdmin implements Abortable, Closeable { */ public void move(final byte [] encodedRegionName, final byte [] destServerName) throws HBaseIOException, MasterNotRunningException, ZooKeeperConnectionException { - MasterAdminKeepAliveConnection master = connection.getKeepAliveMasterAdmin(); + MasterAdminKeepAliveConnection stub = connection.getKeepAliveMasterAdminService(); try { - MoveRegionRequest request = RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName); - 
master.moveRegion(null,request); + MoveRegionRequest request = + RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName); + stub.moveRegion(null,request); } catch (ServiceException se) { IOException ioe = ProtobufUtil.getRemoteException(se); if (ioe instanceof HBaseIOException) { @@ -1530,9 +1529,8 @@ public class HBaseAdmin implements Abortable, Closeable { LOG.error("Unexpected exception: " + se + " from calling HMaster.moveRegion"); } catch (DeserializationException de) { LOG.error("Could not parse destination server name: " + de); - } - finally { - master.close(); + } finally { + stub.close(); } } @@ -1587,7 +1585,7 @@ public class HBaseAdmin implements Abortable, Closeable { */ public void offline(final byte [] regionName) throws IOException { - MasterAdminKeepAliveConnection master = connection.getKeepAliveMasterAdmin(); + MasterAdminKeepAliveConnection master = connection.getKeepAliveMasterAdminService(); try { master.offlineRegion(null,RequestConverter.buildOfflineRegionRequest(regionName)); } catch (ServiceException se) { @@ -1605,11 +1603,11 @@ public class HBaseAdmin implements Abortable, Closeable { */ public boolean setBalancerRunning(final boolean on, final boolean synchronous) throws MasterNotRunningException, ZooKeeperConnectionException { - MasterAdminKeepAliveConnection master = connection.getKeepAliveMasterAdmin(); + MasterAdminKeepAliveConnection stub = connection.getKeepAliveMasterAdminService(); try { SetBalancerRunningRequest req = RequestConverter.buildSetBalancerRunningRequest(on, synchronous); - return master.setBalancerRunning(null, req).getPrevBalanceValue(); + return stub.setBalancerRunning(null, req).getPrevBalanceValue(); } catch (ServiceException se) { IOException ioe = ProtobufUtil.getRemoteException(se); if (ioe instanceof MasterNotRunningException) { @@ -1623,7 +1621,7 @@ public class HBaseAdmin implements Abortable, Closeable { // break interface by adding additional exception type. 
throw new MasterNotRunningException("Unexpected exception when calling balanceSwitch",se); } finally { - master.close(); + stub.close(); } } @@ -1635,11 +1633,11 @@ public class HBaseAdmin implements Abortable, Closeable { */ public boolean balancer() throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException { - MasterAdminKeepAliveConnection master = connection.getKeepAliveMasterAdmin(); + MasterAdminKeepAliveConnection stub = connection.getKeepAliveMasterAdminService(); try { - return master.balance(null,RequestConverter.buildBalanceRequest()).getBalancerRan(); + return stub.balance(null,RequestConverter.buildBalanceRequest()).getBalancerRan(); } finally { - master.close(); + stub.close(); } } @@ -1652,12 +1650,12 @@ public class HBaseAdmin implements Abortable, Closeable { */ public boolean enableCatalogJanitor(boolean enable) throws ServiceException, MasterNotRunningException { - MasterAdminKeepAliveConnection master = connection.getKeepAliveMasterAdmin(); + MasterAdminKeepAliveConnection stub = connection.getKeepAliveMasterAdminService(); try { - return master.enableCatalogJanitor(null, + return stub.enableCatalogJanitor(null, RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue(); } finally { - master.close(); + stub.close(); } } @@ -1668,12 +1666,12 @@ public class HBaseAdmin implements Abortable, Closeable { * @throws MasterNotRunningException */ public int runCatalogScan() throws ServiceException, MasterNotRunningException { - MasterAdminKeepAliveConnection master = connection.getKeepAliveMasterAdmin(); + MasterAdminKeepAliveConnection stub = connection.getKeepAliveMasterAdminService(); try { - return master.runCatalogScan(null, + return stub.runCatalogScan(null, RequestConverter.buildCatalogScanRequest()).getScanResult(); } finally { - master.close(); + stub.close(); } } @@ -1683,12 +1681,12 @@ public class HBaseAdmin implements Abortable, Closeable { * @throws org.apache.hadoop.hbase.exceptions.MasterNotRunningException */ public boolean isCatalogJanitorEnabled() throws ServiceException, MasterNotRunningException { - MasterAdminKeepAliveConnection master = connection.getKeepAliveMasterAdmin(); + MasterAdminKeepAliveConnection stub = connection.getKeepAliveMasterAdminService(); try { - return master.isCatalogJanitorEnabled(null, + return stub.isCatalogJanitorEnabled(null, RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue(); } finally { - master.close(); + stub.close(); } } @@ -1704,7 +1702,7 @@ public class HBaseAdmin implements Abortable, Closeable { final byte[] encodedNameOfRegionB, final boolean forcible) throws IOException { MasterAdminKeepAliveConnection master = connection - .getKeepAliveMasterAdmin(); + .getKeepAliveMasterAdminService(); try { DispatchMergingRegionsRequest request = RequestConverter .buildDispatchMergingRegionsRequest(encodedNameOfRegionA, @@ -1800,8 +1798,7 @@ public class HBaseAdmin implements Abortable, Closeable { private void split(final ServerName sn, final HRegionInfo hri, byte[] splitPoint) throws IOException { - AdminProtocol admin = - this.connection.getAdmin(sn); + AdminService.BlockingInterface admin = this.connection.getAdmin(sn); ProtobufUtil.split(admin, hri, splitPoint); } @@ -1924,7 +1921,7 @@ public class HBaseAdmin implements Abortable, Closeable { throws IOException { String hostname = Addressing.parseHostname(hostnamePort); int port = Addressing.parsePort(hostnamePort); - AdminProtocol admin = + AdminService.BlockingInterface admin = this.connection.getAdmin(new 
ServerName(hostname, port, 0)); StopServerRequest request = RequestConverter.buildStopServerRequest( "Called by admin client " + this.connection.toString()); @@ -2067,7 +2064,7 @@ public class HBaseAdmin implements Abortable, Closeable { public synchronized byte[][] rollHLogWriter(String serverName) throws IOException, FailedLogCloseException { ServerName sn = new ServerName(serverName); - AdminProtocol admin = this.connection.getAdmin(sn); + AdminService.BlockingInterface admin = this.connection.getAdmin(sn); RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest(); try { RollWALWriterResponse response = admin.rollWALWriter(null, request); @@ -2127,8 +2124,7 @@ public class HBaseAdmin implements Abortable, Closeable { throw new NoServerForRegionException(Bytes.toStringBinary(tableNameOrRegionName)); } else { ServerName sn = regionServerPair.getSecond(); - AdminProtocol admin = - this.connection.getAdmin(sn); + AdminService.BlockingInterface admin = this.connection.getAdmin(sn); GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( regionServerPair.getFirst().getRegionName(), true); GetRegionInfoResponse response = admin.getRegionInfo(null, request); @@ -2143,8 +2139,7 @@ public class HBaseAdmin implements Abortable, Closeable { if (pair.getSecond() == null) continue; try { ServerName sn = pair.getSecond(); - AdminProtocol admin = - this.connection.getAdmin(sn); + AdminService.BlockingInterface admin = this.connection.getAdmin(sn); GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( pair.getFirst().getRegionName(), true); GetRegionInfoResponse response = admin.getRegionInfo(null, request); @@ -2607,7 +2602,7 @@ public class HBaseAdmin implements Abortable, Closeable { * Create a {@link MasterAdminCallable} to use it. */ private V execute(MasterAdminCallable function) throws IOException { - function.masterAdmin = connection.getKeepAliveMasterAdmin(); + function.masterAdmin = connection.getKeepAliveMasterAdminService(); try { return executeCallable(function); } finally { @@ -2621,7 +2616,7 @@ public class HBaseAdmin implements Abortable, Closeable { * Create a {@link MasterAdminCallable} to use it. */ private V execute(MasterMonitorCallable function) throws IOException { - function.masterMonitor = connection.getKeepAliveMasterMonitor(); + function.masterMonitor = connection.getKeepAliveMasterMonitorService(); try { return executeCallable(function); } finally { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectable.java new file mode 100644 index 0000000..21485d3 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectable.java @@ -0,0 +1,48 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
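The HConnectable class introduced in this new file captures a Configuration plus a single connect() callback; HConnectionManager.execute(HConnectable) runs the callback against a managed connection and releases it afterwards. A usage sketch (isTableEnabled stands in for any short-lived operation):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnectable;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;

final class HConnectableSketch {
  static boolean tableEnabled(final byte[] tableName) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
      @Override
      public Boolean connect(HConnection connection) throws IOException {
        return connection.isTableEnabled(tableName);
      }
    });
  }
}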
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; + +/** + * This class makes it convenient for one to execute a command in the context + * of a {@link HConnection} instance based on the given {@link Configuration}. + * + *
+ * <p>
+ * If you find yourself wanting to use a {@link HConnection} for a relatively
+ * short duration of time, and do not want to deal with the hassle of creating
+ * and cleaning up that resource, then you should consider using this
+ * convenience class.
+ *
+ * @param <T>
+ *          the return type of the {@link HConnectable#connect(HConnection)}
+ *          method.
+ */
+public abstract class HConnectable<T> {
+  public Configuration conf;
+
+  protected HConnectable(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public abstract T connect(HConnection connection) throws IOException;
+}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
index 9c0a0f4..2ab9897 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
@@ -18,40 +18,43 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterAdminProtocol;
-import org.apache.hadoop.hbase.MasterMonitorProtocol;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
 import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMonitorService;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
 /**
- * Cluster connection.  Hosts a connection to the ZooKeeper ensemble and
- * thereafter into the HBase cluster.  Knows how to locate regions out on the cluster,
+ * A cluster connection.  Knows how to find the master, locate regions out on the cluster,
  * keeps a cache of locations and then knows how to recalibrate after they move.
- * {@link HConnectionManager} manages instances of this class.
+ * {@link HConnectionManager} manages instances of this class.  This is NOT a connection to a
+ * particular server but to all servers in the cluster.  An implementation takes care of individual
+ * connections at a lower level.
 *
 * <p>
HConnections are used by {@link HTable} mostly but also by * {@link HBaseAdmin}, {@link CatalogTracker}, * and {@link ZooKeeperWatcher}. HConnection instances can be shared. Sharing * is usually what you want because rather than each HConnection instance * having to do its own discovery of regions out on the cluster, instead, all - * clients get to share the one cache of locations. Sharing makes cleanup of - * HConnections awkward. See {@link HConnectionManager} for cleanup - * discussion. + * clients get to share the one cache of locations. {@link HConnectionManager} does the + * sharing for you if you go by it getting connections. Sharing makes cleanup of + * HConnections awkward. See {@link HConnectionManager} for cleanup discussion. * * @see HConnectionManager */ @@ -213,29 +216,14 @@ public interface HConnection extends Abortable, Closeable { final boolean offlined) throws IOException; /** - * Returns a {@link MasterAdminProtocol} to the active master + * Returns a {@link MasterAdminKeepAliveConnection} to the active master */ - public MasterAdminProtocol getMasterAdmin() throws IOException; + public MasterAdminService.BlockingInterface getMasterAdmin() throws IOException; /** - * Returns an {@link MasterMonitorProtocol} to the active master + * Returns an {@link MasterMonitorKeepAliveConnection} to the active master */ - public MasterMonitorProtocol getMasterMonitor() throws IOException; - - - /** - * Establishes a connection to the region server at the specified address. - * @param hostname RegionServer hostname - * @param port RegionServer port - * @return proxy for HRegionServer - * @throws IOException if a remote or network exception occurs - * @deprecated - use @link {#getAdmin(final ServerName serverName)} which takes into account - * the startCode - */ - @Deprecated - public AdminProtocol getAdmin(final String hostname, final int port) - throws IOException; - + public MasterMonitorService.BlockingInterface getMasterMonitor() throws IOException; /** * Establishes a connection to the region server at the specified address. @@ -243,27 +231,10 @@ public interface HConnection extends Abortable, Closeable { * @return proxy for HRegionServer * @throws IOException if a remote or network exception occurs */ - public AdminProtocol getAdmin(final ServerName serverName) - throws IOException; - - /** - * Establishes a connection to the region server at the specified address, and return - * a region client protocol. - * - * @param hostname RegionServer hostname - * @param port RegionServer port - * @return ClientProtocol proxy for RegionServer - * @throws IOException if a remote or network exception occurs - * @deprecated - use @link {#getClient(final ServerName serverName)} which takes into account - * the startCode - */ - @Deprecated - public ClientProtocol getClient(final String hostname, final int port) - throws IOException; - + public AdminService.BlockingInterface getAdmin(final ServerName serverName) throws IOException; /** - * Establishes a connection to the region server at the specified address, and return + * Establishes a connection to the region server at the specified address, and returns * a region client protocol. * * @param serverName @@ -271,30 +242,17 @@ public interface HConnection extends Abortable, Closeable { * @throws IOException if a remote or network exception occurs * */ - public ClientProtocol getClient(final ServerName serverName) throws IOException; - - /** - * Establishes a connection to the region server at the specified address. 
- * @param hostname RegionServer hostname - * @param port RegionServer port - * @param getMaster - do we check if master is alive - * @return proxy for HRegionServer - * @throws IOException if a remote or network exception occurs - * @deprecated use @link {#getAdmin(final ServerName serverName, boolean getMaster)} - * which takes into account the startCode. - */ - @Deprecated - public AdminProtocol getAdmin(final String hostname, final int port, boolean getMaster) - throws IOException; + public ClientService.BlockingInterface getClient(final ServerName serverName) throws IOException; /** * Establishes a connection to the region server at the specified address. * @param serverName - * @param getMaster - do we check if master is alive + * @param getMaster do we check if master is alive * @return proxy for HRegionServer * @throws IOException if a remote or network exception occurs + * @deprecated You can pass master flag but nothing special is done. */ - public AdminProtocol getAdmin(final ServerName serverName, boolean getMaster) + public AdminService.BlockingInterface getAdmin(final ServerName serverName, boolean getMaster) throws IOException; /** @@ -417,13 +375,14 @@ public interface HConnection extends Abortable, Closeable { public void clearCaches(final ServerName sn); /** - * This function allows HBaseAdminProtocol and potentially others to get a shared MasterMonitor + * This function allows HBaseAdmin and potentially others to get a shared MasterMonitor * connection. * @return The shared instance. Never returns null. * @throws MasterNotRunningException */ - public MasterMonitorKeepAliveConnection getKeepAliveMasterMonitor() - throws MasterNotRunningException; + // TODO: Why is this in the public interface when the returned type is shutdown package access? + public MasterMonitorKeepAliveConnection getKeepAliveMasterMonitorService() + throws MasterNotRunningException; /** * This function allows HBaseAdmin and potentially others to get a shared MasterAdminProtocol @@ -431,7 +390,8 @@ public interface HConnection extends Abortable, Closeable { * @return The shared instance. Never returns null. * @throws MasterNotRunningException */ - public MasterAdminKeepAliveConnection getKeepAliveMasterAdmin() throws MasterNotRunningException; + // TODO: Why is this in the public interface when the returned type is shutdown package access? + public MasterAdminKeepAliveConnection getKeepAliveMasterAdminService() throws MasterNotRunningException; /** * @param serverName @@ -439,4 +399,3 @@ public interface HConnection extends Abortable, Closeable { */ public boolean isDeadServer(ServerName serverName); } - diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java new file mode 100644 index 0000000..ac6914e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java @@ -0,0 +1,140 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
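HConnectionKey, defined below, is what makes connection sharing value-based rather than identity-based: two Configuration objects that agree on the listed connection properties (and the current user) produce equal keys, so HConnectionManager can hand back one shared HConnection. A hedged sketch of the expected behaviour (the identity check reflects the key semantics described in the javadoc below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;

final class ConnectionSharingSketch {
  static void demo() throws Exception {
    Configuration c1 = HBaseConfiguration.create();
    Configuration c2 = HBaseConfiguration.create(c1); // copies the same properties
    HConnection a = HConnectionManager.getConnection(c1);
    HConnection b = HConnectionManager.getConnection(c2);
    boolean shared = (a == b); // expected: true, both configs map to equal HConnectionKeys
    System.out.println("shared=" + shared);
    a.close();
    b.close();
  }
}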
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.security.User; + +/** + * Denotes a unique key to an {@link HConnection} instance. + * + * In essence, this class captures the properties in {@link Configuration} + * that may be used in the process of establishing a connection. In light of + * that, if any new such properties are introduced into the mix, they must be + * added to the {@link HConnectionKey#properties} list. + * + */ +class HConnectionKey { + final static String[] CONNECTION_PROPERTIES = new String[] { + HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.ZOOKEEPER_CLIENT_PORT, + HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME, + HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.HBASE_CLIENT_PREFETCH_LIMIT, + HConstants.HBASE_META_SCANNER_CACHING, + HConstants.HBASE_CLIENT_INSTANCE_ID }; + + private Map properties; + private String username; + + HConnectionKey(Configuration conf) { + Map m = new HashMap(); + if (conf != null) { + for (String property : CONNECTION_PROPERTIES) { + String value = conf.get(property); + if (value != null) { + m.put(property, value); + } + } + } + this.properties = Collections.unmodifiableMap(m); + + try { + User currentUser = User.getCurrent(); + if (currentUser != null) { + username = currentUser.getName(); + } + } catch (IOException ioe) { + HConnectionManager.LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe); + } + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + if (username != null) { + result = username.hashCode(); + } + for (String property : CONNECTION_PROPERTIES) { + String value = properties.get(property); + if (value != null) { + result = prime * result + value.hashCode(); + } + } + + return result; + } + + + @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="ES_COMPARING_STRINGS_WITH_EQ", + justification="Optimization") + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HConnectionKey that = (HConnectionKey) obj; + if (this.username != null && !this.username.equals(that.username)) { + return false; + } else if (this.username == null && that.username != null) { + return false; + } + if (this.properties == null) { + if (that.properties != null) { + return false; + } + } else { + if (that.properties == null) { + return false; + } + for (String property : CONNECTION_PROPERTIES) { + String thisValue = this.properties.get(property); + String thatValue = that.properties.get(property); + //noinspection StringEquality + if (thisValue == thatValue) { + continue; + } + if (thisValue == null || !thisValue.equals(thatValue)) { + return false; + } + } + } + return true; + } + + @Override + public String toString() { + return 
"HConnectionKey{" + + "properties=" + properties + + ", username='" + username + '\'' + + '}'; + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 519d900..a54160e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -20,15 +20,9 @@ package org.apache.hadoop.hbase.client; import java.io.Closeable; import java.io.IOException; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; import java.lang.reflect.UndeclaredThrowableException; -import java.net.InetSocketAddress; import java.net.SocketException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -53,18 +47,17 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.AdminServiceSecurityInfo; import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ClientServiceSecurityInfo; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.MasterAdminProtocol; -import org.apache.hadoop.hbase.MasterMonitorProtocol; -import org.apache.hadoop.hbase.MasterProtocol; -import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.MasterAdminServiceSecurityInfo; +import org.apache.hadoop.hbase.MasterMonitorServiceSecurityInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; @@ -77,26 +70,95 @@ import org.apache.hadoop.hbase.exceptions.RegionOpeningException; import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException; import org.apache.hadoop.hbase.exceptions.TableNotFoundException; import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.ipc.HBaseClientRPC; -import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine; -import org.apache.hadoop.hbase.ipc.RpcClientEngine; +import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnResponse; +import 
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteSnapshotRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteSnapshotResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DispatchMergingRegionsRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DispatchMergingRegionsResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsSnapshotDoneRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsSnapshotDoneResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListSnapshotRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListSnapshotResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionRequest; +import 
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.TakeSnapshotRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.TakeSnapshotResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMonitorService; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.SoftValueSortedMap; import org.apache.hadoop.hbase.util.Triple; -import org.apache.hadoop.hbase.zookeeper.*; +import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; +import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; /** @@ -105,7 +167,8 @@ import com.google.protobuf.ServiceException; * {@link Configuration}; all invocations of {@link #getConnection(Configuration)} * that pass the same {@link Configuration} instance will be returned the same * {@link HConnection} instance (Adding properties to a Configuration - * instance does not change its object identity). 
Sharing {@link HConnection} + * instance does not change its object identity; for more on how this is done see + * {@link HConnectionKey}). Sharing {@link HConnection} * instances is usually what you want; all clients of the {@link HConnection} * instances share the HConnections' cache of Region locations rather than each * having to discover for itself the location of meta, etc. It makes @@ -116,11 +179,9 @@ import com.google.protobuf.ServiceException; * implemented atop Hadoop RPC and as of this writing, Hadoop RPC does a * connection per cluster-member, exclusively). * - *

But sharing connections - * makes clean up of {@link HConnection} instances a little awkward. Currently, - * clients cleanup by calling - * {@link #deleteConnection(Configuration)}. This will shutdown the - * zookeeper connection the HConnection was using and clean up all + *

But sharing connections makes cleanup of {@link HConnection} instances a little awkward. + * Currently, clients clean up by calling {@link #deleteConnection(Configuration)}. This will + * shut down the zookeeper connection the HConnection was using and clean up all * HConnection resources as well as stopping proxies to servers out on the * cluster. Not running the cleanup will not end the world; it'll * just stall the closeup some and spew some zookeeper connection failed @@ -150,43 +211,30 @@ import com.google.protobuf.ServiceException; @InterfaceAudience.Public @InterfaceStability.Evolving public class HConnectionManager { + static final Log LOG = LogFactory.getLog(HConnectionManager.class); + + public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server"; + // An LRU Map of HConnectionKey -> HConnection (TableServer). All // access must be synchronized. This map is not private because tests // need to be able to tinker with it. - static final Map HBASE_INSTANCES; - - public static final int MAX_CACHED_HBASE_INSTANCES; - - /** Parameter name for what client protocol to use. */ - public static final String CLIENT_PROTOCOL_CLASS = "hbase.clientprotocol.class"; - - /** Default client protocol class name. */ - public static final String DEFAULT_CLIENT_PROTOCOL_CLASS = ClientProtocol.class.getName(); - - /** Parameter name for what admin protocol to use. */ - public static final String REGION_PROTOCOL_CLASS = "hbase.adminprotocol.class"; - - /** Default admin protocol class name. */ - public static final String DEFAULT_ADMIN_PROTOCOL_CLASS = AdminProtocol.class.getName(); + static final Map CONNECTION_INSTANCES; - public static final String RETRIES_BY_SERVER = "hbase.client.retries.by.server"; - - private static final Log LOG = LogFactory.getLog(HConnectionManager.class); + public static final int MAX_CACHED_CONNECTION_INSTANCES; static { // We set instances to one more than the value specified for {@link // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max // connections to the ensemble from the one client is 30, so in that case we // should run into zk issues before the LRU hit this value of 31. - MAX_CACHED_HBASE_INSTANCES = HBaseConfiguration.create().getInt( - HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, - HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1; - HBASE_INSTANCES = new LinkedHashMap( - (int) (MAX_CACHED_HBASE_INSTANCES / 0.75F) + 1, 0.75F, true) { - @Override + MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt( + HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1; + CONNECTION_INSTANCES = new LinkedHashMap( + (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) { + @Override protected boolean removeEldestEntry( Map.Entry eldest) { - return size() > MAX_CACHED_HBASE_INSTANCES; + return size() > MAX_CACHED_CONNECTION_INSTANCES; } }; } @@ -194,31 +242,31 @@ public class HConnectionManager { /* * Non-instantiable. */ - protected HConnectionManager() { + private HConnectionManager() { super(); } /** - * Get the connection that goes with the passed conf - * configuration instance. - * If no current connection exists, method creates a new connection for the - * passed conf instance. + * Get the connection that goes with the passed conf configuration instance. + * If no current connection exists, the method creates a new connection and keys it using + * connection-specific properties from the passed {@link Configuration}; see + * {@link HConnectionKey}.
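// ---------------------------------------------------------------------------
// Editor's sketch (illustrative, not part of the patch): the sharing and
// reference-counting contract described above, seen from a caller. Method
// names are the ones in this file; the wrapper class is hypothetical.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;

public class SharedConnectionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Same Configuration instance -> same HConnectionKey -> same cached
    // HConnection; the second call only bumps the usage count.
    HConnection c1 = HConnectionManager.getConnection(conf);
    HConnection c2 = HConnectionManager.getConnection(conf);
    assert c1 == c2;
    // Balance every getConnection() with a deleteConnection() so the count
    // can reach zero and the zookeeper connection can be released.
    HConnectionManager.deleteConnection(conf);
    HConnectionManager.deleteConnection(conf);
  }
}
// ---------------------------------------------------------------------------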
* @param conf configuration * @return HConnection object for conf * @throws ZooKeeperConnectionException */ - public static HConnection getConnection(Configuration conf) + public static HConnection getConnection(final Configuration conf) throws IOException { HConnectionKey connectionKey = new HConnectionKey(conf); - synchronized (HBASE_INSTANCES) { - HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey); + synchronized (CONNECTION_INSTANCES) { + HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey); if (connection == null) { connection = new HConnectionImplementation(conf, true); - HBASE_INSTANCES.put(connectionKey, connection); + CONNECTION_INSTANCES.put(connectionKey, connection); } else if (connection.isClosed()) { HConnectionManager.deleteConnection(connectionKey, true); connection = new HConnectionImplementation(conf, true); - HBASE_INSTANCES.put(connectionKey, connection); + CONNECTION_INSTANCES.put(connectionKey, connection); } connection.incCount(); return connection; @@ -226,11 +274,10 @@ public class HConnectionManager { } /** - * Create a new HConnection instance using the passed conf - * instance. - * Note: This bypasses the usual HConnection life cycle management! - * Use this with caution, the caller is responsible for closing the - * created connection. + * Create a new HConnection instance using the passed conf instance. + *

Note: This bypasses the usual HConnection life cycle management done by + * {@link #getConnection(Configuration)}. Use this with caution; the caller is responsible for + * calling {@link HConnection#close()} on the returned connection instance. * @param conf configuration * @return HConnection object for conf * @throws ZooKeeperConnectionException @@ -241,22 +288,19 @@ } /** - * Delete connection information for the instance specified by configuration. - * If there are no more references to it, this will then close connection to - * the zookeeper ensemble and let go of all resources. + * Delete connection information for the instance specified by the passed configuration. + * If there are no more references to the designated connection, this method will + * then close the connection to the zookeeper ensemble and let go of all associated resources. * - * @param conf - * configuration whose identity is used to find {@link HConnection} - * instance. + * @param conf configuration whose identity is used to find {@link HConnection} instance. */ public static void deleteConnection(Configuration conf) { deleteConnection(new HConnectionKey(conf), false); } /** - * Delete stale connection information for the instance specified by configuration. - * This will then close connection to - * the zookeeper ensemble and let go of all resources. + * Clean up a known stale connection. + * This will then close the connection to the zookeeper ensemble and let go of all resources. * * @param connection */ @@ -268,22 +312,21 @@ * Delete information for all connections. */ public static void deleteAllConnections() { - synchronized (HBASE_INSTANCES) { + synchronized (CONNECTION_INSTANCES) { Set connectionKeys = new HashSet(); - connectionKeys.addAll(HBASE_INSTANCES.keySet()); + connectionKeys.addAll(CONNECTION_INSTANCES.keySet()); for (HConnectionKey connectionKey : connectionKeys) { deleteConnection(connectionKey, false); } - HBASE_INSTANCES.clear(); + CONNECTION_INSTANCES.clear(); } } private static void deleteConnection(HConnection connection, boolean staleConnection) { - synchronized (HBASE_INSTANCES) { - for (Entry connectionEntry : HBASE_INSTANCES - .entrySet()) { - if (connectionEntry.getValue() == connection) { - deleteConnection(connectionEntry.getKey(), staleConnection); + synchronized (CONNECTION_INSTANCES) { + for (Entry e: CONNECTION_INSTANCES.entrySet()) { + if (e.getValue() == connection) { + deleteConnection(e.getKey(), staleConnection); break; } } @@ -291,18 +334,17 @@ } private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) { - synchronized (HBASE_INSTANCES) { - HConnectionImplementation connection = HBASE_INSTANCES - .get(connectionKey); + synchronized (CONNECTION_INSTANCES) { + HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey); if (connection != null) { connection.decCount(); if (connection.isZeroReference() || staleConnection) { - HBASE_INSTANCES.remove(connectionKey); + CONNECTION_INSTANCES.remove(connectionKey); connection.internalClose(); } } else { LOG.error("Connection not found in the list, can't delete it "+ - "(connection key="+connectionKey+"). May be the key was modified?"); + "(connection key=" + connectionKey + "). Maybe the key was modified?"); } } } @@ -313,14 +355,12 @@ * @return Number of cached regions for the table.
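// ---------------------------------------------------------------------------
// Editor's sketch (illustrative, not part of the patch): the unmanaged path
// documented above. createConnection() skips the shared CONNECTION_INSTANCES
// cache, so there is no reference counting; the caller owns the instance and
// must close() it. The wrapper class is hypothetical.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;

public class UnmanagedConnectionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HConnection connection = HConnectionManager.createConnection(conf);
    try {
      // ... use the connection: look up region locations, back HTables, etc.
    } finally {
      connection.close(); // caller's responsibility on this path
    }
  }
}
// ---------------------------------------------------------------------------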
* @throws ZooKeeperConnectionException */ - static int getCachedRegionCount(Configuration conf, - final byte[] tableName) + static int getCachedRegionCount(Configuration conf, final byte[] tableName) throws IOException { return execute(new HConnectable(conf) { @Override public Integer connect(HConnection connection) { - return ((HConnectionImplementation) connection) - .getNumberOfCachedRegionLocations(tableName); + return ((HConnectionImplementation)connection).getNumberOfCachedRegionLocations(tableName); } }); } @@ -331,8 +371,8 @@ public class HConnectionManager { * @return true if the region where the table and row reside is cached. * @throws ZooKeeperConnectionException */ - static boolean isRegionCached(Configuration conf, - final byte[] tableName, final byte[] row) throws IOException { + static boolean isRegionCached(Configuration conf, final byte[] tableName, final byte[] row) + throws IOException { return execute(new HConnectable(conf) { @Override public Boolean connect(HConnection connection) { @@ -342,33 +382,9 @@ public class HConnectionManager { } /** - * This class makes it convenient for one to execute a command in the context - * of a {@link HConnection} instance based on the given {@link Configuration}. - * - *
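// ---------------------------------------------------------------------------
// Editor's sketch (illustrative, not part of the patch): the two package-private
// helpers above use HConnectionManager.execute() with an HConnectable to run a
// short-lived task without hand-managing the connection; a caller can do the
// same. This assumes HConnectable stays visible as a top-level class in
// org.apache.hadoop.hbase.client once its nested copy (deleted just below) is
// gone, and the execute() signature implied by its Javadoc.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnectable;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;

public class ExecuteExample {
  static boolean isMasterUp(Configuration conf) throws IOException {
    return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
      @Override
      public Boolean connect(HConnection connection) throws IOException {
        // The connection is only valid for the duration of connect().
        return connection.isMasterRunning();
      }
    });
  }
}
// ---------------------------------------------------------------------------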

- * If you find yourself wanting to use a {@link HConnection} for a relatively - * short duration of time, and do not want to deal with the hassle of creating - * and cleaning up that resource, then you should consider using this - * convenience class. - * - * @param - * the return type of the {@link HConnectable#connect(HConnection)} - * method. - */ - public static abstract class HConnectable { - public Configuration conf; - - protected HConnectable(Configuration conf) { - this.conf = conf; - } - - public abstract T connect(HConnection connection) throws IOException; - } - - /** * This convenience method invokes the given {@link HConnectable#connect} * implementation using a {@link HConnection} instance that lasts just for the - * duration of that invocation. + * duration of the invocation. * * @param the return type of the connect method * @param connectable the {@link HConnectable} instance @@ -398,127 +414,14 @@ public class HConnectionManager { } } - /** - * Denotes a unique key to a {@link HConnection} instance. - * - * In essence, this class captures the properties in {@link Configuration} - * that may be used in the process of establishing a connection. In light of - * that, if any new such properties are introduced into the mix, they must be - * added to the {@link HConnectionKey#properties} list. - * - */ - public static class HConnectionKey { - final static String[] CONNECTION_PROPERTIES = new String[] { - HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT, - HConstants.ZOOKEEPER_CLIENT_PORT, - HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME, - HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, - HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.HBASE_CLIENT_PREFETCH_LIMIT, - HConstants.HBASE_META_SCANNER_CACHING, - HConstants.HBASE_CLIENT_INSTANCE_ID }; - - private Map properties; - private String username; - - public HConnectionKey(Configuration conf) { - Map m = new HashMap(); - if (conf != null) { - for (String property : CONNECTION_PROPERTIES) { - String value = conf.get(property); - if (value != null) { - m.put(property, value); - } - } - } - this.properties = Collections.unmodifiableMap(m); - - try { - User currentUser = User.getCurrent(); - if (currentUser != null) { - username = currentUser.getName(); - } - } catch (IOException ioe) { - LOG.warn("Error obtaining current user, skipping username in HConnectionKey", - ioe); - } - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - if (username != null) { - result = username.hashCode(); - } - for (String property : CONNECTION_PROPERTIES) { - String value = properties.get(property); - if (value != null) { - result = prime * result + value.hashCode(); - } - } - - return result; - } - - - @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="ES_COMPARING_STRINGS_WITH_EQ", - justification="Optimization") - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - HConnectionKey that = (HConnectionKey) obj; - if (this.username != null && !this.username.equals(that.username)) { - return false; - } else if (this.username == null && that.username != null) { - return false; - } - if (this.properties == null) { - if (that.properties != null) { - return false; - } - } else { - if (that.properties == null) { - return false; - } - for (String property : CONNECTION_PROPERTIES) { - String thisValue = 
this.properties.get(property); - String thatValue = that.properties.get(property); - //noinspection StringEquality - if (thisValue == thatValue) { - continue; - } - if (thisValue == null || !thisValue.equals(thatValue)) { - return false; - } - } - } - return true; - } - - @Override - public String toString() { - return "HConnectionKey{" + - "properties=" + properties + - ", username='" + username + '\'' + - '}'; - } - } - /** Encapsulates connection to zookeeper and regionservers.*/ + @edu.umd.cs.findbugs.annotations.SuppressWarnings( + value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION", + justification="Access to the concurrent hash map is under a lock so should be fine.") static class HConnectionImplementation implements HConnection, Closeable { static final Log LOG = LogFactory.getLog(HConnectionImplementation.class); - private final Class adminClass; - private final Class clientClass; private final long pause; private final int numTries; - private final int maxRPCAttempts; private final int rpcTimeout; private final int prefetchRegionLimit; private final boolean useServerTrackerForRetries; @@ -546,22 +449,15 @@ public class HConnectionManager { private final Configuration conf; - // client RPC - private RpcClientEngine rpcEngine; - - // Known region ServerName.toString() -> RegionClient/Admin - private final ConcurrentHashMap> servers = - new ConcurrentHashMap>(); - private final ConcurrentHashMap connectionLock = - new ConcurrentHashMap(); + // Client rpc instance. + private RpcClient rpcClient; /** * Map of table to table {@link HRegionLocation}s. The table key is made * by doing a {@link Bytes#mapKey(byte[])} of the table's name. */ - private final Map> - cachedRegionLocations = - new HashMap>(); + private final Map> cachedRegionLocations = + new HashMap>(); // The presence of a server in the map implies it's likely that there is an // entry in cachedRegionLocations that map to this server; but the absence @@ -579,47 +475,33 @@ // indicates whether this connection's life cycle is managed (by us) private final boolean managed; + /** * constructor * @param conf Configuration object + * @param managed If true, does not do full shutdown on close; i.e. cleanup of connection + * to zk and shutdown of all services; we just close down the resources this connection was + * responsible for and decrement usage counters. It is up to the caller to do the full + * cleanup. It is set when we want to have connection sharing going on -- reuse of zk connection, + * and cached region locations, established regionserver connections, etc. When connections + * are shared, we have reference counting going on and will only do full cleanup when there are + * no more users of an HConnectionImplementation instance.
*/ - @SuppressWarnings("unchecked") - public HConnectionImplementation(Configuration conf, boolean managed) throws IOException { + HConnectionImplementation(Configuration conf, boolean managed) throws IOException { this.conf = conf; this.managed = managed; - String adminClassName = conf.get(REGION_PROTOCOL_CLASS, - DEFAULT_ADMIN_PROTOCOL_CLASS); this.closed = false; - try { - this.adminClass = - (Class) Class.forName(adminClassName); - } catch (ClassNotFoundException e) { - throw new UnsupportedOperationException( - "Unable to find region server interface " + adminClassName, e); - } - String clientClassName = conf.get(CLIENT_PROTOCOL_CLASS, - DEFAULT_CLIENT_PROTOCOL_CLASS); - try { - this.clientClass = - (Class) Class.forName(clientClassName); - } catch (ClassNotFoundException e) { - throw new UnsupportedOperationException( - "Unable to find client protocol " + clientClassName, e); - } this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, - HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.maxRPCAttempts = conf.getInt( - HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, - HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS); + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.rpcTimeout = conf.getInt( - HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.prefetchRegionLimit = conf.getInt( - HConstants.HBASE_CLIENT_PREFETCH_LIMIT, - HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT); - this.useServerTrackerForRetries = conf.getBoolean(RETRIES_BY_SERVER, true); + HConstants.HBASE_CLIENT_PREFETCH_LIMIT, + HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT); + this.useServerTrackerForRetries = conf.getBoolean(RETRIES_BY_SERVER_KEY, true); long serverTrackerTimeout = 0; if (this.useServerTrackerForRetries) { // Server tracker allows us to do faster, and yet useful (hopefully), retries. @@ -636,11 +518,7 @@ public class HConnectionManager { this.serverTrackerTimeout = serverTrackerTimeout; retrieveClusterId(); - // ProtobufRpcClientEngine is the main RpcClientEngine implementation, - // but we maintain access through an interface to allow overriding for tests - // RPC engine setup must follow obtaining the cluster ID for token authentication to work - this.rpcEngine = new ProtobufRpcClientEngine(this.conf, this.clusterId); - + this.rpcClient = new RpcClient(this.conf, this.clusterId); // Do we publish the status? Class listenerClass = @@ -654,14 +532,25 @@ public class HConnectionManager { @Override public void newDead(ServerName sn) { clearCaches(sn); - rpcEngine.getClient().cancelConnections(sn.getHostname(), sn.getPort(), - new SocketException(sn.getServerName() + " is dead: closing its connection.")); + rpcClient.cancelConnections(sn.getHostname(), sn.getPort(), + new SocketException(sn.getServerName() + " is dead: closing its connection.")); } }, conf, listenerClass); } } /** + * For tests only. + * @param rpcClient Client we should use instead. + * @return Previous rpcClient + */ + RpcClient setRpcClient(final RpcClient rpcClient) { + RpcClient oldRpcClient = this.rpcClient; + this.rpcClient = rpcClient; + return oldRpcClient; + } + + /** * An identifier that will remain the same for a given connection. 
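// ---------------------------------------------------------------------------
// Editor's sketch (illustrative, not part of the patch): how a test might use
// the package-private "For tests only" setRpcClient() hook above. The helper
// class is hypothetical and would have to live in
// org.apache.hadoop.hbase.client to see package-private members; RpcClient's
// constructor and stop() follow their uses in this patch
// (new RpcClient(conf, clusterId) and rpcClient.stop()).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.RpcClient;

class RpcClientSwapSketch {
  static RpcClient swap(HConnectionManager.HConnectionImplementation connection,
      Configuration conf, String clusterId) {
    // A real test would hand in a subclass that counts or fakes calls; a
    // second plain instance stands in for such a double here.
    RpcClient replacement = new RpcClient(conf, clusterId);
    RpcClient previous = connection.setRpcClient(replacement);
    previous.stop(); // same shutdown call internalClose() makes later in this file
    return replacement;
  }
}
// ---------------------------------------------------------------------------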
* @return */ @@ -706,125 +595,6 @@ public class HConnectionManager { return this.conf; } - private static class MasterProtocolState { - public MasterProtocol protocol; - public int userCount; - public long keepAliveUntil = Long.MAX_VALUE; - public final Class protocolClass; - - public MasterProtocolState ( - final Class protocolClass) { - this.protocolClass = protocolClass; - } - } - - /** - * Create a new Master proxy. Try once only. - */ - private MasterProtocol createMasterInterface( - MasterProtocolState masterProtocolState) - throws IOException, KeeperException, ServiceException { - - ZooKeeperKeepAliveConnection zkw; - try { - zkw = getKeepAliveZooKeeperWatcher(); - } catch (IOException e) { - throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); - } - - try { - - checkIfBaseNodeAvailable(zkw); - ServerName sn = MasterAddressTracker.getMasterAddress(zkw); - if (sn == null) { - String msg = - "ZooKeeper available but no active master location found"; - LOG.info(msg); - throw new MasterNotRunningException(msg); - } - - - InetSocketAddress isa = - new InetSocketAddress(sn.getHostname(), sn.getPort()); - MasterProtocol tryMaster = rpcEngine.getProxy( - masterProtocolState.protocolClass, - isa, this.conf, this.rpcTimeout); - - if (tryMaster.isMasterRunning( - null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning()) { - return tryMaster; - } else { - String msg = "Can create a proxy to master, but it is not running"; - LOG.info(msg); - throw new MasterNotRunningException(msg); - } - } finally { - zkw.close(); - } - } - - /** - * Create a master, retries if necessary. - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="SWL_SLEEP_WITH_LOCK_HELD") - private MasterProtocol createMasterWithRetries( - MasterProtocolState masterProtocolState) throws MasterNotRunningException { - - // The lock must be at the beginning to prevent multiple master creation - // (and leaks) in a multithread context - - synchronized (this.masterAndZKLock) { - Exception exceptionCaught = null; - MasterProtocol master = null; - int tries = 0; - while ( - !this.closed && master == null - ) { - tries++; - try { - master = createMasterInterface(masterProtocolState); - } catch (IOException e) { - exceptionCaught = e; - } catch (KeeperException e) { - exceptionCaught = e; - } catch (ServiceException e) { - exceptionCaught = e; - } - - if (exceptionCaught != null) - // It failed. If it's not the last try, we're going to wait a little - if (tries < numTries) { - // tries at this point is 1 or more; decrement to start from 0. 
- long pauseTime = ConnectionUtils.getPauseTime(this.pause, tries - 1); - LOG.info("getMaster attempt " + tries + " of " + numTries + - " failed; retrying after sleep of " +pauseTime + ", exception=" + exceptionCaught); - - try { - Thread.sleep(pauseTime); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException( - "Thread was interrupted while trying to connect to master.", e); - } - - } else { - // Enough tries, we stop now - LOG.info("getMaster attempt " + tries + " of " + numTries + - " failed; no more retrying.", exceptionCaught); - throw new MasterNotRunningException(exceptionCaught); - } - } - - if (master == null) { - // implies this.closed true - throw new MasterNotRunningException( - "Connection was closed while trying to get master"); - } - - return master; - } - } - private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw) throws MasterNotRunningException { String errorMsg; @@ -851,11 +621,16 @@ public class HConnectionManager { */ @Override public boolean isMasterRunning() - throws MasterNotRunningException, ZooKeeperConnectionException { - // When getting the master proxy connection, we check it's running, + throws MasterNotRunningException, ZooKeeperConnectionException { + // When getting the master connection, we check it's running, // so if there is no exception, it means we've been able to get a // connection on a running master - getKeepAliveMasterMonitor().close(); + MasterMonitorKeepAliveConnection m = getKeepAliveMasterMonitorService(); + try { + m.close(); + } catch (IOException e) { + throw new MasterNotRunningException("Failed close", e); + } return true; } @@ -1142,7 +917,7 @@ public class HConnectionManager { metaLocation = locateRegion(parentTable, metaKey, true, false); // If null still, go around again. if (metaLocation == null) continue; - ClientProtocol server = getClient(metaLocation.getServerName()); + ClientService.BlockingInterface service = getClient(metaLocation.getServerName()); Result regionInfoRow; // This block guards against two threads trying to load the meta @@ -1170,7 +945,7 @@ public class HConnectionManager { } // Query the meta region for the location of the meta region - regionInfoRow = ProtobufUtil.getRowOrBefore(server, + regionInfoRow = ProtobufUtil.getRowOrBefore(service, metaLocation.getRegionInfo().getRegionName(), metaKey, HConstants.CATALOG_FAMILY); } @@ -1229,7 +1004,7 @@ public class HConnectionManager { throw e; } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = ((RemoteException)e).unwrapRemoteException(); } if (tries < numTries - 1) { if (LOG.isDebugEnabled()) { @@ -1443,103 +1218,339 @@ public class HConnectionManager { } } - @Override - @Deprecated - public AdminProtocol getAdmin(final String hostname, final int port) throws IOException { - return getAdmin(new ServerName(hostname, port, 0L)); + // Map keyed by service name + regionserver to service stub implementation + private final ConcurrentHashMap stubs = + new ConcurrentHashMap(); + // Map of locks used creating service stubs per regionserver. + private final ConcurrentHashMap connectionLock = + new ConcurrentHashMap(); + + /** + * Maintains current state of MasterService instance. 
+ */ + static abstract class MasterServiceState { + HConnection connection; + int userCount; + long keepAliveUntil = Long.MAX_VALUE; + + MasterServiceState (final HConnection connection) { + super(); + this.connection = connection; + } + + abstract Object getStub(); + abstract void clearStub(); + abstract boolean isMasterRunning() throws ServiceException; } - @Override - public AdminProtocol getAdmin(final ServerName serverName) - throws IOException { - return getAdmin(serverName, false); + /** + * State of the MasterAdminService connection/setup. + */ + static class MasterAdminServiceState extends MasterServiceState { + MasterAdminService.BlockingInterface stub; + MasterAdminServiceState(final HConnection connection) { + super(connection); + } + + @Override + public String toString() { + return "MasterAdminService"; + } + + @Override + Object getStub() { + return this.stub; + } + + @Override + void clearStub() { + this.stub = null; + } + + @Override + boolean isMasterRunning() throws ServiceException { + MasterProtos.IsMasterRunningResponse response = + this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); + return response != null? response.getIsMasterRunning(): false; + } } - @Override - @Deprecated - public ClientProtocol getClient(final String hostname, final int port) - throws IOException { - return (ClientProtocol)getProtocol(hostname, port, clientClass); + /** + * State of the MasterMonitorService connection/setup. + */ + static class MasterMonitorServiceState extends MasterServiceState { + MasterMonitorService.BlockingInterface stub; + MasterMonitorServiceState(final HConnection connection) { + super(connection); + } + + @Override + public String toString() { + return "MasterMonitorService"; + } + + @Override + Object getStub() { + return this.stub; + } + + @Override + void clearStub() { + this.stub = null; + } + + @Override + boolean isMasterRunning() throws ServiceException { + MasterProtos.IsMasterRunningResponse response = + this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); + return response != null? response.getIsMasterRunning(): false; + } } - @Override - public ClientProtocol getClient(final ServerName serverName) - throws IOException { - if (isDeadServer(serverName)){ - throw new RegionServerStoppedException("The server " + serverName + " is dead."); + /** + * Makes a client-side stub for master services. Sub-class to specialize. + * Depends on hosting class so not static. Exists so we avoid duplicating a bunch of code + * when setting up the MasterMonitorService and MasterAdminService. + */ + abstract class StubMaker { + /** + * @return Interface that carries the security info needed by this service. + */ + protected abstract Class getSecurityInfo(); + + /** + * Make the stub and cache it internally so it can be used later for the isMasterRunning call. + * @param channel + */ + protected abstract Object makeStub(final BlockingRpcChannel channel); + + /** + * Once set up, check it works by doing an isMasterRunning check. + * @throws ServiceException + */ + protected abstract void isMasterRunning() throws ServiceException; + + /** + * Create a stub. Try once only. It is not typed because there is no common type across + * protobuf services or their interfaces. Let the caller do appropriate casting. + * @return A stub for master services.
+ * @throws IOException + * @throws KeeperException + * @throws ServiceException + */ + private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException { + ZooKeeperKeepAliveConnection zkw; + try { + zkw = getKeepAliveZooKeeperWatcher(); + } catch (IOException e) { + throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); + } + try { + checkIfBaseNodeAvailable(zkw); + ServerName sn = MasterAddressTracker.getMasterAddress(zkw); + if (sn == null) { + String msg = "ZooKeeper available but no active master location found"; + LOG.info(msg); + throw new MasterNotRunningException(msg); + } + if (isDeadServer(sn)) { + throw new MasterNotRunningException(sn + " is dead."); + } + // Use the security info interface name as our stub key + String key = getStubKey(getSecurityInfo().getName(), sn.getHostAndPort()); + connectionLock.putIfAbsent(key, key); + Object stub = null; + synchronized (connectionLock.get(key)) { + stub = stubs.get(key); + if (stub == null) { + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, getSecurityInfo(), + User.getCurrent(), rpcTimeout); + stub = makeStub(channel); + isMasterRunning(); + stubs.put(key, stub); + } + } + return stub; + } finally { + zkw.close(); + } + } + + /** + * Create a stub against the master. Retry if necessary. + * @return A stub to make service calls against the master. + * @throws MasterNotRunningException + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="SWL_SLEEP_WITH_LOCK_HELD") + Object makeStub() throws MasterNotRunningException { + // The lock must be at the beginning to prevent multiple master creations + // (and leaks) in a multithreaded context + synchronized (masterAndZKLock) { + Exception exceptionCaught = null; + Object stub = null; + int tries = 0; + while (!closed && stub == null) { + tries++; + try { + stub = makeStubNoRetries(); + } catch (IOException e) { + exceptionCaught = e; + } catch (KeeperException e) { + exceptionCaught = e; + } catch (ServiceException e) { + exceptionCaught = e; + } + + if (exceptionCaught != null) + // It failed. If it's not the last try, we're going to wait a little + if (tries < numTries) { + // tries at this point is 1 or more; decrement to start from 0.
+ long pauseTime = ConnectionUtils.getPauseTime(pause, tries - 1); + LOG.info("getMaster attempt " + tries + " of " + numTries + + " failed; retrying after sleep of " +pauseTime + ", exception=" + + exceptionCaught); + + try { + Thread.sleep(pauseTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Thread was interrupted while trying to connect to master.", e); + } + } else { + // Enough tries, we stop now + LOG.info("getMaster attempt " + tries + " of " + numTries + + " failed; no more retrying.", exceptionCaught); + throw new MasterNotRunningException(exceptionCaught); + } + } + + if (stub == null) { + // implies this.closed true + throw new MasterNotRunningException("Connection was closed while trying to get master"); + } + return stub; + } } - return (ClientProtocol) - getProtocol(serverName.getHostname(), serverName.getPort(), clientClass); } - @Override - @Deprecated - public AdminProtocol getAdmin(final String hostname, final int port, - final boolean master) - throws IOException { - return (AdminProtocol)getProtocol(hostname, port, adminClass); + /** + * Class to make a MasterMonitorService stub. + */ + class MasterMonitorServiceStubMaker extends StubMaker { + private MasterMonitorService.BlockingInterface stub; + @Override + protected Class getSecurityInfo() { + return MasterMonitorServiceSecurityInfo.class; + } + + @Override + @edu.umd.cs.findbugs.annotations.SuppressWarnings("SWL_SLEEP_WITH_LOCK_HELD") + MasterMonitorService.BlockingInterface makeStub() throws MasterNotRunningException { + return (MasterMonitorService.BlockingInterface)super.makeStub(); + } + + @Override + protected Object makeStub(BlockingRpcChannel channel) { + this.stub = MasterMonitorService.newBlockingStub(channel); + return this.stub; + } + + @Override + protected void isMasterRunning() throws ServiceException { + this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); + } } + /** + * Class to make a MasterAdminService stub. + */ + class MasterAdminServiceStubMaker extends StubMaker { + private MasterAdminService.BlockingInterface stub; + + @Override + protected Class getSecurityInfo() { + return MasterAdminServiceSecurityInfo.class; + } + + @Override + @edu.umd.cs.findbugs.annotations.SuppressWarnings("SWL_SLEEP_WITH_LOCK_HELD") + MasterAdminService.BlockingInterface makeStub() throws MasterNotRunningException { + return (MasterAdminService.BlockingInterface)super.makeStub(); + } + + @Override + protected Object makeStub(BlockingRpcChannel channel) { + this.stub = MasterAdminService.newBlockingStub(channel); + return this.stub; + } + + @Override + protected void isMasterRunning() throws ServiceException { + this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); + } + }; + @Override - public AdminProtocol getAdmin(final ServerName serverName, final boolean master) + public AdminService.BlockingInterface getAdmin(final ServerName serverName) throws IOException { - if (isDeadServer(serverName)){ - throw new RegionServerStoppedException("The server " + serverName + " is dead."); + return getAdmin(serverName, false); + } + + @Override + // Nothing is done w/ the 'master' parameter. It is ignored. 
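// ---------------------------------------------------------------------------
// Editor's sketch (illustrative, not part of the patch): the stub-caching
// shape shared by makeStubNoRetries() above and getAdmin()/getClient() below.
// putIfAbsent() interns one lock object per service@host:port key, so stub
// creation is single-flight per server while lookups stay on the concurrent
// maps. Types are simplified and the class is hypothetical.
import java.util.concurrent.ConcurrentHashMap;

class StubCacheSketch {
  private final ConcurrentHashMap<String, Object> stubs =
      new ConcurrentHashMap<String, Object>();
  private final ConcurrentHashMap<String, String> connectionLock =
      new ConcurrentHashMap<String, String>();

  Object getStub(String serviceName, String hostAndPort) {
    String key = serviceName + "@" + hostAndPort; // same shape as getStubKey()
    connectionLock.putIfAbsent(key, key);
    synchronized (connectionLock.get(key)) {
      Object stub = stubs.get(key);
      if (stub == null) {
        stub = new Object(); // stand-in for XxxService.newBlockingStub(channel)
        stubs.put(key, stub);
      }
      return stub;
    }
  }
}
// ---------------------------------------------------------------------------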
+ public AdminService.BlockingInterface getAdmin(final ServerName serverName, + final boolean master) + throws IOException { + if (isDeadServer(serverName)) { + throw new RegionServerStoppedException(serverName + " is dead."); + } + String key = getStubKey(AdminService.BlockingInterface.class.getName(), + serverName.getHostAndPort()); + this.connectionLock.putIfAbsent(key, key); + AdminService.BlockingInterface stub = null; + synchronized (this.connectionLock.get(key)) { + stub = (AdminService.BlockingInterface)this.stubs.get(key); + if (stub == null) { + BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName, + AdminServiceSecurityInfo.class, User.getCurrent(), this.rpcTimeout); + stub = (AdminService.BlockingInterface)AdminService.newBlockingStub(channel); + this.stubs.put(key, stub); + } } - return (AdminProtocol)getProtocol( - serverName.getHostname(), serverName.getPort(), adminClass); + return stub; } - /** - * Either the passed isa is null or hostname - * can be but not both. - * @param hostname - * @param port - * @param protocolClass - * @return Proxy. - * @throws IOException - */ - IpcProtocol getProtocol(final String hostname, - final int port, final Class protocolClass) + @Override + public ClientService.BlockingInterface getClient(final ServerName sn) throws IOException { - String rsName = Addressing.createHostAndPortStr(hostname, port); - // See if we already have a connection (common case) - Map protocols = this.servers.get(rsName); - if (protocols == null) { - protocols = new HashMap(); - Map existingProtocols = - this.servers.putIfAbsent(rsName, protocols); - if (existingProtocols != null) { - protocols = existingProtocols; - } - } - String protocol = protocolClass.getName(); - IpcProtocol server = protocols.get(protocol); - if (server == null) { - // create a unique lock for this RS + protocol (if necessary) - String lockKey = protocol + "@" + rsName; - this.connectionLock.putIfAbsent(lockKey, lockKey); - // get the RS lock - synchronized (this.connectionLock.get(lockKey)) { - // do one more lookup in case we were stalled above - server = protocols.get(protocol); - if (server == null) { - try { - // Only create isa when we need to. - InetSocketAddress address = new InetSocketAddress(hostname, port); - // definitely a cache miss. establish an RPC for this RS - server = HBaseClientRPC.waitForProxy(rpcEngine, protocolClass, address, this.conf, - this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout); - protocols.put(protocol, server); - } catch (RemoteException e) { - LOG.warn("RemoteException connecting to RS", e); - // Throw what the RemoteException was carrying. - throw e.unwrapRemoteException(); - } - } + if (isDeadServer(sn)) { + throw new RegionServerStoppedException(sn + " is dead."); + } + String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostAndPort()); + this.connectionLock.putIfAbsent(key, key); + ClientService.BlockingInterface stub = null; + synchronized (this.connectionLock.get(key)) { + stub = (ClientService.BlockingInterface)this.stubs.get(key); + if (stub == null) { + BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, + ClientServiceSecurityInfo.class, User.getCurrent(), this.rpcTimeout); + stub = (ClientService.BlockingInterface)ClientService.newBlockingStub(channel); + // In old days, after getting stub/proxy, we'd make a call. We are not doing that here. + // Just fail on first actual call rather than in here on setup. 
+ this.stubs.put(key, stub); } } - return server; + return stub; + } + + static String getStubKey(final String serviceName, final String rsHostnamePort) { + return serviceName + "@" + rsHostnamePort; } @Override @@ -1568,14 +1579,12 @@ private static final long keepAlive = 5 * 60 * 1000; /** - * Retrieve a shared ZooKeeperWatcher. You must close it it once you've have - * finished with it. + * Retrieve a shared ZooKeeperWatcher. You must close it once you have finished with it. * @return The shared instance. Never returns null. */ public ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher() throws IOException { synchronized (masterAndZKLock) { - if (keepAliveZookeeper == null) { // We don't check that our link to ZooKeeper is still valid // But there is a retry mechanism in the ZooKeeperWatcher itself } keepAliveZookeeperUserCount++; keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE; - return keepAliveZookeeper; } } @@ -1606,7 +1614,7 @@ /** * Creates a Chore thread to check the connections to master & zookeeper * and close them when they reach their closing time ( - * {@link MasterProtocolState#keepAliveUntil} and + * {@link MasterServiceState#keepAliveUntil} and * {@link #keepZooKeeperWatcherAliveUntil}). Keep alive time is * managed by the release functions and the variable {@link #keepAlive} */ @@ -1634,9 +1642,9 @@ return new DelayedClosing(hci, stoppable); } - protected void closeMasterProtocol(MasterProtocolState protocolState) { + protected void closeMasterProtocol(MasterServiceState protocolState) { if (System.currentTimeMillis() > protocolState.keepAliveUntil) { - hci.closeMasterProtocol(protocolState); + hci.closeMasterService(protocolState); protocolState.keepAliveUntil = Long.MAX_VALUE; } } @@ -1652,8 +1660,8 @@ hci.keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE; } } - closeMasterProtocol(hci.masterAdminProtocol); - closeMasterProtocol(hci.masterMonitorProtocol); + closeMasterProtocol(hci.adminMasterServiceState); + closeMasterProtocol(hci.monitorMasterServiceState); } } @@ -1681,117 +1689,289 @@ } } - private static class MasterProtocolHandler implements InvocationHandler { - private HConnectionImplementation connection; - private MasterProtocolState protocolStateTracker; + final MasterAdminServiceState adminMasterServiceState = new MasterAdminServiceState(this); + final MasterMonitorServiceState monitorMasterServiceState = + new MasterMonitorServiceState(this); - protected MasterProtocolHandler(HConnectionImplementation connection, - MasterProtocolState protocolStateTracker) { - this.connection = connection; - this.protocolStateTracker = protocolStateTracker; - } + @Override + public MasterAdminService.BlockingInterface getMasterAdmin() throws MasterNotRunningException { + return getKeepAliveMasterAdminService(); + } - @Override - public Object invoke(Object proxy, Method method, Object[] args) - throws Throwable { - if (method.getName().equals("close") && - method.getParameterTypes().length == 0) { - release(connection, protocolStateTracker); - return null; - } else { - try { - return method.invoke(protocolStateTracker.protocol, args); - }catch (InvocationTargetException e){ - // We will have this for all the exception, checked on not, sent - // by any layer, including the functional exception - Throwable cause = e.getCause(); - if (cause ==
null){ - throw new RuntimeException( - "Proxy invocation failed and getCause is null", e); - } - if (cause instanceof UndeclaredThrowableException) { - cause = cause.getCause(); - } - throw cause; - } + @Override + public MasterMonitorService.BlockingInterface getMasterMonitor() + throws MasterNotRunningException { + return getKeepAliveMasterMonitorService(); + } + + private void resetMasterServiceState(final MasterServiceState mss) { + mss.userCount++; + mss.keepAliveUntil = Long.MAX_VALUE; + } + + @Override + public MasterAdminKeepAliveConnection getKeepAliveMasterAdminService() + throws MasterNotRunningException { + synchronized (masterAndZKLock) { + if (!isKeepAliveMasterConnectedAndRunning(this.adminMasterServiceState)) { + MasterAdminServiceStubMaker stubMaker = new MasterAdminServiceStubMaker(); + this.adminMasterServiceState.stub = stubMaker.makeStub(); } + resetMasterServiceState(this.adminMasterServiceState); } + // Ugly delegation just so we can add in a Close method. + final MasterAdminService.BlockingInterface stub = this.adminMasterServiceState.stub; + return new MasterAdminKeepAliveConnection() { + MasterAdminServiceState mss = adminMasterServiceState; + @Override + public AddColumnResponse addColumn(RpcController controller, + AddColumnRequest request) throws ServiceException { + return stub.addColumn(controller, request); + } - private void release( - HConnectionImplementation connection, - MasterProtocolState target) { - connection.releaseMaster(target); - } - } + @Override + public DeleteColumnResponse deleteColumn(RpcController controller, + DeleteColumnRequest request) throws ServiceException { + return stub.deleteColumn(controller, request); + } - MasterProtocolState masterAdminProtocol = - new MasterProtocolState(MasterAdminProtocol.class); - MasterProtocolState masterMonitorProtocol = - new MasterProtocolState(MasterMonitorProtocol.class); + @Override + public ModifyColumnResponse modifyColumn(RpcController controller, + ModifyColumnRequest request) throws ServiceException { + return stub.modifyColumn(controller, request); + } - /** - * This function allows HBaseAdmin and potentially others - * to get a shared master connection. - * - * @return The shared instance. Never returns null. 
- * @throws MasterNotRunningException - */ - private Object getKeepAliveMasterProtocol( - MasterProtocolState protocolState, Class connectionClass) - throws MasterNotRunningException { - synchronized (masterAndZKLock) { - if (!isKeepAliveMasterConnectedAndRunning(protocolState)) { - protocolState.protocol = null; - protocolState.protocol = createMasterWithRetries(protocolState); + @Override + public MoveRegionResponse moveRegion(RpcController controller, + MoveRegionRequest request) throws ServiceException { + return stub.moveRegion(controller, request); } - protocolState.userCount++; - protocolState.keepAliveUntil = Long.MAX_VALUE; - return Proxy.newProxyInstance( - connectionClass.getClassLoader(), - new Class[]{connectionClass}, - new MasterProtocolHandler(this, protocolState) - ); - } - } + @Override + public DispatchMergingRegionsResponse dispatchMergingRegions( + RpcController controller, DispatchMergingRegionsRequest request) + throws ServiceException { + return stub.dispatchMergingRegions(controller, request); + } - @Override - public MasterAdminProtocol getMasterAdmin() throws MasterNotRunningException { - return getKeepAliveMasterAdmin(); - } + @Override + public AssignRegionResponse assignRegion(RpcController controller, + AssignRegionRequest request) throws ServiceException { + return stub.assignRegion(controller, request); + } - @Override - public MasterMonitorProtocol getMasterMonitor() throws MasterNotRunningException { - return getKeepAliveMasterMonitor(); + @Override + public UnassignRegionResponse unassignRegion(RpcController controller, + UnassignRegionRequest request) throws ServiceException { + return stub.unassignRegion(controller, request); + } + + @Override + public OfflineRegionResponse offlineRegion(RpcController controller, + OfflineRegionRequest request) throws ServiceException { + return stub.offlineRegion(controller, request); + } + + @Override + public DeleteTableResponse deleteTable(RpcController controller, + DeleteTableRequest request) throws ServiceException { + return stub.deleteTable(controller, request); + } + + @Override + public EnableTableResponse enableTable(RpcController controller, + EnableTableRequest request) throws ServiceException { + return stub.enableTable(controller, request); + } + + @Override + public DisableTableResponse disableTable(RpcController controller, + DisableTableRequest request) throws ServiceException { + return stub.disableTable(controller, request); + } + + @Override + public ModifyTableResponse modifyTable(RpcController controller, + ModifyTableRequest request) throws ServiceException { + return stub.modifyTable(controller, request); + } + + @Override + public CreateTableResponse createTable(RpcController controller, + CreateTableRequest request) throws ServiceException { + return stub.createTable(controller, request); + } + + @Override + public ShutdownResponse shutdown(RpcController controller, + ShutdownRequest request) throws ServiceException { + return stub.shutdown(controller, request); + } + + @Override + public StopMasterResponse stopMaster(RpcController controller, + StopMasterRequest request) throws ServiceException { + return stub.stopMaster(controller, request); + } + + @Override + public BalanceResponse balance(RpcController controller, + BalanceRequest request) throws ServiceException { + return stub.balance(controller, request); + } + + @Override + public SetBalancerRunningResponse setBalancerRunning( + RpcController controller, SetBalancerRunningRequest request) + throws ServiceException { + return 
stub.setBalancerRunning(controller, request); + } + + @Override + public CatalogScanResponse runCatalogScan(RpcController controller, + CatalogScanRequest request) throws ServiceException { + return stub.runCatalogScan(controller, request); + } + + @Override + public EnableCatalogJanitorResponse enableCatalogJanitor( + RpcController controller, EnableCatalogJanitorRequest request) + throws ServiceException { + return stub.enableCatalogJanitor(controller, request); + } + + @Override + public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled( + RpcController controller, IsCatalogJanitorEnabledRequest request) + throws ServiceException { + return stub.isCatalogJanitorEnabled(controller, request); + } + + @Override + public CoprocessorServiceResponse execMasterService( + RpcController controller, CoprocessorServiceRequest request) + throws ServiceException { + return stub.execMasterService(controller, request); + } + + @Override + public TakeSnapshotResponse snapshot(RpcController controller, + TakeSnapshotRequest request) throws ServiceException { + return stub.snapshot(controller, request); + } + + @Override + public ListSnapshotResponse getCompletedSnapshots( + RpcController controller, ListSnapshotRequest request) + throws ServiceException { + return stub.getCompletedSnapshots(controller, request); + } + + @Override + public DeleteSnapshotResponse deleteSnapshot(RpcController controller, + DeleteSnapshotRequest request) throws ServiceException { + return stub.deleteSnapshot(controller, request); + } + + @Override + public IsSnapshotDoneResponse isSnapshotDone(RpcController controller, + IsSnapshotDoneRequest request) throws ServiceException { + return stub.isSnapshotDone(controller, request); + } + + @Override + public RestoreSnapshotResponse restoreSnapshot( + RpcController controller, RestoreSnapshotRequest request) + throws ServiceException { + return stub.restoreSnapshot(controller, request); + } + + @Override + public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone( + RpcController controller, IsRestoreSnapshotDoneRequest request) + throws ServiceException { + return stub.isRestoreSnapshotDone(controller, request); + } + + @Override + public IsMasterRunningResponse isMasterRunning( + RpcController controller, IsMasterRunningRequest request) + throws ServiceException { + return stub.isMasterRunning(controller, request); + } + + @Override + public void close() { + release(this.mss); + } + }; } - @Override - public MasterAdminKeepAliveConnection getKeepAliveMasterAdmin() - throws MasterNotRunningException { - return (MasterAdminKeepAliveConnection) - getKeepAliveMasterProtocol(masterAdminProtocol, MasterAdminKeepAliveConnection.class); + private static void release(MasterServiceState mss) { + if (mss != null && mss.connection != null) { + ((HConnectionImplementation)mss.connection).releaseMaster(mss); + } } @Override - public MasterMonitorKeepAliveConnection getKeepAliveMasterMonitor() - throws MasterNotRunningException { - return (MasterMonitorKeepAliveConnection) - getKeepAliveMasterProtocol(masterMonitorProtocol, MasterMonitorKeepAliveConnection.class); + public MasterMonitorKeepAliveConnection getKeepAliveMasterMonitorService() + throws MasterNotRunningException { + synchronized (masterAndZKLock) { + if (!isKeepAliveMasterConnectedAndRunning(this.monitorMasterServiceState)) { + MasterMonitorServiceStubMaker stubMaker = new MasterMonitorServiceStubMaker(); + this.monitorMasterServiceState.stub = stubMaker.makeStub(); + } + 
resetMasterServiceState(this.monitorMasterServiceState); } + // Ugly delegation just so we can implement close + final MasterMonitorService.BlockingInterface stub = this.monitorMasterServiceState.stub; + return new MasterMonitorKeepAliveConnection() { + final MasterMonitorServiceState mss = monitorMasterServiceState; + @Override + public GetSchemaAlterStatusResponse getSchemaAlterStatus( + RpcController controller, GetSchemaAlterStatusRequest request) + throws ServiceException { + return stub.getSchemaAlterStatus(controller, request); + } + + @Override + public GetTableDescriptorsResponse getTableDescriptors( + RpcController controller, GetTableDescriptorsRequest request) + throws ServiceException { + return stub.getTableDescriptors(controller, request); + } + + @Override + public GetClusterStatusResponse getClusterStatus( + RpcController controller, GetClusterStatusRequest request) + throws ServiceException { + return stub.getClusterStatus(controller, request); + } + + @Override + public IsMasterRunningResponse isMasterRunning( + RpcController controller, IsMasterRunningRequest request) + throws ServiceException { + return stub.isMasterRunning(controller, request); + } + + @Override + public void close() throws IOException { + release(this.mss); + } + }; } - private boolean isKeepAliveMasterConnectedAndRunning(MasterProtocolState protocolState){ - if (protocolState.protocol == null){ + private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) { + if (mss.getStub() == null){ return false; } try { - return protocolState.protocol.isMasterRunning( - null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning(); -}catch (UndeclaredThrowableException e){ + return mss.isMasterRunning(); + } catch (UndeclaredThrowableException e) { // It's somehow messy, but we can receive exceptions such as - // java.net.ConnectException but they're not declared. So we catch - // it... - LOG.info("Master connection is not running anymore", - e.getUndeclaredThrowable()); + // java.net.ConnectException but they're not declared. So we catch it... + LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable()); return false; } catch (ServiceException se) { LOG.warn("Checking master connection", se); @@ -1799,35 +1979,32 @@ } } - private void releaseMaster(MasterProtocolState protocolState) { - if (protocolState.protocol == null){ - return; - } + void releaseMaster(MasterServiceState mss) { + if (mss.getStub() == null) return; synchronized (masterAndZKLock) { - --protocolState.userCount; - if (protocolState.userCount <= 0) { - protocolState.keepAliveUntil = - System.currentTimeMillis() + keepAlive; + --mss.userCount; + if (mss.userCount <= 0) { + mss.keepAliveUntil = System.currentTimeMillis() + keepAlive; } } } - private void closeMasterProtocol(MasterProtocolState protocolState) { - if (protocolState.protocol != null){ - LOG.info("Closing master protocol: " + protocolState.protocolClass.getName()); - protocolState.protocol = null; + private void closeMasterService(MasterServiceState mss) { + if (mss.getStub() != null) { + LOG.info("Closing master protocol: " + mss); + mss.clearStub(); } - protocolState.userCount = 0; + mss.userCount = 0; } /** - * Immediate close of the shared master. Can be by the delayed close or - * when closing the connection itself. + * Immediate close of the shared master. Can be triggered by the delayed close or when closing + * the connection itself.
*/ private void closeMaster() { synchronized (masterAndZKLock) { - closeMasterProtocol(masterAdminProtocol); - closeMasterProtocol(masterMonitorProtocol); + closeMasterService(adminMasterServiceState); + closeMasterService(monitorMasterServiceState); } } @@ -2471,8 +2648,8 @@ public class HConnectionManager { delayedClosing.stop("Closing connection"); closeMaster(); closeZooKeeperWatcher(); - this.servers.clear(); - this.rpcEngine.close(); + this.stubs.clear(); + this.rpcClient.stop(); if (clusterStatusListener != null) { clusterStatusListener.close(); } @@ -2513,7 +2690,7 @@ public class HConnectionManager { @Override public HTableDescriptor[] listTables() throws IOException { - MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitor(); + MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitorService(); try { GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(null); @@ -2528,7 +2705,7 @@ public class HConnectionManager { @Override public HTableDescriptor[] getHTableDescriptors(List tableNames) throws IOException { if (tableNames == null || tableNames.isEmpty()) return new HTableDescriptor[0]; - MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitor(); + MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitorService(); try { GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(tableNames); @@ -2554,7 +2731,7 @@ public class HConnectionManager { if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) { return HTableDescriptor.META_TABLEDESC; } - MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitor(); + MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitorService(); GetTableDescriptorsResponse htds; try { GetTableDescriptorsRequest req = @@ -2574,14 +2751,6 @@ public class HConnectionManager { } /** - * Override the RpcClientEngine implementation used by this connection. - * FOR TESTING PURPOSES ONLY! - */ - void setRpcEngine(RpcClientEngine engine) { - this.rpcEngine = engine; - } - - /** * The record of errors for servers. Visible for testing. */ @VisibleForTesting @@ -2683,17 +2852,15 @@ public class HConnectionManager { * @param c The Configuration instance to set the retries into. * @param log Used to log what we set in here. */ - public static void setServerSideHConnectionRetries(final Configuration c, + public static void setServerSideHConnectionRetries(final Configuration c, final String sn, final Log log) { int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); // Go big. Multiply by 10. If we can't get to meta after this many retries // then something seriously wrong. 
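The release/close bookkeeping in the hunks above follows a ref-count-plus-deadline pattern: releasing a shared master service stub never closes it immediately, it only arms a keep-alive deadline, and a delayed-closing chore drops the stub once the deadline passes with no users. A minimal standalone sketch of that shape, under assumed names (SharedStub, KEEP_ALIVE_MS is a made-up value) rather than the patch's actual code:

import java.util.function.Supplier;

final class SharedStub<T> {
  private static final long KEEP_ALIVE_MS = 5 * 60 * 1000; // assumed value, for illustration only
  private T stub;                 // null until made, and again after a close
  private int userCount;          // callers currently holding the stub
  private long keepAliveUntil = Long.MAX_VALUE;

  synchronized T acquire(Supplier<T> maker) {
    if (stub == null) stub = maker.get();   // lazily (re)create, like the StubMakers above
    userCount++;
    keepAliveUntil = Long.MAX_VALUE;        // in use again, so disarm the deadline
    return stub;
  }

  synchronized void release() {             // mirrors releaseMaster(): no immediate close
    if (--userCount <= 0) {
      keepAliveUntil = System.currentTimeMillis() + KEEP_ALIVE_MS;
    }
  }

  synchronized void closeIfExpired() {      // what a delayed-closing chore would call
    if (stub != null && userCount <= 0 && System.currentTimeMillis() > keepAliveUntil) {
      stub = null;                          // drop it; the next acquire() re-makes it
      userCount = 0;
    }
  }
}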
- int serversideMultiplier = - c.getInt("hbase.client.serverside.retries.multiplier", 10); + int serversideMultiplier = c.getInt("hbase.client.serverside.retries.multiplier", 10); int retries = hcRetries * serversideMultiplier; c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); - log.debug("HConnection retries=" + retries); + log.debug(sn + " HConnection server-to-server retries=" + retries); } -} - +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 3948d16..bc1a686 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; @@ -534,7 +533,7 @@ public class HTable implements HTableInterface { throws IOException { return new ServerCallable(connection, tableName, row, operationTimeout) { public Result call() throws IOException { - return ProtobufUtil.getRowOrBefore(server, + return ProtobufUtil.getRowOrBefore(stub, location.getRegionInfo().getRegionName(), row, family); } }.withRetries(); @@ -580,7 +579,7 @@ public class HTable implements HTableInterface { public Result get(final Get get) throws IOException { return new ServerCallable(connection, tableName, get.getRow(), operationTimeout) { public Result call() throws IOException { - return ProtobufUtil.get(server, + return ProtobufUtil.get(stub, location.getRegionInfo().getRegionName(), get); } }.withRetries(); @@ -649,7 +648,7 @@ public class HTable implements HTableInterface { try { MutateRequest request = RequestConverter.buildMutateRequest( location.getRegionInfo().getRegionName(), delete); - MutateResponse response = server.mutate(null, request); + MutateResponse response = stub.mutate(null, request); return Boolean.valueOf(response.getProcessed()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -726,7 +725,7 @@ public class HTable implements HTableInterface { try { MultiRequest request = RequestConverter.buildMultiRequest( location.getRegionInfo().getRegionName(), rm); - server.multi(null, request); + stub.multi(null, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -751,7 +750,7 @@ public class HTable implements HTableInterface { location.getRegionInfo().getRegionName(), append); PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController(); - MutateResponse response = server.mutate(rpcController, request); + MutateResponse response = stub.mutate(rpcController, request); if (!response.hasResult()) return null; return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner()); } catch (ServiceException se) { @@ -776,7 +775,7 @@ public class HTable implements HTableInterface { MutateRequest request = RequestConverter.buildMutateRequest( location.getRegionInfo().getRegionName(), increment); PayloadCarryingRpcController rpcContoller = new PayloadCarryingRpcController(); - MutateResponse response = server.mutate(rpcContoller, request); + MutateResponse response = 
stub.mutate(rpcContoller, request); return ProtobufUtil.toResult(response.getResult(), rpcContoller.cellScanner()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -821,7 +820,7 @@ public class HTable implements HTableInterface { location.getRegionInfo().getRegionName(), row, family, qualifier, amount, durability); PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController(); - MutateResponse response = server.mutate(rpcController, request); + MutateResponse response = stub.mutate(rpcController, request); Result result = ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner()); return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); @@ -846,7 +845,7 @@ public class HTable implements HTableInterface { MutateRequest request = RequestConverter.buildMutateRequest( location.getRegionInfo().getRegionName(), row, family, qualifier, new BinaryComparator(value), CompareType.EQUAL, put); - MutateResponse response = server.mutate(null, request); + MutateResponse response = stub.mutate(null, request); return Boolean.valueOf(response.getProcessed()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -870,7 +869,7 @@ public class HTable implements HTableInterface { MutateRequest request = RequestConverter.buildMutateRequest( location.getRegionInfo().getRegionName(), row, family, qualifier, new BinaryComparator(value), CompareType.EQUAL, delete); - MutateResponse response = server.mutate(null, request); + MutateResponse response = stub.mutate(null, request); return Boolean.valueOf(response.getProcessed()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -889,7 +888,7 @@ public class HTable implements HTableInterface { try { GetRequest request = RequestConverter.buildGetRequest( location.getRegionInfo().getRegionName(), get, true); - GetResponse response = server.get(null, request); + GetResponse response = stub.get(null, request); return response.getExists(); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -992,7 +991,7 @@ public class HTable implements HTableInterface { try { MultiGetRequest requests = RequestConverter.buildMultiGetRequest(location .getRegionInfo().getRegionName(), getsByRegionEntry.getValue(), true, false); - MultiGetResponse responses = server.multiGet(null, requests); + MultiGetResponse responses = stub.multiGet(null, requests); return responses.getExistsList(); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAdminKeepAliveConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAdminKeepAliveConnection.java index 7126073..7c36119 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAdminKeepAliveConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAdminKeepAliveConnection.java @@ -20,25 +20,25 @@ package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.hbase.MasterAdminProtocol; - -import java.io.Closeable; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos; /** * A KeepAlive connection is not physically closed immediately after the close, - * but rather kept alive for a few minutes. It makes sense only if it's shared. + * but rather kept alive for a few minutes. It makes sense only if it is shared. * - * This interface is used by a dynamic proxy. It allows to have a #close - * function in a master client. 
+ * This interface is implemented on a stub. It allows having a #close function in a master + * client. * - * This class is intended to be used internally by HBase classes that need to - * speak the MasterAdminProtocol; but not by - * final user code. Hence it's - * package protected. + *
This class is intended to be used internally by HBase classes that need to make invocations + * against the master on the MasterAdminProtos.MasterAdminService.BlockingInterface; but not by + * final user code. Hence it's package protected. */ -interface MasterAdminKeepAliveConnection extends MasterAdminProtocol, Closeable { - - @Override +interface MasterAdminKeepAliveConnection +extends MasterAdminProtos.MasterAdminService.BlockingInterface { + /** + * Close down all resources. + */ + // The Closeable Interface wants to throw an IOE out of a close. + // Thats a PITA. Do this below instead of Closeable. public void close(); -} - +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterMonitorKeepAliveConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterMonitorKeepAliveConnection.java index a4c7650..4f032b3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterMonitorKeepAliveConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterMonitorKeepAliveConnection.java @@ -20,11 +20,10 @@ package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.hbase.MasterMonitorProtocol; - import java.io.Closeable; +import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos; + /** * A KeepAlive connection is not physically closed immediately after the close, * but rather kept alive for a few minutes. It makes sense only if it's shared. @@ -36,9 +35,5 @@ import java.io.Closeable; * speak the MasterMonitorProtocol; but not by final user code. Hence it's * package protected. */ -interface MasterMonitorKeepAliveConnection extends MasterMonitorProtocol, Closeable { - - @Override - public void close(); -} - +interface MasterMonitorKeepAliveConnection +extends MasterMonitorProtos.MasterMonitorService.BlockingInterface, Closeable {} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java index 99ba7b7..578959d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java @@ -26,10 +26,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; import org.apache.hadoop.hbase.exceptions.TableNotFoundException; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.PairOfSameType; import java.io.Closeable; import java.io.IOException; @@ -275,7 +273,7 @@ public class MetaScanner { public static List listAllRegions(Configuration conf, final boolean offlined) throws IOException { final List regions = new ArrayList(); - MetaScannerVisitor visitor = new DefaultMetaScannerVisitor(conf) { + MetaScannerVisitor visitor = new DefaultMetaScannerVisitor() { @Override public boolean processRowInternal(Result result) throws IOException { if (result == null || result.isEmpty()) { @@ -310,7 +308,7 @@ public class MetaScanner { final byte [] tablename, final boolean offlined) throws IOException { final NavigableMap regions = new TreeMap(); - MetaScannerVisitor visitor = new TableMetaScannerVisitor(conf, tablename) { + MetaScannerVisitor visitor = new TableMetaScannerVisitor(tablename) { @Override public boolean 
processRowInternal(Result rowResult) throws IOException { HRegionInfo info = getHRegionInfo(rowResult); @@ -354,10 +352,8 @@ public class MetaScanner { public static abstract class DefaultMetaScannerVisitor extends MetaScannerVisitorBase { - protected Configuration conf; - - public DefaultMetaScannerVisitor(Configuration conf) { - this.conf = conf; + public DefaultMetaScannerVisitor() { + super(); } public abstract boolean processRowInternal(Result rowResult) throws IOException; @@ -386,8 +382,8 @@ public class MetaScanner { public static abstract class TableMetaScannerVisitor extends DefaultMetaScannerVisitor { private byte[] tableName; - public TableMetaScannerVisitor(Configuration conf, byte[] tableName) { - super(conf); + public TableMetaScannerVisitor(byte[] tableName) { + super(); this.tableName = tableName; } @@ -402,6 +398,5 @@ public class MetaScanner { } return super.processRow(rowResult); } - } -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 2a3d2c5..45157cc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -74,7 +74,7 @@ class MultiServerCallable extends ServerCallable { RequestConverter.buildNoDataMultiRequest(regionName, rms, cells); // Carry the cells over the proxy/pb Service interface using the payload carrying // rpc controller. - server.multi(new PayloadCarryingRpcController(cells), multiRequest); + stub.multi(new PayloadCarryingRpcController(cells), multiRequest); // This multi call does not return results. response.add(regionName, action.getOriginalIndex(), Result.EMPTY_RESULT); } catch (ServiceException se) { @@ -99,7 +99,7 @@ class MultiServerCallable extends ServerCallable { // Controller optionally carries cell data over the proxy/service boundary and also // optionally ferries cell response data back out again. 
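The comment above captures the crux of the new data path: cell data travels beside the protobuf message in a cell block, and the same controller object that carries cells out also ferries the response's cells back in. A rough standalone sketch of that contract, using a hypothetical CellCarryingController with Iterator<byte[]> standing in for HBase's CellScanner; this shows the shape of the idea, not the real PayloadCarryingRpcController:

import java.util.Iterator;

final class CellCarryingController {
  private Iterator<byte[]> cells;            // stand-in for a CellScanner

  CellCarryingController(Iterator<byte[]> requestCells) {
    this.cells = requestCells;               // outbound cell block rides with the request
  }

  void setCellScanner(Iterator<byte[]> responseCells) {
    this.cells = responseCells;              // RPC layer swaps in the decoded response cells
  }

  Iterator<byte[]> cellScanner() {
    return cells;                            // caller reads result cells from here
  }
}

The protobuf response itself stays small; bulk cell data never passes through protobuf serialization on either side.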
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells); - ClientProtos.MultiResponse responseProto = server.multi(controller, multiRequest); + ClientProtos.MultiResponse responseProto = stub.multi(controller, multiRequest); results = ResponseConverter.getResults(responseProto, controller.cellScanner()); } catch (ServiceException se) { ex = ProtobufUtil.getRemoteException(se); @@ -114,7 +114,7 @@ class MultiServerCallable extends ServerCallable { } @Override - public void connect(boolean reload) throws IOException { - server = connection.getClient(loc.getServerName()); + public void prepare(boolean reload) throws IOException { + stub = connection.getClient(loc.getServerName()); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 3ede761..c1d40fb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -92,9 +92,9 @@ public class ScannerCallable extends ServerCallable { * @throws IOException */ @Override - public void connect(boolean reload) throws IOException { + public void prepare(boolean reload) throws IOException { if (!instantiated || reload) { - super.connect(reload); + super.prepare(reload); checkIfRegionServerIsRemote(); instantiated = true; } @@ -144,7 +144,7 @@ public class ScannerCallable extends ServerCallable { RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); ScanResponse response = null; try { - response = server.scan(null, request); + response = stub.scan(null, request); // Client and RS maintain a nextCallSeq number during the scan. Every next() call // from client to server will increment this number in both sides. 
Client passes this // number along with the request and at RS side both the incoming nextCallSeq and its @@ -248,7 +248,7 @@ public class ScannerCallable extends ServerCallable { ScanRequest request = RequestConverter.buildScanRequest(this.scannerId, 0, true); try { - server.scan(null, request); + stub.scan(null, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -265,7 +265,7 @@ public class ScannerCallable extends ServerCallable { this.location.getRegionInfo().getRegionName(), this.scan, 0, false); try { - ScanResponse response = server.scan(null, request); + ScanResponse response = stub.scan(null, request); long id = response.getScannerId(); if (logScannerActivity) { LOG.info("Open scanner=" + id + " for scan=" + scan.toString() diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java index ae099dd..1b2e54a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java @@ -29,7 +29,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException; import org.apache.hadoop.hbase.exceptions.NotServingRegionException; -import org.apache.hadoop.hbase.ipc.HBaseClientRPC; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.ipc.RemoteException; @@ -47,7 +48,7 @@ import java.util.concurrent.Callable; * return type and method we actually invoke on remote Server. Usually * used inside a try/catch that fields usual connection failures all wrapped * up in a retry loop. - *
Call {@link #connect(boolean)} to connect to server hosting region + *
Call {@link #prepare(boolean)} to connect to server hosting region * that contains the passed row in the passed table before invoking * {@link #call()}. * @see HConnection#getRegionServerWithoutRetries(ServerCallable) @@ -62,7 +63,7 @@ public abstract class ServerCallable implements Callable { protected final byte [] tableName; protected final byte [] row; protected HRegionLocation location; - protected ClientProtocol server; + protected ClientService.BlockingInterface stub; protected int callTimeout; protected long globalStartTime; protected long startTime, endTime; @@ -86,13 +87,14 @@ public abstract class ServerCallable implements Callable { } /** - * Connect to the server hosting region with row from tablename. + * Prepare for connection to the server hosting region with row from tablename. Does lookup + * to find region location and hosting server. * @param reload Set this to true if connection should re-find the region * @throws IOException e */ - public void connect(final boolean reload) throws IOException { + public void prepare(final boolean reload) throws IOException { this.location = connection.getRegionLocation(tableName, row, reload); - this.server = connection.getClient(location.getServerName()); + this.stub = connection.getClient(location.getServerName()); } /** @return the server name @@ -127,11 +129,11 @@ public abstract class ServerCallable implements Callable { // resetting to the minimum. remaining = MIN_RPC_TIMEOUT; } - HBaseClientRPC.setRpcTimeout(remaining); + RpcClient.setRpcTimeout(remaining); } public void afterCall() { - HBaseClientRPC.resetRpcTimeout(); + RpcClient.resetRpcTimeout(); this.endTime = EnvironmentEdgeManager.currentTimeMillis(); } @@ -164,11 +166,11 @@ public abstract class ServerCallable implements Callable { long expectedSleep = 0; try { beforeCall(); - connect(tries != 0); // if called with false, check table status on ZK + prepare(tries != 0); // if called with false, check table status on ZK return call(); } catch (Throwable t) { - LOG.warn("Received exception, tries=" + tries + ", numRetries=" + numRetries + - " message=" + t.getMessage()); + LOG.warn("Received exception, tries=" + tries + ", numRetries=" + numRetries + ":" + + t.getMessage()); t = translateException(t); // translateException throws an exception when we should not retry, i.e. when it's the @@ -237,7 +239,7 @@ public abstract class ServerCallable implements Callable { this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); try { beforeCall(); - connect(false); + prepare(false); return call(); } catch (Throwable t) { Throwable t2 = translateException(t); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BadAuthException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BadAuthException.java new file mode 100644 index 0000000..66bd133 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BadAuthException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +public class BadAuthException extends FatalConnectionException { + public BadAuthException() { + super(); + } + + public BadAuthException(String msg) { + super(msg); + } + + public BadAuthException(String msg, Throwable t) { + super(msg, t); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java deleted file mode 100644 index be60417..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ /dev/null @@ -1,1479 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.ipc; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.Method; -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -import javax.net.SocketFactory; -import javax.security.sasl.SaslException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.IpcProtocol; -import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.codec.KeyValueCodec; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; -import org.apache.hadoop.hbase.protobuf.generated.Tracing.RPCTInfo; -import org.apache.hadoop.hbase.security.AuthMethod; -import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; -import org.apache.hadoop.hbase.security.KerberosInfo; -import org.apache.hadoop.hbase.security.TokenInfo; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; -import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.PoolMap; -import org.apache.hadoop.hbase.util.PoolMap.PoolType; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.token.TokenSelector; -import org.cloudera.htrace.Span; -import org.cloudera.htrace.Trace; - -import com.google.protobuf.Message; -import com.google.protobuf.Message.Builder; -import com.google.protobuf.TextFormat; - - -/** - * A client for an IPC service. IPC calls take a single Protobuf message as a - * request and returns a single Protobuf message as result. A service runs on - * a port and is defined by a parameter class and a value class. - * - *
See HBaseServer - */ -@InterfaceAudience.Private -public class HBaseClient { - public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient"); - protected final PoolMap connections; - private ReflectionCache reflectionCache = new ReflectionCache(); - - protected int counter; // counter for call ids - protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs - final protected Configuration conf; - final protected int maxIdleTime; // connections will be culled if it was idle for - // maxIdleTime microsecs - final protected int maxRetries; //the max. no. of retries for socket connections - final protected long failureSleep; // Time to sleep before retry on failure. - protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm - protected final boolean tcpKeepAlive; // if T then use keepalives - protected int pingInterval; // how often sends ping to the server in msecs - protected int socketTimeout; // socket timeout - protected FailedServers failedServers; - private final Codec codec; - private final CompressionCodec compressor; - private final IPCUtil ipcUtil; - - protected final SocketFactory socketFactory; // how to create sockets - protected String clusterId; - - final private static String PING_INTERVAL_NAME = "ipc.ping.interval"; - final private static String SOCKET_TIMEOUT = "ipc.socket.timeout"; - final static int DEFAULT_PING_INTERVAL = 60000; // 1 min - final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds - final static int PING_CALL_ID = -1; - - public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry"; - public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000; - - /** - * A class to manage a list of servers that failed recently. - */ - static class FailedServers { - private final LinkedList> failedServers = new - LinkedList>(); - private final int recheckServersTimeout; - - FailedServers(Configuration conf) { - this.recheckServersTimeout = conf.getInt( - FAILED_SERVER_EXPIRY_KEY, FAILED_SERVER_EXPIRY_DEFAULT); - } - - /** - * Add an address to the list of the failed servers list. - */ - public synchronized void addToFailedServers(InetSocketAddress address) { - final long expiry = EnvironmentEdgeManager.currentTimeMillis() + recheckServersTimeout; - failedServers.addFirst(new Pair(expiry, address.toString())); - } - - /** - * Check if the server should be considered as bad. Clean the old entries of the list. - * - * @return true if the server is in the failed servers list - */ - public synchronized boolean isFailedServer(final InetSocketAddress address) { - if (failedServers.isEmpty()) { - return false; - } - - final String lookup = address.toString(); - final long now = EnvironmentEdgeManager.currentTimeMillis(); - - // iterate, looking for the search entry and cleaning expired entries - Iterator> it = failedServers.iterator(); - while (it.hasNext()) { - Pair cur = it.next(); - if (cur.getFirst() < now) { - it.remove(); - } else { - if (lookup.equals(cur.getSecond())) { - return true; - } - } - } - - return false; - } - } - - @SuppressWarnings("serial") - public static class FailedServerException extends IOException { - public FailedServerException(String s) { - super(s); - } - } - - /** - * set the ping interval value in configuration - * - * @param conf Configuration - * @param pingInterval the ping interval - */ - // Any reason we couldn't just do tcp keepalive instead of this pingery? 
- // St.Ack 20130121 - public static void setPingInterval(Configuration conf, int pingInterval) { - conf.setInt(PING_INTERVAL_NAME, pingInterval); - } - - /** - * Get the ping interval from configuration; - * If not set in the configuration, return the default value. - * - * @param conf Configuration - * @return the ping interval - */ - static int getPingInterval(Configuration conf) { - return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL); - } - - /** - * Set the socket timeout - * @param conf Configuration - * @param socketTimeout the socket timeout - */ - public static void setSocketTimeout(Configuration conf, int socketTimeout) { - conf.setInt(SOCKET_TIMEOUT, socketTimeout); - } - - /** - * @return the socket timeout - */ - static int getSocketTimeout(Configuration conf) { - return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT); - } - - /** A call waiting for a value. */ - protected class Call { - final int id; // call id - final Message param; // rpc request method param object - /** - * Optionally has cells when making call. Optionally has cells set on response. Used - * passing cells to the rpc and receiving the response. - */ - CellScanner cells; - Message response; // value, null if error - IOException error; // exception, null if value - boolean done; // true when call is done - long startTime; - final Method method; - - protected Call(final Method method, Message param, final CellScanner cells) { - this.param = param; - this.method = method; - this.cells = cells; - this.startTime = System.currentTimeMillis(); - synchronized (HBaseClient.this) { - this.id = counter++; - } - } - - @Override - public String toString() { - return "callId: " + this.id + " methodName: " + this.method.getName() + " param {" + - (this.param != null? TextFormat.shortDebugString(this.param): "") + "}"; - } - - /** Indicate when the call is complete and the - * value or error are available. Notifies by default. */ - protected synchronized void callComplete() { - this.done = true; - notify(); // notify caller - } - - /** Set the exception when there is an error. - * Notify the caller the call is done. - * - * @param error exception thrown by the call; either local or remote - */ - public synchronized void setException(IOException error) { - this.error = error; - callComplete(); - } - - /** Set the return value when there is no error. - * Notify the caller the call is done. - * - * @param response return value of the call. - * @param cells Can be null - */ - public synchronized void setResponse(Message response, final CellScanner cells) { - this.response = response; - this.cells = cells; - callComplete(); - } - - public long getStartTime() { - return this.startTime; - } - } - - protected final static Map> tokenHandlers = - new HashMap>(); - static { - tokenHandlers.put(AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.toString(), - new AuthenticationTokenSelector()); - } - - /** - * Creates a connection. Can be overridden by a subclass for testing. - * @param remoteId - the ConnectionId to use for the connection creation. - */ - protected Connection createConnection(ConnectionId remoteId, final Codec codec, - final CompressionCodec compressor) - throws IOException { - return new Connection(remoteId, codec, compressor); - } - - /** Thread that reads responses and notifies callers. Each connection owns a - * socket connected to a remote address. Calls are multiplexed through this - * socket: responses may be delivered out of order. 
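The deleted javadoc just above describes the core of this client: many in-flight calls multiplexed over one socket, with responses matched back to callers by call id rather than by arrival order. A minimal sketch of that bookkeeping under assumed names (CallTable, with byte[] payloads in place of protobuf messages):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class CallTable {
  private final AtomicInteger counter = new AtomicInteger();
  private final ConcurrentMap<Integer, CompletableFuture<byte[]>> pending =
      new ConcurrentHashMap<>();

  int register(CompletableFuture<byte[]> result) {
    int id = counter.getAndIncrement();      // unique id ties a request to its response
    pending.put(id, result);
    return id;
  }

  void complete(int id, byte[] response) {   // called by the single reader thread
    CompletableFuture<byte[]> result = pending.remove(id);
    if (result != null) {
      result.complete(response);             // null means the call was already cleaned up
    }
  }
}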
*/ - protected class Connection extends Thread { - private ConnectionHeader header; // connection header - protected ConnectionId remoteId; - protected Socket socket = null; // connected socket - protected DataInputStream in; - protected DataOutputStream out; - private InetSocketAddress server; // server ip:port - private String serverPrincipal; // server's krb5 principal name - private AuthMethod authMethod; // authentication method - private boolean useSasl; - private Token token; - private HBaseSaslRpcClient saslRpcClient; - private int reloginMaxBackoff; // max pause before relogin on sasl failure - private final Codec codec; - private final CompressionCodec compressor; - - // currently active calls - protected final ConcurrentSkipListMap calls = - new ConcurrentSkipListMap(); - protected final AtomicLong lastActivity = - new AtomicLong(); // last I/O activity time - protected final AtomicBoolean shouldCloseConnection = - new AtomicBoolean(); // indicate if the connection is closed - protected IOException closeException; // close reason - - Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor) - throws IOException { - if (remoteId.getAddress().isUnresolved()) { - throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName()); - } - this.server = remoteId.getAddress(); - this.codec = codec; - this.compressor = compressor; - - UserGroupInformation ticket = remoteId.getTicket().getUGI(); - Class protocol = remoteId.getProtocol(); - this.useSasl = User.isHBaseSecurityEnabled(conf); - if (useSasl && protocol != null) { - TokenInfo tokenInfo = protocol.getAnnotation(TokenInfo.class); - if (tokenInfo != null) { - TokenSelector tokenSelector = - tokenHandlers.get(tokenInfo.value()); - if (tokenSelector != null) { - token = tokenSelector.selectToken(new Text(clusterId), - ticket.getTokens()); - } else if (LOG.isDebugEnabled()) { - LOG.debug("No token selector found for type "+tokenInfo.value()); - } - } - KerberosInfo krbInfo = protocol.getAnnotation(KerberosInfo.class); - if (krbInfo != null) { - String serverKey = krbInfo.serverPrincipal(); - if (serverKey == null) { - throw new IOException( - "Can't obtain server Kerberos config key from KerberosInfo"); - } - serverPrincipal = SecurityUtil.getServerPrincipal( - conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase()); - if (LOG.isDebugEnabled()) { - LOG.debug("RPC Server Kerberos principal name for protocol=" - + protocol.getCanonicalName() + " is " + serverPrincipal); - } - } - } - - if (!useSasl) { - authMethod = AuthMethod.SIMPLE; - } else if (token != null) { - authMethod = AuthMethod.DIGEST; - } else { - authMethod = AuthMethod.KERBEROS; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Use " + authMethod + " authentication for protocol " - + (protocol == null ? "null" : protocol.getSimpleName())); - } - reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000); - this.remoteId = remoteId; - - ConnectionHeader.Builder builder = ConnectionHeader.newBuilder(); - builder.setProtocol(protocol == null ? 
"" : protocol.getName()); - UserInformation userInfoPB; - if ((userInfoPB = getUserInfo(ticket)) != null) { - builder.setUserInfo(userInfoPB); - } - builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName()); - if (this.compressor != null) { - builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName()); - } - this.header = builder.build(); - - this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " + - remoteId.getAddress().toString() + - ((ticket==null)?" from an unknown user": (" from " - + ticket.getUserName()))); - this.setDaemon(true); - } - - private UserInformation getUserInfo(UserGroupInformation ugi) { - if (ugi == null || authMethod == AuthMethod.DIGEST) { - // Don't send user for token auth - return null; - } - UserInformation.Builder userInfoPB = UserInformation.newBuilder(); - if (authMethod == AuthMethod.KERBEROS) { - // Send effective user for Kerberos auth - userInfoPB.setEffectiveUser(ugi.getUserName()); - } else if (authMethod == AuthMethod.SIMPLE) { - //Send both effective user and real user for simple auth - userInfoPB.setEffectiveUser(ugi.getUserName()); - if (ugi.getRealUser() != null) { - userInfoPB.setRealUser(ugi.getRealUser().getUserName()); - } - } - return userInfoPB.build(); - } - - /** Update lastActivity with the current time. */ - protected void touch() { - lastActivity.set(System.currentTimeMillis()); - } - - /** - * Add a call to this connection's call queue and notify - * a listener; synchronized. If the connection is dead, the call is not added, and the - * caller is notified. - * This function can return a connection that is already marked as 'shouldCloseConnection' - * It is up to the user code to check this status. - * @param call to add - */ - protected synchronized void addCall(Call call) { - // If the connection is about to close, we manage this as if the call was already added - // to the connection calls list. If not, the connection creations are serialized, as - // mentioned in HBASE-6364 - if (this.shouldCloseConnection.get()) { - if (this.closeException == null) { - call.setException(new IOException( - "Call " + call.id + " not added as the connection " + remoteId + " is closing")); - } else { - call.setException(this.closeException); - } - synchronized (call) { - call.notifyAll(); - } - } else { - calls.put(call.id, call); - notify(); - } - } - - /** This class sends a ping to the remote side when timeout on - * reading. If no failure is detected, it retries until at least - * a byte is read. - */ - protected class PingInputStream extends FilterInputStream { - /* constructor */ - protected PingInputStream(InputStream in) { - super(in); - } - - /* Process timeout exception - * if the connection is not going to be closed, send a ping. - * otherwise, throw the timeout exception. - */ - private void handleTimeout(SocketTimeoutException e) throws IOException { - if (shouldCloseConnection.get() || !running.get() || - remoteId.rpcTimeout > 0) { - throw e; - } - sendPing(); - } - - /** Read a byte from the stream. - * Send a ping if timeout on read. Retries if no failure is detected - * until a byte is read. - * @throws IOException for any IO problem other than socket timeout - */ - @Override - public int read() throws IOException { - do { - try { - return super.read(); - } catch (SocketTimeoutException e) { - handleTimeout(e); - } - } while (true); - } - - /** Read bytes into a buffer starting from offset off - * Send a ping if timeout on read. 
Retries if no failure is detected - * until a byte is read. - * - * @return the total number of bytes read; -1 if the connection is closed. - */ - @Override - public int read(byte[] buf, int off, int len) throws IOException { - do { - try { - return super.read(buf, off, len); - } catch (SocketTimeoutException e) { - handleTimeout(e); - } - } while (true); - } - } - - protected synchronized void setupConnection() throws IOException { - short ioFailures = 0; - short timeoutFailures = 0; - while (true) { - try { - this.socket = socketFactory.createSocket(); - this.socket.setTcpNoDelay(tcpNoDelay); - this.socket.setKeepAlive(tcpKeepAlive); - // connection time out is 20s - NetUtils.connect(this.socket, remoteId.getAddress(), - getSocketTimeout(conf)); - if (remoteId.rpcTimeout > 0) { - pingInterval = remoteId.rpcTimeout; // overwrite pingInterval - } - this.socket.setSoTimeout(pingInterval); - return; - } catch (SocketTimeoutException toe) { - /* The max number of retries is 45, - * which amounts to 20s*45 = 15 minutes retries. - */ - handleConnectionFailure(timeoutFailures++, maxRetries, toe); - } catch (IOException ie) { - handleConnectionFailure(ioFailures++, maxRetries, ie); - } - } - } - - protected void closeConnection() { - // close the current connection - if (socket != null) { - try { - socket.close(); - } catch (IOException e) { - LOG.warn("Not able to close a socket", e); - } - } - // set socket to null so that the next call to setupIOstreams - // can start the process of connect all over again. - socket = null; - } - - /** - * Handle connection failures - * - * If the current number of retries is equal to the max number of retries, - * stop retrying and throw the exception; Otherwise backoff N seconds and - * try connecting again. - * - * This Method is only called from inside setupIOstreams(), which is - * synchronized. Hence the sleep is synchronized; the locks will be retained. - * - * @param curRetries current number of retries - * @param maxRetries max number of retries allowed - * @param ioe failure reason - * @throws IOException if max number of retries is reached - */ - private void handleConnectionFailure( - int curRetries, int maxRetries, IOException ioe) throws IOException { - - closeConnection(); - - // throw the exception if the maximum number of retries is reached - if (curRetries >= maxRetries) { - throw ioe; - } - - // otherwise back off and retry - try { - Thread.sleep(failureSleep); - } catch (InterruptedException ignored) {} - - LOG.info("Retrying connect to server: " + remoteId.getAddress() + - " after sleeping " + failureSleep + "ms. Already tried " + curRetries + - " time(s)."); - } - - /* wait till someone signals us to start reading RPC response or - * it is idle too long, it is marked as to be closed, - * or the client is marked as not running. - * - * Return true if it is time to read a response; false otherwise. 
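The PingInputStream being removed here implements a simple liveness idea: when a read times out and no explicit rpcTimeout is in force, send a ping and retry the read instead of failing, so an idle but healthy connection stays distinguishable from a dead one. A condensed sketch of that loop, with a hypothetical Runnable pinger standing in for the client's sendPing() (the original also overrides the array read the same way):

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;

class PingingInputStream extends FilterInputStream {
  private final Runnable pinger;   // assumed callback that writes a ping frame

  PingingInputStream(InputStream in, Runnable pinger) {
    super(in);
    this.pinger = pinger;
  }

  @Override
  public int read() throws IOException {
    while (true) {
      try {
        return super.read();
      } catch (SocketTimeoutException e) {
        pinger.run();              // no data within SO_TIMEOUT: ping and keep waiting
      }
    }
  }
}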
- */ - protected synchronized boolean waitForWork() { - if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { - long timeout = maxIdleTime - (System.currentTimeMillis()-lastActivity.get()); - if (timeout>0) { - try { - wait(timeout); - } catch (InterruptedException ignored) {} - } - } - - if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { - return true; - } else if (shouldCloseConnection.get()) { - return false; - } else if (calls.isEmpty()) { // idle connection closed or stopped - markClosed(null); - return false; - } else { // get stopped but there are still pending requests - markClosed((IOException)new IOException().initCause( - new InterruptedException())); - return false; - } - } - - public InetSocketAddress getRemoteAddress() { - return remoteId.getAddress(); - } - - /* Send a ping to the server if the time elapsed - * since last I/O activity is equal to or greater than the ping interval - */ - protected synchronized void sendPing() throws IOException { - // Can we do tcp keepalive instead of this pinging? - long curTime = System.currentTimeMillis(); - if ( curTime - lastActivity.get() >= pingInterval) { - lastActivity.set(curTime); - //noinspection SynchronizeOnNonFinalField - synchronized (this.out) { - out.writeInt(PING_CALL_ID); - out.flush(); - } - } - } - - @Override - public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": starting, connections " + connections.size()); - } - - try { - while (waitForWork()) {//wait here for work - read or close connection - readResponse(); - } - } catch (Throwable t) { - LOG.warn(getName() + ": unexpected exception receiving call responses", t); - markClosed(new IOException("Unexpected exception receiving call responses", t)); - } - - close(); - - if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": stopped, connections " + connections.size()); - } - - private synchronized void disposeSasl() { - if (saslRpcClient != null) { - try { - saslRpcClient.dispose(); - saslRpcClient = null; - } catch (IOException ioe) { - LOG.error("Error disposing of SASL client", ioe); - } - } - } - - private synchronized boolean shouldAuthenticateOverKrb() throws IOException { - UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); - UserGroupInformation currentUser = - UserGroupInformation.getCurrentUser(); - UserGroupInformation realUser = currentUser.getRealUser(); - return authMethod == AuthMethod.KERBEROS && - loginUser != null && - //Make sure user logged in using Kerberos either keytab or TGT - loginUser.hasKerberosCredentials() && - // relogin only in case it is the login user (e.g. JT) - // or superuser (like oozie). - (loginUser.equals(currentUser) || loginUser.equals(realUser)); - } - - private synchronized boolean setupSaslConnection(final InputStream in2, - final OutputStream out2) throws IOException { - saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal); - return saslRpcClient.saslConnect(in2, out2); - } - - /** - * If multiple clients with the same principal try to connect - * to the same server at the same time, the server assumes a - * replay attack is in progress. This is a feature of kerberos. - * In order to work around this, what is done is that the client - * backs off randomly and tries to initiate the connection - * again. - * The other problem is to do with ticket expiry. To handle that, - * a relogin is attempted. - *
- * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} - * method. In case when the user doesn't have valid credentials, we don't - * need to retry (from cache or ticket). In such cases, it is prudent to - * throw a runtime exception when we receive a SaslException from the - * underlying authentication implementation, so there is no retry from - * other high level (for eg, HCM or HBaseAdmin). - *
- */ - private synchronized void handleSaslConnectionFailure( - final int currRetries, - final int maxRetries, final Exception ex, final Random rand, - final UserGroupInformation user) - throws IOException, InterruptedException{ - user.doAs(new PrivilegedExceptionAction() { - public Object run() throws IOException, InterruptedException { - closeConnection(); - if (shouldAuthenticateOverKrb()) { - if (currRetries < maxRetries) { - LOG.debug("Exception encountered while connecting to " + - "the server : " + ex); - //try re-login - if (UserGroupInformation.isLoginKeytabBased()) { - UserGroupInformation.getLoginUser().reloginFromKeytab(); - } else { - UserGroupInformation.getLoginUser().reloginFromTicketCache(); - } - disposeSasl(); - //have granularity of milliseconds - //we are sleeping with the Connection lock held but since this - //connection instance is being used for connecting to the server - //in question, it is okay - Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1)); - return null; - } else { - String msg = "Couldn't setup connection for " + - UserGroupInformation.getLoginUser().getUserName() + - " to " + serverPrincipal; - LOG.warn(msg); - throw (IOException) new IOException(msg).initCause(ex); - } - } else { - LOG.warn("Exception encountered while connecting to " + - "the server : " + ex); - } - if (ex instanceof RemoteException) { - throw (RemoteException)ex; - } - if (ex instanceof SaslException) { - String msg = "SASL authentication failed." + - " The most likely cause is missing or invalid credentials." + - " Consider 'kinit'."; - LOG.fatal(msg, ex); - throw new RuntimeException(msg, ex); - } - throw new IOException(ex); - } - }); - } - - protected synchronized void setupIOstreams() - throws IOException, InterruptedException { - if (socket != null || shouldCloseConnection.get()) { - return; - } - - if (failedServers.isFailedServer(remoteId.getAddress())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Not trying to connect to " + server + - " this server is in the failed servers list"); - } - IOException e = new FailedServerException( - "This server is in the failed servers list: " + server); - markClosed(e); - close(); - throw e; - } - - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Connecting to " + server); - } - short numRetries = 0; - final short MAX_RETRIES = 5; - Random rand = null; - while (true) { - setupConnection(); - InputStream inStream = NetUtils.getInputStream(socket); - OutputStream outStream = NetUtils.getOutputStream(socket); - // Write out the preamble -- MAGIC, version, and auth to use. - writeConnectionHeaderPreamble(outStream); - if (useSasl) { - final InputStream in2 = inStream; - final OutputStream out2 = outStream; - UserGroupInformation ticket = remoteId.getTicket().getUGI(); - if (authMethod == AuthMethod.KERBEROS) { - if (ticket != null && ticket.getRealUser() != null) { - ticket = ticket.getRealUser(); - } - } - boolean continueSasl; - try { - if (ticket == null) { - throw new NullPointerException("ticket is null"); - } else { - continueSasl = - ticket.doAs(new PrivilegedExceptionAction() { - @Override - public Boolean run() throws IOException { - return setupSaslConnection(in2, out2); - } - }); - } - } catch (Exception ex) { - if (rand == null) { - rand = new Random(); - } - handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, ticket); - continue; - } - if (continueSasl) { - // Sasl connect is successful. Let's set up Sasl i/o streams. 
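The javadoc above explains why SASL/Kerberos failures get special treatment: concurrent connection attempts from one principal look like a replay attack to the KDC, so the client re-logs-in and backs off a random amount before retrying. A sketch of just that policy under assumed names (SaslRetryPolicy, a Runnable standing in for the keytab or ticket-cache relogin); the retry bound and randomized sleep mirror the deleted code's MAX_RETRIES = 5 and rand.nextInt(reloginMaxBackoff) + 1:

import java.io.IOException;
import java.security.SecureRandom;

final class SaslRetryPolicy {
  private static final int MAX_RETRIES = 5;   // same bound as the deleted code
  private final SecureRandom rand = new SecureRandom();

  void onFailure(int attempt, int maxBackoffMs, Runnable relogin)
      throws IOException, InterruptedException {
    if (attempt >= MAX_RETRIES) {
      throw new IOException("Couldn't set up a SASL connection after " + attempt + " tries");
    }
    relogin.run();                                  // refresh credentials before retrying
    Thread.sleep(rand.nextInt(maxBackoffMs) + 1L);  // randomized pause to dodge replay detection
  }
}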
- inStream = saslRpcClient.getInputStream(inStream); - outStream = saslRpcClient.getOutputStream(outStream); - } else { - // fall back to simple auth because server told us so. - authMethod = AuthMethod.SIMPLE; - useSasl = false; - } - } - this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream))); - this.out = new DataOutputStream(new BufferedOutputStream(outStream)); - // Now write out the connection header - writeConnectionHeader(); - - // update last activity time - touch(); - - // start the receiver thread after the socket connection has been set up - start(); - return; - } - } catch (Throwable t) { - failedServers.addToFailedServers(remoteId.address); - IOException e; - if (t instanceof IOException) { - e = (IOException)t; - markClosed(e); - } else { - e = new IOException("Coundn't set up IO Streams", t); - markClosed(e); - } - close(); - throw e; - } - } - - /** - * Write the RPC header: - */ - private void writeConnectionHeaderPreamble(OutputStream outStream) throws IOException { - // Assemble the preamble up in a buffer first and then send it. Writing individual elements, - // they are getting sent across piecemeal according to wireshark and then server is messing - // up the reading on occasion (the passed in stream is not buffered yet). - - // Preamble is six bytes -- 'HBas' + VERSION + AUTH_CODE - int rpcHeaderLen = HConstants.RPC_HEADER.array().length; - byte [] preamble = new byte [rpcHeaderLen + 2]; - System.arraycopy(HConstants.RPC_HEADER.array(), 0, preamble, 0, rpcHeaderLen); - preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION; - preamble[rpcHeaderLen + 1] = authMethod.code; - outStream.write(preamble); - outStream.flush(); - } - - /** - * Write the connection header. - * Out is not synchronized because only the first thread does this. - */ - private void writeConnectionHeader() throws IOException { - this.out.writeInt(this.header.getSerializedSize()); - this.header.writeTo(this.out); - this.out.flush(); - } - - /** Close the connection. */ - protected synchronized void close() { - if (!shouldCloseConnection.get()) { - LOG.error(getName() + ": the connection is not in the closed state"); - return; - } - - // release the resources - // first thing to do;take the connection out of the connection list - synchronized (connections) { - if (connections.get(remoteId) == this) { - connections.remove(remoteId); - } - } - - // close the streams and therefore the socket - IOUtils.closeStream(out); - IOUtils.closeStream(in); - disposeSasl(); - - // clean up all calls - if (closeException == null) { - if (!calls.isEmpty()) { - LOG.warn(getName() + ": connection is closed for no cause and calls are not empty. " + - "#Calls: " + calls.size()); - - // clean up calls anyway - closeException = new IOException("Unexpected closed connection"); - cleanupCalls(); - } - } else { - // log the info - if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": closing ipc connection to " + server + ": " + - closeException.getMessage(),closeException); - } - - // cleanup calls - cleanupCalls(); - } - if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": closed"); - } - - /** - * Initiates a call by sending the parameter to the remote server. - * Note: this is not called from the Connection thread, but by other - * threads. 
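The deleted writeConnectionHeaderPreamble above documents the wire greeting: four magic bytes, a version byte, and an auth-code byte, assembled in one buffer so they are not flushed piecemeal. A standalone sketch of the same assembly (hypothetical Preamble class; 'HBas' and the two trailing bytes come straight from the deleted code's comment):

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

final class Preamble {
  static void write(OutputStream out, byte version, byte authCode) throws IOException {
    byte[] magic = "HBas".getBytes(StandardCharsets.UTF_8);  // the four magic bytes
    byte[] preamble = new byte[magic.length + 2];            // magic + VERSION + AUTH_CODE
    System.arraycopy(magic, 0, preamble, 0, magic.length);
    preamble[magic.length] = version;
    preamble[magic.length + 1] = authCode;
    out.write(preamble);   // single write of the whole buffer, as the old comment advises
    out.flush();
  }
}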
- * @param call - * @see #readResponse() - */ - protected void writeRequest(Call call) { - if (shouldCloseConnection.get()) return; - try { - RequestHeader.Builder builder = RequestHeader.newBuilder(); - builder.setCallId(call.id); - if (Trace.isTracing()) { - Span s = Trace.currentTrace(); - builder.setTraceInfo(RPCTInfo.newBuilder(). - setParentId(s.getSpanId()).setTraceId(s.getTraceId())); - } - builder.setMethodName(call.method.getName()); - builder.setRequestParam(call.param != null); - ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells); - if (cellBlock != null) { - CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); - cellBlockBuilder.setLength(cellBlock.limit()); - builder.setCellBlockMeta(cellBlockBuilder.build()); - } - //noinspection SynchronizeOnNonFinalField - RequestHeader header = builder.build(); - synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC - IPCUtil.write(this.out, header, call.param, cellBlock); - } - if (LOG.isTraceEnabled()) { - LOG.trace(getName() + ": wrote request header " + TextFormat.shortDebugString(header)); - } - } catch(IOException e) { - markClosed(e); - } - } - - /* Receive a response. - * Because only one receiver, so no synchronization on in. - */ - protected void readResponse() { - if (shouldCloseConnection.get()) return; - touch(); - try { - // See HBaseServer.Call.setResponse for where we write out the response. - - // Total size of the response. Unused. But have to read it in anyways. - /*int totalSize =*/ in.readInt(); - - // Read the header - ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in); - int id = responseHeader.getCallId(); - if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": got response header " + - TextFormat.shortDebugString(responseHeader)); - } - Call call = calls.get(id); - if (responseHeader.hasException()) { - ExceptionResponse exceptionResponse = responseHeader.getException(); - RemoteException re = createRemoteException(exceptionResponse); - if (isFatalConnectionException(exceptionResponse)) { - markClosed(re); - } else { - if (call != null) call.setException(re); - } - } else { - Message rpcResponseType = null; - if (call != null){ - try { - // TODO: Why pb engine pollution in here in this class? FIX. - rpcResponseType = - ProtobufRpcClientEngine.Invoker.getReturnProtoType( - reflectionCache.getMethod(remoteId.getProtocol(), call.method.getName())); - } catch (Exception e) { - throw new RuntimeException(e); //local exception - } - } - Message value = null; - if (rpcResponseType != null) { - Builder builder = rpcResponseType.newBuilderForType(); - builder.mergeDelimitedFrom(in); - value = builder.build(); - } - CellScanner cellBlockScanner = null; - if (responseHeader.hasCellBlockMeta()) { - int size = responseHeader.getCellBlockMeta().getLength(); - byte [] cellBlock = new byte[size]; - IPCUtil.readChunked(this.in, cellBlock, 0, size); - cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock); - } - // it's possible that this call may have been cleaned up due to a RPC - // timeout, so check if it still exists before setting the value. - if (call != null) call.setResponse(value, cellBlockScanner); - } - if (call != null) calls.remove(id); - } catch (IOException e) { - if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) { - // Clean up open calls but don't treat this as a fatal condition, - // since we expect certain responses to not make it by the specified - // {@link ConnectionId#rpcTimeout}. 
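readResponse() above splits remote failures two ways: a fatal connection exception poisons the whole connection, so every pending call fails when it closes, while any other remote exception is delivered only to the call whose id it carries. A schematic of that routing under assumed names; the exact-class-name comparison mirrors isFatalConnectionException() further down:

import java.io.IOException;
import java.util.function.Consumer;

final class ResponseErrorRouter {
  static final String FATAL = "org.apache.hadoop.hbase.ipc.FatalConnectionException";

  static void route(String remoteClassName, IOException failure,
      Runnable markConnectionClosed, Consumer<IOException> failOneCall) {
    if (FATAL.equals(remoteClassName)) {
      markConnectionClosed.run();   // connection-level: all pending calls will be failed
    } else {
      failOneCall.accept(failure);  // per-call: only the matching call id sees the error
    }
  }
}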
- closeException = e; - } else { - // Since the server did not respond within the default ping interval - // time, treat this as a fatal condition and close this connection - markClosed(e); - } - } finally { - if (remoteId.rpcTimeout > 0) { - cleanupCalls(remoteId.rpcTimeout); - } - } - } - - /** - * @param e - * @return True if the exception is a fatal connection exception. - */ - private boolean isFatalConnectionException(final ExceptionResponse e) { - return e.getExceptionClassName(). - equals(FatalConnectionException.class.getName()); - } - - /** - * @param e - * @return RemoteException made from passed e - */ - private RemoteException createRemoteException(final ExceptionResponse e) { - String innerExceptionClassName = e.getExceptionClassName(); - boolean doNotRetry = e.getDoNotRetry(); - return e.hasHostname()? - // If a hostname then add it to the RemoteWithExtrasException - new RemoteWithExtrasException(innerExceptionClassName, - e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry): - new RemoteWithExtrasException(innerExceptionClassName, - e.getStackTrace(), doNotRetry); - } - - protected synchronized void markClosed(IOException e) { - if (shouldCloseConnection.compareAndSet(false, true)) { - closeException = e; - notifyAll(); - } - } - - /* Cleanup all calls and mark them as done */ - protected void cleanupCalls() { - cleanupCalls(0); - } - - protected void cleanupCalls(long rpcTimeout) { - Iterator> itor = calls.entrySet().iterator(); - while (itor.hasNext()) { - Call c = itor.next().getValue(); - long waitTime = System.currentTimeMillis() - c.getStartTime(); - if (waitTime >= rpcTimeout) { - if (this.closeException == null) { - // There may be no exception in the case that there are many calls - // being multiplexed over this connection and these are succeeding - // fine while this Call object is taking a long time to finish - // over on the server; e.g. I just asked the regionserver to bulk - // open 3k regions or its a big fat multiput into a heavily-loaded - // server (Perhaps this only happens at the extremes?) - this.closeException = new CallTimeoutException("Call id=" + c.id + - ", waitTime=" + waitTime + ", rpcTimetout=" + rpcTimeout); - } - c.setException(this.closeException); - synchronized (c) { - c.notifyAll(); - } - itor.remove(); - } else { - break; - } - } - try { - if (!calls.isEmpty()) { - Call firstCall = calls.get(calls.firstKey()); - long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime(); - if (maxWaitTime < rpcTimeout) { - rpcTimeout -= maxWaitTime; - } - } - if (!shouldCloseConnection.get()) { - closeException = null; - if (socket != null) { - socket.setSoTimeout((int) rpcTimeout); - } - } - } catch (SocketException e) { - LOG.debug("Couldn't lower timeout, which may result in longer than expected calls"); - } - } - } - - /** - * Client-side call timeout - */ - @SuppressWarnings("serial") - public static class CallTimeoutException extends IOException { - public CallTimeoutException(final String msg) { - super(msg); - } - } - - /** - * Construct an IPC client whose values are of the {@link Message} - * class. 
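The tail of cleanupCalls(long) lowers the socket timeout to whatever budget the oldest surviving call has left. A worked example with illustrative numbers:

long rpcTimeout  = 10000;   // configured per-call timeout, in ms
long maxWaitTime = 4000;    // System.currentTimeMillis() - firstCall.getStartTime()
if (maxWaitTime < rpcTimeout) {
  rpcTimeout -= maxWaitTime;               // 6000 ms remain for the oldest call
}
// socket.setSoTimeout((int) rpcTimeout);  // the next read blocks at most 6000 ms

So a call that has already waited 4s of its 10s budget cannot be stalled behind another full-length read timeout.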
- * @param conf configuration - * @param factory socket factory - */ - public HBaseClient(Configuration conf, String clusterId, SocketFactory factory) { - this.maxIdleTime = - conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s - this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); - this.failureSleep = conf.getInt("hbase.client.pause", 1000); - this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true); - this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); - this.pingInterval = getPingInterval(conf); - if (LOG.isDebugEnabled()) { - LOG.debug("Ping interval: " + this.pingInterval + "ms."); - } - this.ipcUtil = new IPCUtil(conf); - this.conf = conf; - this.codec = getCodec(conf); - this.compressor = getCompressor(conf); - this.socketFactory = factory; - this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT; - this.connections = new PoolMap( - getPoolType(conf), getPoolSize(conf)); - this.failedServers = new FailedServers(conf); - } - - /** - * Encapsulate the ugly casting and RuntimeException conversion in private method. - * @param conf - * @return Codec to use on this client. - */ - private static Codec getCodec(final Configuration conf) { - String className = conf.get("hbase.client.rpc.codec", KeyValueCodec.class.getCanonicalName()); - try { - return (Codec)Class.forName(className).newInstance(); - } catch (Exception e) { - throw new RuntimeException("Failed getting codec " + className, e); - } - } - - /** - * Encapsulate the ugly casting and RuntimeException conversion in private method. - * @param conf - * @return The compressor to use on this client. - */ - private static CompressionCodec getCompressor(final Configuration conf) { - String className = conf.get("hbase.client.rpc.compressor", null); - if (className == null || className.isEmpty()) return null; - try { - return (CompressionCodec)Class.forName(className).newInstance(); - } catch (Exception e) { - throw new RuntimeException("Failed getting compressor " + className, e); - } - } - - /** - * Construct an IPC client with the default SocketFactory - * @param conf configuration - */ - public HBaseClient(Configuration conf, String clusterId) { - this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf)); - } - - /** - * Return the pool type specified in the configuration, which must be set to - * either {@link PoolType#RoundRobin} or {@link PoolType#ThreadLocal}, - * otherwise default to the former. - * - * For applications with many user threads, use a small round-robin pool. For - * applications with few user threads, you may want to try using a - * thread-local pool. In any case, the number of {@link HBaseClient} instances - * should not exceed the operating system's hard limit on the number of - * connections. - * - * @param config configuration - * @return either a {@link PoolType#RoundRobin} or - * {@link PoolType#ThreadLocal} - */ - protected static PoolType getPoolType(Configuration config) { - return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), - PoolType.RoundRobin, PoolType.ThreadLocal); - } - - /** - * Return the pool size specified in the configuration, which is applicable only if - * the pool type is {@link PoolType#RoundRobin}. 
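As a hypothetical configuration snippet, the pool knobs are set through the same keys that getPoolType and getPoolSize read (the enum names are assumed to be the accepted string values):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "RoundRobin");  // or "ThreadLocal"
conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 5);          // honored for RoundRobin only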
- * - * @param config - * @return the maximum pool size - */ - protected static int getPoolSize(Configuration config) { - return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); - } - - /** Return the socket factory of this client - * - * @return this client's socket factory - */ - SocketFactory getSocketFactory() { - return socketFactory; - } - - /** Stop all threads related to this client. No further calls may be made - * using this client. */ - public void stop() { - if (LOG.isDebugEnabled()) { - LOG.debug("Stopping client"); - } - - if (!running.compareAndSet(true, false)) { - return; - } - - // wake up all connections - synchronized (connections) { - for (Connection conn : connections.values()) { - conn.interrupt(); - } - } - - // wait until all connections are closed - while (!connections.isEmpty()) { - try { - Thread.sleep(100); - } catch (InterruptedException ignored) { - } - } - } - - /** Make a call, passing param, to the IPC server running at - * address which is servicing the protocol protocol, - * with the ticket credentials, returning the value. - * Throws exceptions if there are network problems or if the remote code - * threw an exception. - * @param method - * @param param - * @param cells - * @param addr - * @param protocol - * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. - * {@link User#getCurrent()} makes a new instance of User each time so will be a new Connection - * each time. - * @param rpcTimeout - * @return A pair with the Message response and the Cell data (if any). - * @throws InterruptedException - * @throws IOException - */ - public Pair call(Method method, Message param, CellScanner cells, - InetSocketAddress addr, Class protocol, User ticket, int rpcTimeout) - throws InterruptedException, IOException { - Call call = new Call(method, param, cells); - Connection connection = - getConnection(addr, protocol, ticket, rpcTimeout, call, this.codec, this.compressor); - connection.writeRequest(call); // send the parameter - boolean interrupted = false; - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (call) { - while (!call.done) { - try { - call.wait(); // wait for the result - } catch (InterruptedException ignored) { - // save the fact that we were interrupted - interrupted = true; - } - } - - if (interrupted) { - // set the interrupt flag now that we are done waiting - Thread.currentThread().interrupt(); - } - - if (call.error != null) { - if (call.error instanceof RemoteException) { - call.error.fillInStackTrace(); - throw call.error; - } - // local exception - throw wrapException(addr, call.error); - } - return new Pair(call.response, call.cells); - } - } - - /** - * Take an IOException and the address we were trying to connect to - * and return an IOException with the input exception as the cause. - * The new exception provides the stack trace of the place where - * the exception is thrown and some extra diagnostics information. - * If the exception is ConnectException or SocketTimeoutException, - * return a new one of the same type; Otherwise return an IOException. 
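From the caller's side, the rewrapping described below keeps the exception type, names the target address in the message, and carries the original failure as the cause. A sketch of what a client sees (client, method and the other arguments are assumed to be in scope):

try {
  client.call(method, param, cells, addr, protocol, user, 60000);
} catch (ConnectException e) {
  // e.getMessage() reads "Call to host:port failed on connection exception: ..."
  Throwable original = e.getCause();   // the ConnectException raised locally
}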
- * - * @param addr target address - * @param exception the relevant exception - * @return an exception to throw - */ - protected IOException wrapException(InetSocketAddress addr, - IOException exception) { - if (exception instanceof ConnectException) { - //connection refused; include the host:port in the error - return (ConnectException)new ConnectException( - "Call to " + addr + " failed on connection exception: " + exception) - .initCause(exception); - } else if (exception instanceof SocketTimeoutException) { - return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr + - " failed on socket timeout exception: " + exception).initCause(exception); - } else { - return (IOException)new IOException("Call to " + addr + " failed on local exception: " + - exception).initCause(exception); - } - } - - /** - * Interrupt the connections to the given ip:port server. This should be called if the server - * is known as actually dead. This will not prevent current operation to be retried, and, - * depending on their own behavior, they may retry on the same server. This can be a feature, - * for example at startup. In any case, they're likely to get connection refused (if the - * process died) or no route to host: i.e. there next retries should be faster and with a - * safe exception. - */ - public void cancelConnections(String hostname, int port, IOException ioe) { - synchronized (connections) { - for (Connection connection : connections.values()) { - if (connection.isAlive() && - connection.getRemoteAddress().getPort() == port && - connection.getRemoteAddress().getHostName().equals(hostname)) { - if (connection.shouldCloseConnection.compareAndSet(false, true)) { - LOG.info("The server on " + hostname + ":" + port + - " is dead - closing the connection " + connection.remoteId); - connection.closeException = ioe; - connection.close(); - // We could do a connection.interrupt(), but it's safer not to do it, as the - // interrupted exception behavior is not defined nor enforced enough. - } - } - } - } - } - - /* Get a connection from the pool, or create a new one and add it to the - * pool. Connections to a given host/port are reused. */ - protected Connection getConnection(InetSocketAddress addr, Class protocol, - User ticket, int rpcTimeout, Call call, final Codec codec, final CompressionCodec compressor) - throws IOException, InterruptedException { - if (!running.get()) { - // the client is stopped - throw new IOException("The client is stopped"); - } - Connection connection; - /* we could avoid this allocation for each RPC by having a - * connectionsId object and with set() method. We need to manage the - * refs for keys in HashMap properly. For now its ok. - */ - ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout); - synchronized (connections) { - connection = connections.get(remoteId); - if (connection == null) { - connection = createConnection(remoteId, this.codec, this.compressor); - connections.put(remoteId, connection); - } - } - connection.addCall(call); - - //we don't invoke the method below inside "synchronized (connections)" - //block above. The reason for that is if the server happens to be slow, - //it will take longer to establish a connection and that will slow the - //entire system down. - //Moreover, if the connection is currently created, there will be many threads - // waiting here; as setupIOstreams is synchronized. If the connection fails with a - // timeout, they will all fail simultaneously. This is checked in setupIOstreams. 
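Connection reuse hinges on ConnectionId equality: two requests that share address, protocol, user and timeout resolve to the same pooled Connection. An illustration with a hypothetical protocol class:

ConnectionId a = new ConnectionId(addr, MyProtocol.class, user, 60000);
ConnectionId b = new ConnectionId(addr, MyProtocol.class, user, 60000);
assert a.equals(b) && a.hashCode() == b.hashCode();  // same key, same Connection

A fresh User instance, by contrast, means a fresh key and therefore a fresh connection, which is why the call() javadoc warns about User.getCurrent().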
- connection.setupIOstreams(); - return connection; - } - - /** - * This class holds the address and the user ticket. The client connections - * to servers are uniquely identified by - */ - protected static class ConnectionId { - final InetSocketAddress address; - final User ticket; - final int rpcTimeout; - Class protocol; - private static final int PRIME = 16777619; - - ConnectionId(InetSocketAddress address, Class protocol, - User ticket, - int rpcTimeout) { - this.protocol = protocol; - this.address = address; - this.ticket = ticket; - this.rpcTimeout = rpcTimeout; - } - - InetSocketAddress getAddress() { - return address; - } - - Class getProtocol() { - return protocol; - } - - User getTicket() { - return ticket; - } - - @Override - public String toString() { - return this.address.toString() + "/" + this.protocol + "/" + this.ticket + "/" + - this.rpcTimeout; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof ConnectionId) { - ConnectionId id = (ConnectionId) obj; - return address.equals(id.address) && protocol == id.protocol && - ((ticket != null && ticket.equals(id.ticket)) || - (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout; - } - return false; - } - - @Override // simply use the default Object#hashcode() ? - public int hashCode() { - int hashcode = (address.hashCode() + PRIME * (PRIME * System.identityHashCode(protocol) ^ - (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout; - return hashcode; - } - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java deleted file mode 100644 index 8d29a49..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java +++ /dev/null @@ -1,152 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.ipc; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.IpcProtocol; -import org.apache.hadoop.hbase.client.RetriesExhaustedException; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; - -/** - * An RPC implementation. This class provides the client side. 
- */ -@InterfaceAudience.Private -public class HBaseClientRPC { - protected static final Log LOG = - LogFactory.getLog("org.apache.hadoop.ipc.HBaseClientRPC"); - - // thread-specific RPC timeout, which may override that of RpcEngine - private static ThreadLocal rpcTimeout = new ThreadLocal() { - @Override - protected Integer initialValue() { - return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; - } - }; - - /** - * @param protocol protocol interface - * @param addr address of remote service - * @param conf configuration - * @param maxAttempts max attempts - * @param rpcTimeout timeout for each RPC - * @param timeout timeout in milliseconds - * @return proxy - * @throws java.io.IOException e - */ - public static T waitForProxy(RpcClientEngine engine, - Class protocol, - InetSocketAddress addr, - Configuration conf, - int maxAttempts, - int rpcTimeout, - long timeout) - throws IOException { - // HBase does limited number of reconnects which is different from hadoop. - long startTime = System.currentTimeMillis(); - IOException ioe; - int reconnectAttempts = 0; - while (true) { - try { - return engine.getProxy(protocol, addr, conf, rpcTimeout); - } catch (SocketTimeoutException te) { - LOG.info("Problem connecting to server: " + addr); - ioe = te; - } catch (IOException ioex) { - // We only handle the ConnectException. - ConnectException ce = null; - if (ioex instanceof ConnectException) { - ce = (ConnectException) ioex; - ioe = ce; - } else if (ioex.getCause() != null - && ioex.getCause() instanceof ConnectException) { - ce = (ConnectException) ioex.getCause(); - ioe = ce; - } else if (ioex.getMessage().toLowerCase() - .contains("connection refused")) { - ce = new ConnectException(ioex.getMessage()); - ioe = ce; - } else { - // This is the exception we can't handle. - ioe = ioex; - } - if (ce != null) { - handleConnectionException(++reconnectAttempts, maxAttempts, protocol, - addr, ce); - } - } - // check if timed out - if (System.currentTimeMillis() - timeout >= startTime) { - throw ioe; - } - - // wait for retry - try { - Thread.sleep(1000); - } catch (InterruptedException ie) { - Thread.interrupted(); - throw new InterruptedIOException(); - } - } - } - - /** - * @param retries current retried times. 
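A hypothetical use of the helper above, with MyProtocol standing in for a real IpcProtocol subinterface: wait up to 30 seconds overall, with at most 10 connect attempts and a 5-second per-RPC timeout.

MyProtocol proxy = HBaseClientRPC.waitForProxy(
    engine, MyProtocol.class, addr, conf,
    10,       // maxAttempts
    5000,     // rpcTimeout for each RPC
    30000L);  // overall setup timeout, ms

Between attempts the loop sleeps one second, so maxAttempts and the overall timeout bound the wait jointly.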
- * @param maxAttmpts max attempts - * @param protocol protocol interface - * @param addr address of remote service - * @param ce ConnectException - * @throws org.apache.hadoop.hbase.client.RetriesExhaustedException - * - */ - private static void handleConnectionException(int retries, - int maxAttmpts, - Class protocol, - InetSocketAddress addr, - ConnectException ce) - throws RetriesExhaustedException { - if (maxAttmpts >= 0 && retries >= maxAttmpts) { - LOG.info("Server at " + addr + " could not be reached after " - + maxAttmpts + " tries, giving up."); - throw new RetriesExhaustedException("Failed setting up proxy " + protocol - + " to " + addr.toString() + " after attempts=" + maxAttmpts, ce); - } - } - - public static void setRpcTimeout(int t) { - rpcTimeout.set(t); - } - - public static int getRpcTimeout() { - return rpcTimeout.get(); - } - - public static void resetRpcTimeout() { - rpcTimeout.remove(); - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 19c786d..3b43bfd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -33,6 +33,12 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; +import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse; +import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; +import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; @@ -44,6 +50,7 @@ import com.google.common.base.Preconditions; import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Message; +import com.google.protobuf.TextFormat; /** * Utility to help ipc'ing. @@ -263,4 +270,24 @@ class IPCUtil { Preconditions.checkArgument(totalSize < Integer.MAX_VALUE); return totalSize; } -} + + /** + * Return short version of Param Message toString'd, shorter than TextFormat#regionServerStartup + * @param methodName + * @param request + * @return toString of passed param + */ + static String getRequestShortTextFormat(Message request) { + if (request instanceof ScanRequest) { + return TextFormat.shortDebugString(request); + } else if (request instanceof RegionServerReportRequest) { + // Print a short message only, just the servername and the requests, not the full load. 
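+      // For example (illustrative values, not part of the original patch): a request built
+      // as ScanRequest.newBuilder().setNumberOfRows(10).build() comes back from this method
+      // in full short form, e.g. "number_of_rows: 10", while the RegionServerReportRequest
+      // branch below deliberately drops the per-region load detail.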
+ RegionServerReportRequest r = (RegionServerReportRequest)request; + return "server " + TextFormat.shortDebugString(r.getServer()) + + " load { numberOfRequests: " + r.getLoad().getNumberOfRequests() + " }"; + } else if (request instanceof RegionServerStartupRequest) { + return TextFormat.shortDebugString(request); + } + return "TODO " + TextFormat.shortDebugString(request); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java deleted file mode 100644 index f28b61b..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.ipc; - -import com.google.protobuf.Message; -import com.google.protobuf.ServiceException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.IpcProtocol; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.ipc.RemoteException; - -import java.io.IOException; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class ProtobufRpcClientEngine implements RpcClientEngine { - - private static final Log LOG = - LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine"); - - public HBaseClient getClient() { - return client; - } - - protected HBaseClient client; - - public ProtobufRpcClientEngine(Configuration conf, String clusterId) { - this.client = new HBaseClient(conf, clusterId); - } - - - @Override - public T getProxy( - Class protocol, InetSocketAddress addr, - Configuration conf, int rpcTimeout) throws IOException { - final Invoker invoker = new Invoker(protocol, addr, User.getCurrent(), rpcTimeout, client); - return (T) Proxy.newProxyInstance( - protocol.getClassLoader(), new Class[]{protocol}, invoker); - } - - @Override - public void close() { - this.client.stop(); - } - - static class Invoker implements InvocationHandler { - private static final Map returnTypes = - new ConcurrentHashMap(); - private Class protocol; - private InetSocketAddress address; - private User ticket; - private HBaseClient client; - final private int rpcTimeout; - - public Invoker(Class protocol, InetSocketAddress addr, User ticket, - int rpcTimeout, HBaseClient client) - throws IOException { - this.protocol = 
protocol; - this.address = addr; - this.ticket = ticket; - this.client = client; - this.rpcTimeout = rpcTimeout; - } - - /** - * This is the client side invoker of RPC method. It only throws - * ServiceException, since the invocation proxy expects only - * ServiceException to be thrown by the method in case protobuf service. - * - * ServiceException has the following causes: - *
- * <ol>
- * <li>Exceptions encountered on the client side in this method are
- * set as cause in ServiceException as is.</li>
- * <li>Exceptions from the server are wrapped in RemoteException and are
- * set as cause in ServiceException</li>
- * </ol>
- *
Note that the client calling protobuf RPC methods, must handle - * ServiceException by getting the cause from the ServiceException. If the - * cause is RemoteException, then unwrap it to get the exception thrown by - * the server. - */ - @Override - public Object invoke(Object proxy, Method method, Object[] args) - throws ServiceException { - long startTime = 0; - if (LOG.isTraceEnabled()) { - startTime = System.currentTimeMillis(); - } - if (args.length != 2) { - throw new ServiceException(method.getName() + " didn't get two args: " + args.length); - } - // Get the controller. Often null. Presume payload carrying controller. Payload is optional. - // It is cells/data that we do not want to protobuf. - PayloadCarryingRpcController controller = (PayloadCarryingRpcController)args[0]; - CellScanner cells = null; - if (controller != null) { - cells = controller.cellScanner(); - // Clear it here so we don't by mistake try and these cells processing results. - controller.setCellScanner(null); - } - // The request parameter - Message param = (Message)args[1]; - Pair val = null; - try { - val = client.call(method, param, cells, address, protocol, ticket, rpcTimeout); - if (controller != null) { - // Shove the results into controller so can be carried across the proxy/pb service void. - if (val.getSecond() != null) controller.setCellScanner(val.getSecond()); - } else if (val.getSecond() != null) { - throw new ServiceException("Client dropping data on the floor!"); - } - - if (LOG.isTraceEnabled()) { - long callTime = System.currentTimeMillis() - startTime; - if (LOG.isTraceEnabled()) LOG.trace("Call: " + method.getName() + " " + callTime); - } - return val.getFirst(); - } catch (Throwable e) { - if (e instanceof RemoteException) { - Throwable cause = ((RemoteException)e).unwrapRemoteException(); - throw new ServiceException("methodName=" + method.getName(), cause); - } - throw new ServiceException(e); - } - } - - static Message getReturnProtoType(Method method) throws Exception { - if (returnTypes.containsKey(method.getName())) { - return returnTypes.get(method.getName()); - } - Class returnType = method.getReturnType(); - if (returnType.getName().equals("void")) return null; - Method newInstMethod = returnType.getMethod("getDefaultInstance"); - newInstMethod.setAccessible(true); - Message protoType = (Message) newInstMethod.invoke(null, (Object[]) null); - returnTypes.put(method.getName(), protoType); - return protoType; - } - } -} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ReflectionCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ReflectionCache.java deleted file mode 100644 index 18564c0..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ReflectionCache.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import java.lang.reflect.Method; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.hadoop.hbase.IpcProtocol; - - -import com.google.protobuf.Message; - -/** - * Save on relection by keeping around method, method argument, and constructor instances - */ -class ReflectionCache { - private final Map methodArgCache = new ConcurrentHashMap(); - private final Map methodInstanceCache = new ConcurrentHashMap(); - - public ReflectionCache() { - super(); - } - - Method getMethod(Class protocol, String methodName) { - Method method = this.methodInstanceCache.get(methodName); - if (method != null) return method; - Method [] methods = protocol.getMethods(); - for (Method m : methods) { - if (m.getName().equals(methodName)) { - m.setAccessible(true); - this.methodInstanceCache.put(methodName, m); - return m; - } - } - return null; - } - - Message getMethodArgType(Method method) throws Exception { - Message protoType = this.methodArgCache.get(method.getName()); - if (protoType != null) return protoType; - Class[] args = method.getParameterTypes(); - Class arg; - if (args.length == 2) { - // RpcController + Message in the method args - // (generated code from RPC bits in .proto files have RpcController) - arg = args[1]; - } else if (args.length == 1) { - arg = args[0]; - } else { - //unexpected - return null; - } - //in the protobuf methods, args[1] is the only significant argument - Method newInstMethod = arg.getMethod("getDefaultInstance"); - newInstMethod.setAccessible(true); - protoType = (Message) newInstMethod.invoke(null, (Object[]) null); - this.methodArgCache.put(method.getName(), protoType); - return protoType; - } -} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index 7131a26..8297450 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -79,7 +79,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ new ServerCallable(connection, table, row) { public CoprocessorServiceResponse call() throws Exception { byte[] regionName = location.getRegionInfo().getRegionName(); - return ProtobufUtil.execService(server, call, regionName); + return ProtobufUtil.execService(stub, call, regionName); } }; CoprocessorServiceResponse result = callable.withRetries(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java index d405590..79d1daa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java @@ -22,8 +22,9 @@ import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException; import org.apache.hadoop.ipc.RemoteException; /** - * An {@link RemoteException} with some extra information. If source exception + * A {@link RemoteException} with some extra information. If source exception * was a {@link DoNotRetryIOException}, {@link #isDoNotRetry()} will return true. + *

A {@link RemoteException} hosts exceptions we got from the server. */ @SuppressWarnings("serial") @InterfaceAudience.Private diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java new file mode 100644 index 0000000..9a6ae1b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -0,0 +1,1614 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import javax.net.SocketFactory; +import javax.security.sasl.SaslException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.codec.KeyValueCodec; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; +import org.apache.hadoop.hbase.protobuf.generated.Tracing.RPCTInfo; +import org.apache.hadoop.hbase.security.AuthMethod; +import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; +import org.apache.hadoop.hbase.security.KerberosInfo; +import org.apache.hadoop.hbase.security.TokenInfo; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; 
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.PoolMap; +import org.apache.hadoop.hbase.util.PoolMap.PoolType; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.TokenSelector; +import org.cloudera.htrace.Span; +import org.cloudera.htrace.Trace; + +import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.Message.Builder; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import com.google.protobuf.TextFormat; + + +/** + * Does RPC against a cluster. Manages connections per regionserver in the cluster. + *

See HBaseServer
+ */
+@InterfaceAudience.Private
+public class RpcClient {
+  // The LOG key is intentionally not from this package to avoid ipc logging at DEBUG (all under
+  // o.a.h.hbase is set to DEBUG as default).
+  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcClient");
+  protected final PoolMap<ConnectionId, Connection> connections;
+
+  protected int counter;                            // counter for call ids
+  protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
+  final protected Configuration conf;
+  final protected int maxIdleTime;      // connections will be culled if idle for more than
+                                        // maxIdleTime msecs
+  final protected int maxRetries;       // the max. no. of retries for socket connections
+  final protected long failureSleep;    // Time to sleep before retry on failure.
+  protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
+  protected final boolean tcpKeepAlive; // if T then use keepalives
+  protected int pingInterval;           // how often we send a ping to the server, in msecs
+  protected FailedServers failedServers;
+  private final Codec codec;
+  private final CompressionCodec compressor;
+  private final IPCUtil ipcUtil;
+
+  protected final SocketFactory socketFactory;      // how to create sockets
+  protected String clusterId;
+
+  final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
+  final private static String SOCKET_TIMEOUT = "ipc.socket.timeout";
+  final static int DEFAULT_PING_INTERVAL = 60000;   // 1 min
+  final static int DEFAULT_SOCKET_TIMEOUT = 20000;  // 20 seconds
+  final static int PING_CALL_ID = -1;
+
+  public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
+  public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
+
+  // thread-specific RPC timeout, which may override that of what was passed in.
+  // TODO: Verify still being used.
+  private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
+    @Override
+    protected Integer initialValue() {
+      return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
+    }
+  };
+
+  /**
+   * A class to manage a list of servers that failed recently.
+   */
+  static class FailedServers {
+    private final LinkedList<Pair<Long, String>> failedServers =
+      new LinkedList<Pair<Long, String>>();
+    private final int recheckServersTimeout;
+
+    FailedServers(Configuration conf) {
+      this.recheckServersTimeout = conf.getInt(
+        FAILED_SERVER_EXPIRY_KEY, FAILED_SERVER_EXPIRY_DEFAULT);
+    }
+
+    /**
+     * Add an address to the list of failed servers.
+     */
+    public synchronized void addToFailedServers(InetSocketAddress address) {
+      final long expiry = EnvironmentEdgeManager.currentTimeMillis() + recheckServersTimeout;
+      failedServers.addFirst(new Pair<Long, String>(expiry, address.toString()));
+    }
+
+    /**
+     * Check if the server should be considered as bad. Clean the old entries of the list.
+ * + * @return true if the server is in the failed servers list + */ + public synchronized boolean isFailedServer(final InetSocketAddress address) { + if (failedServers.isEmpty()) { + return false; + } + + final String lookup = address.toString(); + final long now = EnvironmentEdgeManager.currentTimeMillis(); + + // iterate, looking for the search entry and cleaning expired entries + Iterator> it = failedServers.iterator(); + while (it.hasNext()) { + Pair cur = it.next(); + if (cur.getFirst() < now) { + it.remove(); + } else { + if (lookup.equals(cur.getSecond())) { + return true; + } + } + } + + return false; + } + } + + @SuppressWarnings("serial") + public static class FailedServerException extends IOException { + public FailedServerException(String s) { + super(s); + } + } + + /** + * set the ping interval value in configuration + * + * @param conf Configuration + * @param pingInterval the ping interval + */ + // Any reason we couldn't just do tcp keepalive instead of this pingery? + // St.Ack 20130121 + public static void setPingInterval(Configuration conf, int pingInterval) { + conf.setInt(PING_INTERVAL_NAME, pingInterval); + } + + /** + * Get the ping interval from configuration; + * If not set in the configuration, return the default value. + * + * @param conf Configuration + * @return the ping interval + */ + static int getPingInterval(Configuration conf) { + return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL); + } + + /** + * Set the socket timeout + * @param conf Configuration + * @param socketTimeout the socket timeout + */ + public static void setSocketTimeout(Configuration conf, int socketTimeout) { + conf.setInt(SOCKET_TIMEOUT, socketTimeout); + } + + /** + * @return the socket timeout + */ + static int getSocketTimeout(Configuration conf) { + return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT); + } + + /** A call waiting for a value. */ + protected class Call { + final int id; // call id + final Message param; // rpc request method param object + /** + * Optionally has cells when making call. Optionally has cells set on response. Used + * passing cells to the rpc and receiving the response. + */ + CellScanner cells; + Message response; // value, null if error + // The return type. Used to create shell into which we deserialize the response if any. + Message responseDefaultType; + IOException error; // exception, null if value + boolean done; // true when call is done + long startTime; + final MethodDescriptor md; + + protected Call(final MethodDescriptor md, Message param, final CellScanner cells, + final Message responseDefaultType) { + this.param = param; + this.md = md; + this.cells = cells; + this.startTime = System.currentTimeMillis(); + this.responseDefaultType = responseDefaultType; + synchronized (RpcClient.this) { + this.id = counter++; + } + } + + @Override + public String toString() { + return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" + + (this.param != null? TextFormat.shortDebugString(this.param): "") + "}"; + } + + /** Indicate when the call is complete and the + * value or error are available. Notifies by default. */ + protected synchronized void callComplete() { + this.done = true; + notify(); // notify caller + } + + /** Set the exception when there is an error. + * Notify the caller the call is done. 
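Taken together, the ping, socket-timeout and failed-server knobs above can be tuned as in this hypothetical snippet:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

Configuration conf = HBaseConfiguration.create();
RpcClient.setPingInterval(conf, 30000);                 // ping after 30s of read silence
RpcClient.setSocketTimeout(conf, 10000);                // 10s connect timeout
conf.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 5000);  // distrust a failed server for 5s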
+ * + * @param error exception thrown by the call; either local or remote + */ + public void setException(IOException error) { + this.error = error; + callComplete(); + } + + /** + * Set the return value when there is no error. + * Notify the caller the call is done. + * + * @param response return value of the call. + * @param cells Can be null + */ + public void setResponse(Message response, final CellScanner cells) { + this.response = response; + this.cells = cells; + callComplete(); + } + + public long getStartTime() { + return this.startTime; + } + } + + protected final static Map> tokenHandlers = + new HashMap>(); + static { + tokenHandlers.put(AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.toString(), + new AuthenticationTokenSelector()); + } + + /** + * Creates a connection. Can be overridden by a subclass for testing. + * @param remoteId - the ConnectionId to use for the connection creation. + */ + protected Connection createConnection(ConnectionId remoteId, final Codec codec, + final CompressionCodec compressor) + throws IOException { + return new Connection(remoteId, codec, compressor); + } + + /** Thread that reads responses and notifies callers. Each connection owns a + * socket connected to a remote address. Calls are multiplexed through this + * socket: responses may be delivered out of order. */ + protected class Connection extends Thread { + private ConnectionHeader header; // connection header + protected ConnectionId remoteId; + protected Socket socket = null; // connected socket + protected DataInputStream in; + protected DataOutputStream out; + private InetSocketAddress server; // server ip:port + private String serverPrincipal; // server's krb5 principal name + private AuthMethod authMethod; // authentication method + private boolean useSasl; + private Token token; + private HBaseSaslRpcClient saslRpcClient; + private int reloginMaxBackoff; // max pause before relogin on sasl failure + private final Codec codec; + private final CompressionCodec compressor; + + // currently active calls + protected final ConcurrentSkipListMap calls = + new ConcurrentSkipListMap(); + protected final AtomicLong lastActivity = + new AtomicLong(); // last I/O activity time + protected final AtomicBoolean shouldCloseConnection = + new AtomicBoolean(); // indicate if the connection is closed + protected IOException closeException; // close reason + + Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor) + throws IOException { + if (remoteId.getAddress().isUnresolved()) { + throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName()); + } + this.server = remoteId.getAddress(); + this.codec = codec; + this.compressor = compressor; + + UserGroupInformation ticket = remoteId.getTicket().getUGI(); + Class securityInfo = remoteId.getSecurityInfo(); + this.useSasl = User.isHBaseSecurityEnabled(conf); + if (useSasl && securityInfo != null) { + TokenInfo tokenInfo = securityInfo.getAnnotation(TokenInfo.class); + if (tokenInfo != null) { + TokenSelector tokenSelector = + tokenHandlers.get(tokenInfo.value()); + if (tokenSelector != null) { + token = tokenSelector.selectToken(new Text(clusterId), + ticket.getTokens()); + } else if (LOG.isDebugEnabled()) { + LOG.debug("No token selector found for type "+tokenInfo.value()); + } + } + KerberosInfo krbInfo = securityInfo.getAnnotation(KerberosInfo.class); + if (krbInfo != null) { + String serverKey = krbInfo.serverPrincipal(); + if (serverKey == null) { + throw new IOException( + "Can't obtain server 
Kerberos config key from KerberosInfo"); + } + serverPrincipal = SecurityUtil.getServerPrincipal( + conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase()); + if (LOG.isDebugEnabled()) { + LOG.debug("RPC Server Kerberos principal name for protocol=" + + securityInfo.getCanonicalName() + " is " + serverPrincipal); + } + } + } + + if (!useSasl) { + authMethod = AuthMethod.SIMPLE; + } else if (token != null) { + authMethod = AuthMethod.DIGEST; + } else { + authMethod = AuthMethod.KERBEROS; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName + + ", sasl=" + useSasl); + } + reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000); + this.remoteId = remoteId; + + ConnectionHeader.Builder builder = ConnectionHeader.newBuilder(); + builder.setServiceName(remoteId.getServiceName()); + UserInformation userInfoPB; + if ((userInfoPB = getUserInfo(ticket)) != null) { + builder.setUserInfo(userInfoPB); + } + builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName()); + if (this.compressor != null) { + builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName()); + } + this.header = builder.build(); + + this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " + + remoteId.getAddress().toString() + + ((ticket==null)?" from an unknown user": (" from " + + ticket.getUserName()))); + this.setDaemon(true); + } + + private UserInformation getUserInfo(UserGroupInformation ugi) { + if (ugi == null || authMethod == AuthMethod.DIGEST) { + // Don't send user for token auth + return null; + } + UserInformation.Builder userInfoPB = UserInformation.newBuilder(); + if (authMethod == AuthMethod.KERBEROS) { + // Send effective user for Kerberos auth + userInfoPB.setEffectiveUser(ugi.getUserName()); + } else if (authMethod == AuthMethod.SIMPLE) { + //Send both effective user and real user for simple auth + userInfoPB.setEffectiveUser(ugi.getUserName()); + if (ugi.getRealUser() != null) { + userInfoPB.setRealUser(ugi.getRealUser().getUserName()); + } + } + return userInfoPB.build(); + } + + /** Update lastActivity with the current time. */ + protected void touch() { + lastActivity.set(System.currentTimeMillis()); + } + + /** + * Add a call to this connection's call queue and notify + * a listener; synchronized. If the connection is dead, the call is not added, and the + * caller is notified. + * This function can return a connection that is already marked as 'shouldCloseConnection' + * It is up to the user code to check this status. + * @param call to add + */ + protected synchronized void addCall(Call call) { + // If the connection is about to close, we manage this as if the call was already added + // to the connection calls list. If not, the connection creations are serialized, as + // mentioned in HBASE-6364 + if (this.shouldCloseConnection.get()) { + if (this.closeException == null) { + call.setException(new IOException( + "Call " + call.id + " not added as the connection " + remoteId + " is closing")); + } else { + call.setException(this.closeException); + } + synchronized (call) { + call.notifyAll(); + } + } else { + calls.put(call.id, call); + synchronized (call) { + notify(); + } + } + } + + /** This class sends a ping to the remote side when timeout on + * reading. If no failure is detected, it retries until at least + * a byte is read. 
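The serverPrincipal resolution above relies on Hadoop's SecurityUtil to expand the _HOST placeholder in the configured principal pattern. An illustration with made-up values (getServerPrincipal throws IOException):

String pattern = conf.get("hbase.regionserver.kerberos.principal");
// pattern, e.g.: "hbase/_HOST@EXAMPLE.COM"
String principal = SecurityUtil.getServerPrincipal(pattern, "rs1.example.com");
// principal: "hbase/rs1.example.com@EXAMPLE.COM", which must match the server's login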
+ */ + protected class PingInputStream extends FilterInputStream { + /* constructor */ + protected PingInputStream(InputStream in) { + super(in); + } + + /* Process timeout exception + * if the connection is not going to be closed, send a ping. + * otherwise, throw the timeout exception. + */ + private void handleTimeout(SocketTimeoutException e) throws IOException { + if (shouldCloseConnection.get() || !running.get() || remoteId.rpcTimeout > 0) { + throw e; + } + sendPing(); + } + + /** Read a byte from the stream. + * Send a ping if timeout on read. Retries if no failure is detected + * until a byte is read. + * @throws IOException for any IO problem other than socket timeout + */ + @Override + public int read() throws IOException { + do { + try { + return super.read(); + } catch (SocketTimeoutException e) { + handleTimeout(e); + } + } while (true); + } + + /** Read bytes into a buffer starting from offset off + * Send a ping if timeout on read. Retries if no failure is detected + * until a byte is read. + * + * @return the total number of bytes read; -1 if the connection is closed. + */ + @Override + public int read(byte[] buf, int off, int len) throws IOException { + do { + try { + return super.read(buf, off, len); + } catch (SocketTimeoutException e) { + handleTimeout(e); + } + } while (true); + } + } + + protected synchronized void setupConnection() throws IOException { + short ioFailures = 0; + short timeoutFailures = 0; + while (true) { + try { + this.socket = socketFactory.createSocket(); + this.socket.setTcpNoDelay(tcpNoDelay); + this.socket.setKeepAlive(tcpKeepAlive); + // connection time out is 20s + NetUtils.connect(this.socket, remoteId.getAddress(), + getSocketTimeout(conf)); + if (remoteId.rpcTimeout > 0) { + pingInterval = remoteId.rpcTimeout; // overwrite pingInterval + } + this.socket.setSoTimeout(pingInterval); + return; + } catch (SocketTimeoutException toe) { + /* The max number of retries is 45, + * which amounts to 20s*45 = 15 minutes retries. + */ + handleConnectionFailure(timeoutFailures++, maxRetries, toe); + } catch (IOException ie) { + handleConnectionFailure(ioFailures++, maxRetries, ie); + } + } + } + + protected void closeConnection() { + // close the current connection + if (socket != null) { + try { + socket.close(); + } catch (IOException e) { + LOG.warn("Not able to close a socket", e); + } + } + // set socket to null so that the next call to setupIOstreams + // can start the process of connect all over again. + socket = null; + } + + /** + * Handle connection failures + * + * If the current number of retries is equal to the max number of retries, + * stop retrying and throw the exception; Otherwise backoff N seconds and + * try connecting again. + * + * This Method is only called from inside setupIOstreams(), which is + * synchronized. Hence the sleep is synchronized; the locks will be retained. 
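A standalone analogue of the PingInputStream introduced above, stripped to its core behavior: swallow a read timeout, run a keep-alive action (the real class sends a ping and also consults shouldCloseConnection), then retry the read.

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;

class RetryOnTimeoutStream extends FilterInputStream {
  private final Runnable keepAlive;

  RetryOnTimeoutStream(InputStream in, Runnable keepAlive) {
    super(in);
    this.keepAlive = keepAlive;
  }

  @Override
  public int read() throws IOException {
    while (true) {
      try {
        return super.read();               // normal path: a byte, or -1 at EOF
      } catch (SocketTimeoutException e) {
        keepAlive.run();                   // e.g. sendPing(), then retry the read
      }
    }
  }
}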
+ * + * @param curRetries current number of retries + * @param maxRetries max number of retries allowed + * @param ioe failure reason + * @throws IOException if max number of retries is reached + */ + private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe) + throws IOException { + + closeConnection(); + + // throw the exception if the maximum number of retries is reached + if (curRetries >= maxRetries) { + throw ioe; + } + + // otherwise back off and retry + try { + Thread.sleep(failureSleep); + } catch (InterruptedException ignored) {} + + LOG.info("Retrying connect to server: " + remoteId.getAddress() + + " after sleeping " + failureSleep + "ms. Already tried " + curRetries + + " time(s)."); + } + + /* wait till someone signals us to start reading RPC response or + * it is idle too long, it is marked as to be closed, + * or the client is marked as not running. + * + * Return true if it is time to read a response; false otherwise. + */ + protected synchronized boolean waitForWork() { + if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { + long timeout = maxIdleTime - (System.currentTimeMillis()-lastActivity.get()); + if (timeout>0) { + try { + wait(timeout); + } catch (InterruptedException ignored) {} + } + } + + if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { + return true; + } else if (shouldCloseConnection.get()) { + return false; + } else if (calls.isEmpty()) { // idle connection closed or stopped + markClosed(null); + return false; + } else { // get stopped but there are still pending requests + markClosed((IOException)new IOException().initCause( + new InterruptedException())); + return false; + } + } + + public InetSocketAddress getRemoteAddress() { + return remoteId.getAddress(); + } + + /* Send a ping to the server if the time elapsed + * since last I/O activity is equal to or greater than the ping interval + */ + protected synchronized void sendPing() throws IOException { + // Can we do tcp keepalive instead of this pinging? 
+ long curTime = System.currentTimeMillis(); + if ( curTime - lastActivity.get() >= pingInterval) { + lastActivity.set(curTime); + //noinspection SynchronizeOnNonFinalField + synchronized (this.out) { + out.writeInt(PING_CALL_ID); + out.flush(); + } + } + } + + @Override + public void run() { + if (LOG.isDebugEnabled()) { + LOG.debug(getName() + ": starting, connections " + connections.size()); + } + + try { + while (waitForWork()) { // Wait here for work - read or close connection + readResponse(); + } + } catch (Throwable t) { + LOG.warn(getName() + ": unexpected exception receiving call responses", t); + markClosed(new IOException("Unexpected exception receiving call responses", t)); + } + + close(); + + if (LOG.isDebugEnabled()) + LOG.debug(getName() + ": stopped, connections " + connections.size()); + } + + private synchronized void disposeSasl() { + if (saslRpcClient != null) { + try { + saslRpcClient.dispose(); + saslRpcClient = null; + } catch (IOException ioe) { + LOG.error("Error disposing of SASL client", ioe); + } + } + } + + private synchronized boolean shouldAuthenticateOverKrb() throws IOException { + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + UserGroupInformation currentUser = + UserGroupInformation.getCurrentUser(); + UserGroupInformation realUser = currentUser.getRealUser(); + return authMethod == AuthMethod.KERBEROS && + loginUser != null && + //Make sure user logged in using Kerberos either keytab or TGT + loginUser.hasKerberosCredentials() && + // relogin only in case it is the login user (e.g. JT) + // or superuser (like oozie). + (loginUser.equals(currentUser) || loginUser.equals(realUser)); + } + + private synchronized boolean setupSaslConnection(final InputStream in2, + final OutputStream out2) throws IOException { + saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal); + return saslRpcClient.saslConnect(in2, out2); + } + + /** + * If multiple clients with the same principal try to connect + * to the same server at the same time, the server assumes a + * replay attack is in progress. This is a feature of kerberos. + * In order to work around this, what is done is that the client + * backs off randomly and tries to initiate the connection + * again. + * The other problem is to do with ticket expiry. To handle that, + * a relogin is attempted. + *

+ * The retry logic is governed by the {@link #shouldAuthenticateOverKrb}
+ * method. When the user does not hold valid credentials (whether from the
+ * ticket cache or a keytab), there is no point in retrying. In that case it
+ * is prudent to throw a runtime exception when we receive a SaslException
+ * from the underlying authentication implementation, so that no retry is
+ * attempted at higher levels (e.g. by HCM or HBaseAdmin).
+ *
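+ * In outline, the handling below is roughly:
+ *   if Kerberos auth is in use and retries remain: relogin (from keytab or
+ *     ticket cache), sleep a random number of milliseconds, and retry;
+ *   else if the failure is a SaslException: rethrow as a RuntimeException;
+ *   else: rethrow as an IOException (RemoteExceptions pass through as-is).
+ *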

+ */ + private synchronized void handleSaslConnectionFailure( + final int currRetries, + final int maxRetries, final Exception ex, final Random rand, + final UserGroupInformation user) + throws IOException, InterruptedException{ + user.doAs(new PrivilegedExceptionAction() { + public Object run() throws IOException, InterruptedException { + closeConnection(); + if (shouldAuthenticateOverKrb()) { + if (currRetries < maxRetries) { + LOG.debug("Exception encountered while connecting to " + + "the server : " + ex); + //try re-login + if (UserGroupInformation.isLoginKeytabBased()) { + UserGroupInformation.getLoginUser().reloginFromKeytab(); + } else { + UserGroupInformation.getLoginUser().reloginFromTicketCache(); + } + disposeSasl(); + //have granularity of milliseconds + //we are sleeping with the Connection lock held but since this + //connection instance is being used for connecting to the server + //in question, it is okay + Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1)); + return null; + } else { + String msg = "Couldn't setup connection for " + + UserGroupInformation.getLoginUser().getUserName() + + " to " + serverPrincipal; + LOG.warn(msg); + throw (IOException) new IOException(msg).initCause(ex); + } + } else { + LOG.warn("Exception encountered while connecting to " + + "the server : " + ex); + } + if (ex instanceof RemoteException) { + throw (RemoteException)ex; + } + if (ex instanceof SaslException) { + String msg = "SASL authentication failed." + + " The most likely cause is missing or invalid credentials." + + " Consider 'kinit'."; + LOG.fatal(msg, ex); + throw new RuntimeException(msg, ex); + } + throw new IOException(ex); + } + }); + } + + protected synchronized void setupIOstreams() + throws IOException, InterruptedException { + if (socket != null || shouldCloseConnection.get()) { + return; + } + + if (failedServers.isFailedServer(remoteId.getAddress())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not trying to connect to " + server + + " this server is in the failed servers list"); + } + IOException e = new FailedServerException( + "This server is in the failed servers list: " + server); + markClosed(e); + close(); + throw e; + } + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Connecting to " + server); + } + short numRetries = 0; + final short MAX_RETRIES = 5; + Random rand = null; + while (true) { + setupConnection(); + InputStream inStream = NetUtils.getInputStream(socket); + OutputStream outStream = NetUtils.getOutputStream(socket); + // Write out the preamble -- MAGIC, version, and auth to use. + writeConnectionHeaderPreamble(outStream); + if (useSasl) { + final InputStream in2 = inStream; + final OutputStream out2 = outStream; + UserGroupInformation ticket = remoteId.getTicket().getUGI(); + if (authMethod == AuthMethod.KERBEROS) { + if (ticket != null && ticket.getRealUser() != null) { + ticket = ticket.getRealUser(); + } + } + boolean continueSasl = false; + if (ticket == null) throw new FatalConnectionException("ticket/user is null"); + try { + continueSasl = ticket.doAs(new PrivilegedExceptionAction() { + @Override + public Boolean run() throws IOException { + return setupSaslConnection(in2, out2); + } + }); + } catch (Exception ex) { + if (rand == null) { + rand = new Random(); + } + handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, ticket); + continue; + } + if (continueSasl) { + // Sasl connect is successful. Let's set up Sasl i/o streams. 
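+ // From here on, reads and writes go through the SASL-wrapped streams, which
+ // apply whatever protection (auth, integrity, confidentiality) was
+ // negotiated during saslConnect above.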
+ inStream = saslRpcClient.getInputStream(inStream);
+ outStream = saslRpcClient.getOutputStream(outStream);
+ } else {
+ // fall back to simple auth because the server told us so.
+ authMethod = AuthMethod.SIMPLE;
+ useSasl = false;
+ }
+ }
+ this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream)));
+ this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+ // Now write out the connection header
+ writeConnectionHeader();
+
+ // update last activity time
+ touch();
+
+ // start the receiver thread after the socket connection has been set up
+ start();
+ return;
+ }
+ } catch (Throwable t) {
+ failedServers.addToFailedServers(remoteId.address);
+ IOException e = null;
+ if (t instanceof IOException) {
+ e = (IOException)t;
+ markClosed(e);
+ } else {
+ e = new IOException("Couldn't set up IO streams", t);
+ markClosed(e);
+ }
+ close();
+ throw e;
+ }
+ }
+
+ /**
+ * Write the RPC connection preamble: magic, version, and the auth code to use.
+ */
+ private void writeConnectionHeaderPreamble(OutputStream outStream) throws IOException {
+ // Assemble the preamble in a buffer first and then send it in one write. If the individual
+ // elements are written separately, they go across the wire piecemeal (per wireshark) and the
+ // server occasionally garbles the read (the passed-in stream is not buffered yet).
+
+ // Preamble is six bytes -- 'HBas' + VERSION + AUTH_CODE
+ int rpcHeaderLen = HConstants.RPC_HEADER.array().length;
+ byte [] preamble = new byte [rpcHeaderLen + 2];
+ System.arraycopy(HConstants.RPC_HEADER.array(), 0, preamble, 0, rpcHeaderLen);
+ preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
+ preamble[rpcHeaderLen + 1] = authMethod.code;
+ outStream.write(preamble);
+ outStream.flush();
+ }
+
+ /**
+ * Write the connection header.
+ * Only the connection-setup thread writes this, but we synchronize on out
+ * as the other writers do.
+ */
+ private void writeConnectionHeader() throws IOException {
+ synchronized (this.out) {
+ this.out.writeInt(this.header.getSerializedSize());
+ this.header.writeTo(this.out);
+ this.out.flush();
+ }
+ }
+
+ /** Close the connection. */
+ protected synchronized void close() {
+ if (!shouldCloseConnection.get()) {
+ LOG.error(getName() + ": the connection is not in the closed state");
+ return;
+ }
+
+ // release the resources
+ // first thing to do: take the connection out of the connection list
+ synchronized (connections) {
+ if (connections.get(remoteId) == this) {
+ connections.remove(remoteId);
+ }
+ }
+
+ // close the streams and therefore the socket
+ IOUtils.closeStream(out);
+ this.out = null;
+ IOUtils.closeStream(in);
+ this.in = null;
+ disposeSasl();
+
+ // clean up all calls
+ if (closeException == null) {
+ if (!calls.isEmpty()) {
+ LOG.warn(getName() + ": connection closed with no close exception set but calls are " +
+ "not empty. #Calls: " + calls.size());
+
+ // clean up calls anyway
+ closeException = new IOException("Unexpected closed connection");
+ cleanupCalls();
+ }
+ } else {
+ // log the info
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": closing ipc connection to " + server + ": " +
+ closeException.getMessage(), closeException);
+ }
+
+ // cleanup calls
+ cleanupCalls();
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug(getName() + ": closed");
+ }
+
+ /**
+ * Initiates a call by sending the parameter to the remote server.
+ * Note: this is not called from the Connection thread, but by other
+ * threads.
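+ * On the wire (see the IPCUtil.write call below) a request is, roughly: the
+ * delimited RequestHeader, then the request Message if present, then the
+ * cell block if present.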
+ * @param call the call to send
+ * @see #readResponse()
+ */
+ protected void writeRequest(Call call) {
+ if (shouldCloseConnection.get()) return;
+ try {
+ RequestHeader.Builder builder = RequestHeader.newBuilder();
+ builder.setCallId(call.id);
+ if (Trace.isTracing()) {
+ Span s = Trace.currentTrace();
+ builder.setTraceInfo(RPCTInfo.newBuilder().
+ setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
+ }
+ builder.setMethodName(call.md.getName());
+ builder.setRequestParam(call.param != null);
+ ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
+ if (cellBlock != null) {
+ CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
+ cellBlockBuilder.setLength(cellBlock.limit());
+ builder.setCellBlockMeta(cellBlockBuilder.build());
+ }
+ //noinspection SynchronizeOnNonFinalField
+ RequestHeader header = builder.build();
+ synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
+ IPCUtil.write(this.out, header, call.param, cellBlock);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
+ }
+ } catch (IOException e) {
+ markClosed(e);
+ }
+ }
+
+ /* Receive a response.
+ * There is only one receiver thread, so no synchronization on in is needed.
+ */
+ protected void readResponse() {
+ if (shouldCloseConnection.get()) return;
+ touch();
+ int totalSize = -1;
+ try {
+ // See HBaseServer.Call.setResponse for where we write out the response.
+ // Total size of the response. Only needed below if we have to skip an
+ // orphaned response, but it has to be read either way.
+ totalSize = in.readInt();
+
+ // Read the header
+ ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
+ int id = responseHeader.getCallId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": got response header " +
+ TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes");
+ }
+ Call call = calls.get(id);
+ if (call == null) {
+ // We got a response for which we have no corresponding 'call' here on the client side.
+ // We probably timed out waiting, cleaned up all references, and now the server decides
+ // to return a response. There is nothing we can do with the response at this stage; clean
+ // it off the wire so it's out of the way and we can get other responses on
+ // this connection.
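+ // A response is: totalSize int, delimited ResponseHeader, then optionally a
+ // delimited result Message and a cell block. The header is consumed already,
+ // so what is left to skip is totalSize minus the header's delimited size.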
+ int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
+ int whatIsLeftToRead = totalSize - readSoFar;
+ LOG.debug("Unknown callId: " + id + ", skipping over this response of " +
+ whatIsLeftToRead + " bytes");
+ IOUtils.skipFully(in, whatIsLeftToRead);
+ }
+ if (responseHeader.hasException()) {
+ ExceptionResponse exceptionResponse = responseHeader.getException();
+ RemoteException re = createRemoteException(exceptionResponse);
+ if (isFatalConnectionException(exceptionResponse)) {
+ markClosed(re);
+ } else {
+ if (call != null) call.setException(re);
+ }
+ } else {
+ Message value = null;
+ // Call may be null because it may have timed out and been cleaned up on this side already
+ if (call != null && call.responseDefaultType != null) {
+ Builder builder = call.responseDefaultType.newBuilderForType();
+ builder.mergeDelimitedFrom(in);
+ value = builder.build();
+ }
+ CellScanner cellBlockScanner = null;
+ if (responseHeader.hasCellBlockMeta()) {
+ int size = responseHeader.getCellBlockMeta().getLength();
+ byte [] cellBlock = new byte[size];
+ IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
+ cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
+ }
+ // it's possible that this call may have been cleaned up due to an RPC
+ // timeout, so check if it still exists before setting the value.
+ if (call != null) call.setResponse(value, cellBlockScanner);
+ }
+ if (call != null) calls.remove(id);
+ } catch (IOException e) {
+ if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
+ // Clean up open calls but don't treat this as a fatal condition,
+ // since we expect certain responses to not make it by the specified
+ // {@link ConnectionId#rpcTimeout}.
+ closeException = e;
+ } else {
+ // Treat this as a fatal condition and close this connection
+ markClosed(e);
+ }
+ } finally {
+ if (remoteId.rpcTimeout > 0) {
+ cleanupCalls(remoteId.rpcTimeout);
+ }
+ }
+ }
+
+ /**
+ * @param e the exception response received from the server
+ * @return True if the exception is a fatal connection exception.
+ */
+ private boolean isFatalConnectionException(final ExceptionResponse e) {
+ return e.getExceptionClassName().
+ equals(FatalConnectionException.class.getName());
+ }
+
+ /**
+ * @param e the exception response received from the server
+ * @return a RemoteException made from the passed e
+ */
+ private RemoteException createRemoteException(final ExceptionResponse e) {
+ String innerExceptionClassName = e.getExceptionClassName();
+ boolean doNotRetry = e.getDoNotRetry();
+ return e.hasHostname()?
+ // If a hostname is present, add it to the RemoteWithExtrasException
+ new RemoteWithExtrasException(innerExceptionClassName,
+ e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
+ new RemoteWithExtrasException(innerExceptionClassName,
+ e.getStackTrace(), doNotRetry);
+ }
+
+ protected synchronized void markClosed(IOException e) {
+ if (shouldCloseConnection.compareAndSet(false, true)) {
+ closeException = e;
+ notifyAll();
+ }
+ }
+
+ /* Cleanup all calls and mark them as done */
+ protected void cleanupCalls() {
+ cleanupCalls(0);
+ }
+
+ protected void cleanupCalls(long rpcTimeout) {
+ Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
+ while (itor.hasNext()) {
+ Call c = itor.next().getValue();
+ long waitTime = System.currentTimeMillis() - c.getStartTime();
+ if (waitTime >= rpcTimeout) {
+ if (this.closeException == null) {
+ // There may be no exception in the case that there are many calls
+ // being multiplexed over this connection and these are succeeding
+ // fine while this Call object is taking a long time to finish
+ // over on the server; e.g. I just asked the regionserver to bulk
+ // open 3k regions or it's a big fat multiput into a heavily-loaded
+ // server. (Perhaps this only happens at the extremes?)
+ this.closeException = new CallTimeoutException("Call id=" + c.id +
+ ", waitTime=" + waitTime + ", rpcTimeout=" + rpcTimeout);
+ }
+ c.setException(this.closeException);
+ synchronized (c) {
+ c.notifyAll();
+ }
+ itor.remove();
+ } else {
+ break;
+ }
+ }
+ try {
+ if (!calls.isEmpty()) {
+ Call firstCall = calls.get(calls.firstKey());
+ long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
+ if (maxWaitTime < rpcTimeout) {
+ rpcTimeout -= maxWaitTime;
+ }
+ }
+ if (!shouldCloseConnection.get()) {
+ closeException = null;
+ if (socket != null) {
+ socket.setSoTimeout((int) rpcTimeout);
+ }
+ }
+ } catch (SocketException e) {
+ LOG.debug("Couldn't lower timeout, which may result in longer than expected calls");
+ }
+ }
+ }
+
+ /**
+ * Client-side call timeout
+ */
+ @SuppressWarnings("serial")
+ public static class CallTimeoutException extends IOException {
+ public CallTimeoutException(final String msg) {
+ super(msg);
+ }
+ }
+
+ /**
+ * Construct an IPC cluster client whose values are of the {@link Message} class.
+ * @param conf configuration
+ * @param clusterId the cluster id, or null to use {@link HConstants#CLUSTER_ID_DEFAULT}
+ * @param factory socket factory
+ */
+ RpcClient(Configuration conf, String clusterId, SocketFactory factory) {
+ this.maxIdleTime = conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s
+ this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
+ this.failureSleep = conf.getInt("hbase.client.pause", 1000);
+ this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
+ this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
+ this.pingInterval = getPingInterval(conf);
+ this.ipcUtil = new IPCUtil(conf);
+ this.conf = conf;
+ this.codec = getCodec(conf);
+ this.compressor = getCompressor(conf);
+ this.socketFactory = factory;
+ this.clusterId = clusterId != null ?
clusterId : HConstants.CLUSTER_ID_DEFAULT; + this.connections = new PoolMap(getPoolType(conf), getPoolSize(conf)); + this.failedServers = new FailedServers(conf); + if (LOG.isDebugEnabled()) { + LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + + ", tcpKeepAlive=" + this.tcpKeepAlive + + ", tcpNoDelay=" + this.tcpNoDelay + + ", maxIdleTime=" + this.maxIdleTime + + ", maxRetries=" + this.maxRetries + + ", ping interval=" + this.pingInterval + "ms."); + } + } + + /** + * Construct an IPC client for the cluster clusterId with the default SocketFactory + * @param conf configuration + * @param clusterId + */ + public RpcClient(Configuration conf, String clusterId) { + this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf)); + } + + /** + * Encapsulate the ugly casting and RuntimeException conversion in private method. + * @param conf + * @return Codec to use on this client. + */ + private static Codec getCodec(final Configuration conf) { + String className = conf.get("hbase.client.rpc.codec", KeyValueCodec.class.getCanonicalName()); + try { + return (Codec)Class.forName(className).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Failed getting codec " + className, e); + } + } + + /** + * Encapsulate the ugly casting and RuntimeException conversion in private method. + * @param conf + * @return The compressor to use on this client. + */ + private static CompressionCodec getCompressor(final Configuration conf) { + String className = conf.get("hbase.client.rpc.compressor", null); + if (className == null || className.isEmpty()) return null; + try { + return (CompressionCodec)Class.forName(className).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Failed getting compressor " + className, e); + } + } + + /** + * Return the pool type specified in the configuration, which must be set to + * either {@link PoolType#RoundRobin} or {@link PoolType#ThreadLocal}, + * otherwise default to the former. + * + * For applications with many user threads, use a small round-robin pool. For + * applications with few user threads, you may want to try using a + * thread-local pool. In any case, the number of {@link RpcClient} instances + * should not exceed the operating system's hard limit on the number of + * connections. + * + * @param config configuration + * @return either a {@link PoolType#RoundRobin} or + * {@link PoolType#ThreadLocal} + */ + protected static PoolType getPoolType(Configuration config) { + return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), + PoolType.RoundRobin, PoolType.ThreadLocal); + } + + /** + * Return the pool size specified in the configuration, which is applicable only if + * the pool type is {@link PoolType#RoundRobin}. + * + * @param config + * @return the maximum pool size + */ + protected static int getPoolSize(Configuration config) { + return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); + } + + /** Return the socket factory of this client + * + * @return this client's socket factory + */ + SocketFactory getSocketFactory() { + return socketFactory; + } + + /** Stop all threads related to this client. No further calls may be made + * using this client. 
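+ * Open connections are interrupted, and this method blocks until the
+ * connection pool has drained (see the wait loop below).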
*/
+ public void stop() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping rpc client");
+ }
+
+ if (!running.compareAndSet(true, false)) {
+ return;
+ }
+
+ // wake up all connections
+ synchronized (connections) {
+ for (Connection conn : connections.values()) {
+ conn.interrupt();
+ }
+ }
+
+ // wait until all connections are closed
+ while (!connections.isEmpty()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+
+ /** Make a call, passing {@code param} to the IPC server running at
+ * {@code addr} which is servicing the given service,
+ * with the {@code ticket} credentials, returning the value.
+ * Throws exceptions if there are network problems or if the remote code
+ * threw an exception.
+ * @param md the method descriptor
+ * @param param the request message
+ * @param cells cells to send along, if any
+ * @param returnType the default instance of the expected response type
+ * @param securityInfo marker class carrying the security annotations
+ * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
+ * {@link User#getCurrent()} makes a new instance of User each time so will be a new Connection
+ * each time.
+ * @param addr the server address
+ * @param rpcTimeout timeout for this call
+ * @return A pair with the Message response and the Cell data (if any).
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
+ Message returnType, Class securityInfo, User ticket, InetSocketAddress addr,
+ int rpcTimeout)
+ throws InterruptedException, IOException {
+ Call call = new Call(md, param, cells, returnType);
+ Connection connection =
+ getConnection(securityInfo, ticket, call, addr, rpcTimeout, this.codec, this.compressor);
+ connection.writeRequest(call); // send the parameter
+ boolean interrupted = false;
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (call) {
+ while (!call.done) {
+ try {
+ call.wait(); // wait for the result
+ } catch (InterruptedException ignored) {
+ // save the fact that we were interrupted
+ interrupted = true;
+ }
+ }
+
+ if (interrupted) {
+ // set the interrupt flag now that we are done waiting
+ Thread.currentThread().interrupt();
+ }
+
+ if (call.error != null) {
+ if (call.error instanceof RemoteException) {
+ call.error.fillInStackTrace();
+ throw call.error;
+ }
+ // local exception
+ throw wrapException(addr, call.error);
+ }
+ return new Pair<Message, CellScanner>(call.response, call.cells);
+ }
+ }
+
+ /**
+ * Take an IOException and the address we were trying to connect to
+ * and return an IOException with the input exception as the cause.
+ * The new exception provides the stack trace of the place where
+ * the exception is thrown and some extra diagnostic information.
+ * If the exception is a ConnectException or SocketTimeoutException,
+ * return a new one of the same type; otherwise return an IOException.
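+ * For example, a ConnectException to addr comes back as a new ConnectException
+ * whose message reads "Call to addr failed on connection exception: ..." with
+ * the original exception as its cause.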
+ *
+ * @param addr target address
+ * @param exception the relevant exception
+ * @return an exception to throw
+ */
+ protected IOException wrapException(InetSocketAddress addr,
+ IOException exception) {
+ if (exception instanceof ConnectException) {
+ // connection refused; include the host:port in the error
+ return (ConnectException)new ConnectException(
+ "Call to " + addr + " failed on connection exception: " + exception).initCause(exception);
+ } else if (exception instanceof SocketTimeoutException) {
+ return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr +
+ " failed because " + exception).initCause(exception);
+ } else {
+ return (IOException)new IOException("Call to " + addr + " failed on local exception: " +
+ exception).initCause(exception);
+ }
+ }
+
+ /**
+ * Interrupt the connections to the given ip:port server. This should be called if the server
+ * is known to actually be dead. It will not prevent current operations from being retried,
+ * and, depending on their own behavior, they may retry on the same server. This can be a
+ * feature, for example at startup. In any case, they're likely to get a connection refused
+ * (if the process died) or no route to host; i.e. their next retries should be faster and
+ * fail with a safe exception.
+ */
+ public void cancelConnections(String hostname, int port, IOException ioe) {
+ synchronized (connections) {
+ for (Connection connection : connections.values()) {
+ if (connection.isAlive() &&
+ connection.getRemoteAddress().getPort() == port &&
+ connection.getRemoteAddress().getHostName().equals(hostname)) {
+ LOG.info("The server on " + hostname + ":" + port +
+ " is dead - stopping the connection " + connection.remoteId);
+ connection.closeConnection();
+ // We could do a connection.interrupt(), but it's safer not to, as the
+ // interrupted exception behavior is not defined nor enforced enough.
+ }
+ }
+ }
+ }
+
+ /* Get a connection from the pool, or create a new one and add it to the
+ * pool. Connections to a given host/port are reused. */
+ protected Connection getConnection(Class securityInfo,
+ User ticket, Call call, InetSocketAddress addr, int rpcTimeout, final Codec codec,
+ final CompressionCodec compressor)
+ throws IOException, InterruptedException {
+ if (!running.get()) {
+ // the client is stopped
+ throw new IOException("The client is stopped");
+ }
+ Connection connection;
+ ConnectionId remoteId =
+ new ConnectionId(securityInfo, ticket, call.md.getService().getName(), addr, rpcTimeout);
+ synchronized (connections) {
+ connection = connections.get(remoteId);
+ if (connection == null) {
+ connection = createConnection(remoteId, this.codec, this.compressor);
+ connections.put(remoteId, connection);
+ }
+ }
+ connection.addCall(call);
+
+ // We don't invoke the method below inside the "synchronized (connections)"
+ // block above because, if the server happens to be slow, it would take
+ // longer to establish a connection and that would slow the entire system down.
+ // Moreover, if the connection is currently being created, many threads will
+ // wait here, as setupIOstreams is synchronized. If the connection fails with
+ // a timeout, they will all fail simultaneously. This is checked in setupIOstreams.
+ connection.setupIOstreams();
+ return connection;
+ }
+
+ /**
+ * This class holds the address and the user ticket, etc.
The client connections
+ * to servers are uniquely identified by the
+ * <address, securityInfo, ticket, serviceName, rpcTimeout> tuple (see {@link #equals(Object)}).
+ */
+ protected static class ConnectionId {
+ final InetSocketAddress address;
+ final User ticket;
+ final int rpcTimeout;
+ Class securityInfo;
+ private static final int PRIME = 16777619;
+ final String serviceName;
+
+ ConnectionId(Class securityInfo,
+ User ticket,
+ String serviceName,
+ InetSocketAddress address,
+ int rpcTimeout) {
+ this.securityInfo = securityInfo;
+ this.address = address;
+ this.ticket = ticket;
+ this.rpcTimeout = rpcTimeout;
+ this.serviceName = serviceName;
+ }
+
+ String getServiceName() {
+ return this.serviceName;
+ }
+
+ InetSocketAddress getAddress() {
+ return address;
+ }
+
+ Class getSecurityInfo() {
+ return this.securityInfo;
+ }
+
+ User getTicket() {
+ return ticket;
+ }
+
+ @Override
+ public String toString() {
+ return this.address.toString() + "/" + this.serviceName + "/" + this.ticket + "/" +
+ this.rpcTimeout;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ConnectionId) {
+ ConnectionId id = (ConnectionId) obj;
+ return address.equals(id.address) && this.securityInfo == id.securityInfo &&
+ ((ticket != null && ticket.equals(id.ticket)) ||
+ (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout &&
+ this.serviceName.equals(id.serviceName);
+ }
+ return false;
+ }
+
+ @Override // simply use the default Object#hashcode() ?
+ public int hashCode() {
+ int hashcode = (address.hashCode() +
+ PRIME * (PRIME * System.identityHashCode(this.securityInfo) ^
+ (ticket == null ? 0 : ticket.hashCode()) )) ^
+ rpcTimeout ^
+ this.serviceName.hashCode();
+ return hashcode;
+ }
+ }
+
+ public static void setRpcTimeout(int t) {
+ rpcTimeout.set(t);
+ }
+
+ public static int getRpcTimeout() {
+ return rpcTimeout.get();
+ }
+
+ public static void resetRpcTimeout() {
+ rpcTimeout.remove();
+ }
+
+ /** Make a blocking call.
+ * Throws exceptions if there are network problems or if the remote code
+ * threw an exception.
+ * @param md the method descriptor
+ * @param controller optional controller; may carry cells to send
+ * @param param the request message
+ * @param returnType the default instance of the expected response type
+ * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
+ * {@link User#getCurrent()} makes a new instance of User each time so will be a new Connection
+ * each time.
+ * @param isa the server address
+ * @param rpcTimeout timeout for this call
+ * @param securityInfo marker class carrying the security annotations
+ * @return the Message response
+ * @throws ServiceException wrapping any failure
+ */
+ Message callBlockingMethod(MethodDescriptor md, RpcController controller,
+ Message param, Message returnType, final User ticket, final InetSocketAddress isa,
+ final int rpcTimeout, final Class securityInfo)
+ throws ServiceException {
+ long startTime = 0;
+ if (LOG.isTraceEnabled()) {
+ startTime = System.currentTimeMillis();
+ }
+ PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller;
+ CellScanner cells = null;
+ if (pcrc != null) {
+ cells = pcrc.cellScanner();
+ // Clear it here so we don't mistakenly try to use these cells when processing results.
+ pcrc.setCellScanner(null);
+ }
+ Pair<Message, CellScanner> val = null;
+ try {
+ val = call(md, param, cells, returnType, securityInfo, ticket, isa, rpcTimeout);
+ if (pcrc != null) {
+ // Shove the results into the controller so they can be carried across the proxy/pb service void.
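+ // (The generated pb service signature has no slot for a cell block, so the
+ // CellScanner rides back to the caller inside the controller.)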
+ if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond()); + } else if (val.getSecond() != null) { + throw new ServiceException("Client dropping data on the floor!"); + } + + if (LOG.isTraceEnabled()) { + long callTime = System.currentTimeMillis() - startTime; + if (LOG.isTraceEnabled()) { + LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms"); + } + } + return val.getFirst(); + } catch (Throwable e) { + throw new ServiceException(e); + } + } + + /** + * Creates a "channel" that can be used by a blocking protobuf service. Useful setting up + * protobuf blocking stubs. + * @param sn + * @param securityInfo + * @param ticket + * @param rpcTimeout + * @return A blocking rpc channel that goes via this rpc client instance. + */ + public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, + final Class securityInfo, final User ticket, final int rpcTimeout) { + return new BlockingRpcChannelImplementation(this, sn, securityInfo, ticket, rpcTimeout); + } + + /** + * Blocking rpc channel that goes via hbase rpc. + */ + // Public so can be subclassed for tests. + public static class BlockingRpcChannelImplementation implements BlockingRpcChannel { + private final InetSocketAddress isa; + private volatile RpcClient rpcClient; + private final int rpcTimeout; + private final User ticket; + private final Class securityInfo; + + protected BlockingRpcChannelImplementation(final RpcClient rpcClient, final ServerName sn, + final Class securityInfo, final User ticket, final int rpcTimeout) { + this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); + this.rpcClient = rpcClient; + this.rpcTimeout = rpcTimeout; + this.ticket = ticket; + this.securityInfo = securityInfo; + } + + @Override + public Message callBlockingMethod(MethodDescriptor md, RpcController controller, + Message param, Message returnType) + throws ServiceException { + return this.rpcClient.callBlockingMethod(md, controller, param, returnType, this.ticket, + this.isa, this.rpcTimeout, this.securityInfo); + } + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java deleted file mode 100644 index 45d58cb..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.ipc; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.IpcProtocol; - -import java.io.IOException; -import java.net.InetSocketAddress; - -/** An RPC implementation for the client */ -@InterfaceAudience.Private -public interface RpcClientEngine { - /** Construct a client-side proxy object. */ - T getProxy(Class protocol, InetSocketAddress addr, - Configuration conf, int rpcTimeout) throws IOException; - - /** Shutdown this instance */ - void close(); - - public HBaseClient getClient(); -} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/UnsupportedCellCodecException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/UnsupportedCellCodecException.java new file mode 100644 index 0000000..476514f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/UnsupportedCellCodecException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +public class UnsupportedCellCodecException extends FatalConnectionException { + public UnsupportedCellCodecException() { + super(); + } + + public UnsupportedCellCodecException(String msg) { + super(msg); + } + + public UnsupportedCellCodecException(String msg, Throwable t) { + super(msg, t); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/UnsupportedCompressionCodecException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/UnsupportedCompressionCodecException.java new file mode 100644 index 0000000..bee5e7d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/UnsupportedCompressionCodecException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +public class UnsupportedCompressionCodecException extends FatalConnectionException { + public UnsupportedCompressionCodecException() { + super(); + } + + public UnsupportedCompressionCodecException(String msg) { + super(msg); + } + + public UnsupportedCompressionCodecException(String msg, Throwable t) { + super(msg, t); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/WrongVersionException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/WrongVersionException.java new file mode 100644 index 0000000..a1b92f5 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/WrongVersionException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +public class WrongVersionException extends FatalConnectionException { + public WrongVersionException() { + super(); + } + + public WrongVersionException(String msg) { + super(msg); + } + + public WrongVersionException(String msg, Throwable t) { + super(msg, t); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 3b826b0..5746cfa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -46,11 +46,8 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.MasterAdminProtocol; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.AdminProtocol; import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.ClientProtocol; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -67,6 +64,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest; @@ -85,6 +83,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest; @@ -105,6 +104,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService; import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.security.access.Permission; @@ -116,6 +116,7 @@ import org.apache.hadoop.hbase.util.DynamicClassLoader; import org.apache.hadoop.hbase.util.Methods; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.token.Token; import com.google.common.collect.ArrayListMultimap; @@ -222,6 +223,9 @@ public final class ProtobufUtil { if (e == null) { return new IOException(se); } + if (e instanceof RemoteException) { + e = ((RemoteException)e).unwrapRemoteException(); + } return e instanceof IOException ? 
(IOException) e : new IOException(se); } @@ -1206,7 +1210,7 @@ public final class ProtobufUtil { * @return the result of the Get * @throws IOException */ - public static Result get(final ClientProtocol client, + public static Result get(final ClientService.BlockingInterface client, final byte[] regionName, final Get get) throws IOException { GetRequest request = RequestConverter.buildGetRequest(regionName, get); @@ -1229,7 +1233,7 @@ public final class ProtobufUtil { * @return the row or the closestRowBefore if it doesn't exist * @throws IOException */ - public static Result getRowOrBefore(final ClientProtocol client, + public static Result getRowOrBefore(final ClientService.BlockingInterface client, final byte[] regionName, final byte[] row, final byte[] family) throws IOException { GetRequest request = @@ -1254,7 +1258,7 @@ public final class ProtobufUtil { * @return true if all are loaded * @throws IOException */ - public static boolean bulkLoadHFile(final ClientProtocol client, + public static boolean bulkLoadHFile(final ClientService.BlockingInterface client, final List> familyPaths, final byte[] regionName, boolean assignSeqNum) throws IOException { BulkLoadHFileRequest request = @@ -1268,7 +1272,7 @@ public final class ProtobufUtil { } } - public static CoprocessorServiceResponse execService(final ClientProtocol client, + public static CoprocessorServiceResponse execService(final ClientService.BlockingInterface client, final CoprocessorServiceCall call, final byte[] regionName) throws IOException { CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder() .setCall(call).setRegion( @@ -1282,8 +1286,9 @@ public final class ProtobufUtil { } } - public static CoprocessorServiceResponse execService(final MasterAdminProtocol client, - final CoprocessorServiceCall call) throws IOException { + public static CoprocessorServiceResponse execService( + final MasterAdminService.BlockingInterface client, final CoprocessorServiceCall call) + throws IOException { CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder() .setCall(call).setRegion( RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)).build(); @@ -1315,7 +1320,7 @@ public final class ProtobufUtil { * @return the retrieved region info * @throws IOException */ - public static HRegionInfo getRegionInfo(final AdminProtocol admin, + public static HRegionInfo getRegionInfo(final AdminService.BlockingInterface admin, final byte[] regionName) throws IOException { try { GetRegionInfoRequest request = @@ -1337,7 +1342,7 @@ public final class ProtobufUtil { * @param transitionInZK * @throws IOException */ - public static void closeRegion(final AdminProtocol admin, + public static void closeRegion(final AdminService.BlockingInterface admin, final byte[] regionName, final boolean transitionInZK) throws IOException { CloseRegionRequest closeRegionRequest = RequestConverter.buildCloseRegionRequest(regionName, transitionInZK); @@ -1358,7 +1363,8 @@ public final class ProtobufUtil { * @return true if the region is closed * @throws IOException */ - public static boolean closeRegion(final AdminProtocol admin, final byte[] regionName, + public static boolean closeRegion(final AdminService.BlockingInterface admin, + final byte[] regionName, final int versionOfClosingNode, final ServerName destinationServer, final boolean transitionInZK) throws IOException { CloseRegionRequest closeRegionRequest = @@ -1379,7 +1385,7 @@ public final class ProtobufUtil { * @param region * @throws IOException */ - 
public static void openRegion(final AdminProtocol admin,
+ public static void openRegion(final AdminService.BlockingInterface admin,
 final HRegionInfo region) throws IOException {
 OpenRegionRequest request =
 RequestConverter.buildOpenRegionRequest(region, -1);
@@ -1398,7 +1404,8 @@
 * @return a list of online region info
 * @throws IOException
 */
- public static List<HRegionInfo> getOnlineRegions(final AdminProtocol admin) throws IOException {
+ public static List<HRegionInfo> getOnlineRegions(final AdminService.BlockingInterface admin)
+ throws IOException {
 GetOnlineRegionRequest request = RequestConverter.buildGetOnlineRegionRequest();
 GetOnlineRegionResponse response = null;
 try {
@@ -1431,8 +1438,8 @@
 * @return the server name
 * @throws IOException
 */
- public static ServerInfo getServerInfo(
- final AdminProtocol admin) throws IOException {
+ public static ServerInfo getServerInfo(final AdminService.BlockingInterface admin)
+ throws IOException {
 GetServerInfoRequest request = RequestConverter.buildGetServerInfoRequest();
 try {
 GetServerInfoResponse response = admin.getServerInfo(null, request);
@@ -1452,8 +1459,9 @@
 * @return the list of store files
 * @throws IOException
 */
- public static List<String> getStoreFiles(final AdminProtocol admin,
- final byte[] regionName, final byte[] family) throws IOException {
+ public static List<String> getStoreFiles(final AdminService.BlockingInterface admin,
+ final byte[] regionName, final byte[] family)
+ throws IOException {
 GetStoreFileRequest request =
 RequestConverter.buildGetStoreFileRequest(regionName, family);
 try {
@@ -1472,7 +1480,7 @@
 * @param splitPoint
 * @throws IOException
 */
- public static void split(final AdminProtocol admin,
+ public static void split(final AdminService.BlockingInterface admin,
 final HRegionInfo hri, byte[] splitPoint) throws IOException {
 SplitRegionRequest request =
 RequestConverter.buildSplitRegionRequest(hri.getRegionName(), splitPoint);
@@ -1493,7 +1501,7 @@
 * two adjacent regions
 * @throws IOException
 */
- public static void mergeRegions(final AdminProtocol admin,
+ public static void mergeRegions(final AdminService.BlockingInterface admin,
 final HRegionInfo region_a, final HRegionInfo region_b,
 final boolean forcible) throws IOException {
 MergeRegionsRequest request = RequestConverter.buildMergeRegionsRequest(
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index e4e16fd..2e9c0e1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -120,6 +120,10 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
 @Override
 public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) {
 SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>();
+ if (ZKUtil.joinZNode(this.queuesZNode, regionserverZnode).equals(this.myQueuesZnode)) {
+ LOG.warn("An attempt was made to claim our own queues on region server " + regionserverZnode);
+ return newQueues;
+ }
 // check whether there is multi support. If yes, use it.
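 // ZooKeeper "multi" (available since ZooKeeper 3.4) moves all of the dead
 // server's queue znodes in one atomic batch; without it we fall back to
 // copying znodes one at a time, which is not atomic.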
if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) { LOG.info("Atomically moving " + regionserverZnode + "'s hlogs to my queue"); @@ -337,7 +341,8 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R try { position = parseHLogPositionFrom(positionBytes); } catch (DeserializationException e) { - LOG.warn("Failed parse of hlog position from the following znode: " + z); + LOG.warn("Failed parse of hlog position from the following znode: " + z + + ", Exception: " + e); } LOG.debug("Creating " + hlog + " with data " + position); String child = ZKUtil.joinZNode(newClusterZnode, hlog); @@ -382,6 +387,9 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R * @throws DeserializationException */ private long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException { + if(bytes == null) { + throw new DeserializationException("Unable to parse null HLog position."); + } if (ProtobufUtil.isPBMagicPrefix(bytes)) { int pblen = ProtobufUtil.lengthOfPBMagic(); ZooKeeperProtos.ReplicationHLogPosition.Builder builder = diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index 013bcf2..89da357 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -116,7 +116,8 @@ public class RecoverableZooKeeper { // the identifier = processID@hostName identifier = ManagementFactory.getRuntimeMXBean().getName(); } - LOG.info("The identifier of this process is " + identifier); + LOG.info("Process identifier=" + identifier + + " connecting to ZooKeeper ensemble=" + quorumServers); this.identifier = identifier; this.id = Bytes.toBytes(identifier); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java index 6de3b69..871e667 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java @@ -56,8 +56,7 @@ public class ZKConfig { * @return Properties holding mappings representing ZooKeeper config file. 
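 * (ZooKeeper's own config file is parsed only when
 * HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG is set to true, as the code
 * below warns; otherwise the properties are synthesized from the HBase
 * configuration.)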
*/ public static Properties makeZKProps(Configuration conf) { - if (conf.getBoolean(HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG, - false)) { + if (conf.getBoolean(HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG, false)) { LOG.warn( "Parsing ZooKeeper's " + HConstants.ZOOKEEPER_CONFIG_NAME + " file for ZK properties " + @@ -80,12 +79,9 @@ public class ZKConfig { } } } else { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Skipped reading ZK properties file '" + - HConstants.ZOOKEEPER_CONFIG_NAME + - "' since '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG + - "' was not set to true"); + if (LOG.isTraceEnabled()) { + LOG.trace("Skipped reading ZK properties file '" + HConstants.ZOOKEEPER_CONFIG_NAME + + "' since '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG + "' was not set to true"); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 4cecb2f..d2f0d04 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -116,8 +116,9 @@ public class ZKUtil { } int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); - LOG.debug(identifier + " opening connection to ZooKeeper with ensemble (" + - ensemble + ")"); + if (LOG.isTraceEnabled()) { + LOG.debug(identifier + " opening connection to ZooKeeper ensemble=" + ensemble); + } int retry = conf.getInt("zookeeper.recovery.retry", 3); int retryIntervalMillis = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000); @@ -419,9 +420,9 @@ public class ZKUtil { Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw); boolean exists = s != null ? true : false; if (exists) { - LOG.debug(zkw.prefix("Set watcher on existing znode " + znode)); + LOG.debug(zkw.prefix("Set watcher on existing znode=" + znode)); } else { - LOG.debug(zkw.prefix(znode+" does not exist. 
Watcher is set.")); + LOG.debug(zkw.prefix("Set watcher on znode that does not yet exist, " + znode)); } return exists; } catch (KeeperException e) { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java index 2991355..07b6001 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java @@ -78,7 +78,7 @@ public class TestSnapshotFromAdmin { // mock the master admin to our mock MasterAdminKeepAliveConnection mockMaster = Mockito.mock(MasterAdminKeepAliveConnection.class); Mockito.when(mockConnection.getConfiguration()).thenReturn(conf); - Mockito.when(mockConnection.getKeepAliveMasterAdmin()).thenReturn(mockMaster); + Mockito.when(mockConnection.getKeepAliveMasterAdminService()).thenReturn(mockMaster); // set the max wait time for the snapshot to complete TakeSnapshotResponse response = TakeSnapshotResponse.newBuilder() .setExpectedTimeout(maxWaitTime) @@ -135,7 +135,7 @@ public class TestSnapshotFromAdmin { // mock the master connection MasterAdminKeepAliveConnection master = Mockito.mock(MasterAdminKeepAliveConnection.class); - Mockito.when(mockConnection.getKeepAliveMasterAdmin()).thenReturn(master); + Mockito.when(mockConnection.getKeepAliveMasterAdminService()).thenReturn(master); TakeSnapshotResponse response = TakeSnapshotResponse.newBuilder().setExpectedTimeout(0).build(); Mockito.when( master.snapshot((RpcController) Mockito.isNull(), Mockito.any(TakeSnapshotRequest.class))) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index faf12e4..f2762d0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -573,17 +573,6 @@ public final class HConstants { public static int DEFAULT_HBASE_CLIENT_RETRIES_NUMBER = 10; /** - * Parameter name for maximum attempts, used to limit the number of times the - * client will try to obtain the proxy for a given region server. - */ - public static String HBASE_CLIENT_RPC_MAXATTEMPTS = "hbase.client.rpc.maxattempts"; - - /** - * Default value of {@link #HBASE_CLIENT_RPC_MAXATTEMPTS}. - */ - public static int DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS = 1; - - /** * Parameter name for client prefetch limit, used as the maximum number of regions * info that will be prefetched. 
*/ diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/ClassFinder.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/ClassFinder.java index 88cd169..84d7f37 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/ClassFinder.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/ClassFinder.java @@ -22,15 +22,14 @@ import java.io.File; import java.io.FileFilter; import java.io.FileInputStream; import java.io.IOException; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; import java.net.URL; import java.util.ArrayList; import java.util.Enumeration; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.jar.*; +import java.util.jar.JarEntry; +import java.util.jar.JarInputStream; import java.util.regex.Matcher; import java.util.regex.Pattern; diff --git a/hbase-hadoop2-compat/pom.xml b/hbase-hadoop2-compat/pom.xml index 7b6e8a6..c1d1fa2 100644 --- a/hbase-hadoop2-compat/pom.xml +++ b/hbase-hadoop2-compat/pom.xml @@ -54,23 +54,6 @@ limitations under the License. true - - - - maven-assembly-plugin - ${maven.assembly.version} - - true - - - - - maven-assembly-plugin - ${maven.assembly.version} - - true - - maven-surefire-plugin