diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index be83489cb3..874b69bade 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -629,7 +629,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Number of threads to be allocated for metastore handler for fs operations."), METASTORE_HBASE_FILE_METADATA_THREADS("hive.metastore.hbase.file.metadata.threads", 1, "Number of threads to use to read file metadata in background to cache it."), - + METASTORE_URI_RESOLVER("hive.metastore.uri.resolver", "", + "If set, fully qualified class name of resolver for hive metastore uri's"), METASTORETHRIFTCONNECTIONRETRIES("hive.metastore.connect.retries", 3, "Number of retries while opening a connection to metastore"), METASTORETHRIFTFAILURERETRIES("hive.metastore.failure.retries", 1, diff --git contrib/pom.xml contrib/pom.xml index 7423e31cf8..e7fd76b421 100644 --- contrib/pom.xml +++ contrib/pom.xml @@ -75,6 +75,18 @@ ${junit.version} test + + + + org.apache.httpcomponents + httpclient + ${httpcomponents.client.version} + + + org.apache.httpcomponents + httpcore + ${httpcomponents.core.version} + diff --git contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/Check.java contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/Check.java new file mode 100644 index 0000000000..eb5a1afacf --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/Check.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.contrib.metastore.hooks.consul; + +import com.google.gson.annotations.SerializedName; +import java.util.List; + +/** + * Represents a consul check. + */ +public class Check { + + public static enum CheckStatus { + @SerializedName("unknown") + UNKNOWN, + @SerializedName("passing") + PASSING, + @SerializedName("warning") + WARNING, + @SerializedName("critical") + CRITICAL + } + + @SerializedName("Node") + private String node; + + @SerializedName("CheckID") + private String checkId; + + @SerializedName("Name") + private String name; + + @SerializedName("Status") + private CheckStatus status; + + @SerializedName("Notes") + private String notes; + + @SerializedName("Output") + private String output; + + @SerializedName("ServiceID") + private String serviceId; + + @SerializedName("ServiceName") + private String serviceName; + + @SerializedName("ServiceTags") + private List serviceTags; + + @SerializedName("CreateIndex") + private Long createIndex; + + @SerializedName("ModifyIndex") + private Long modifyIndex; + + public String getNode() { + return node; + } + + public void setNode(String node) { + this.node = node; + } + + public String getCheckId() { + return checkId; + } + + public void setCheckId(String checkId) { + this.checkId = checkId; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public CheckStatus getStatus() { + return status; + } + + public void setStatus(CheckStatus status) { + this.status = status; + } + + public String getNotes() { + return notes; + } + + public void setNotes(String notes) { + this.notes = notes; + } + + public String getOutput() { + return output; + } + + public void setOutput(String output) { + this.output = output; + } + + public String getServiceId() { + return serviceId; + } + + public void setServiceId(String serviceId) { + this.serviceId = serviceId; + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public List getServiceTags() { + return serviceTags; + } + + public void setServiceTags(List serviceTags) { + this.serviceTags = serviceTags; + } + + public Long getCreateIndex() { + return createIndex; + } + + public void setCreateIndex(Long createIndex) { + this.createIndex = createIndex; + } + + public Long getModifyIndex() { + return modifyIndex; + } + + public void setModifyIndex(Long modifyIndex) { + this.modifyIndex = modifyIndex; + } + + @Override + public String toString() { + return "Check{" + + "node='" + node + '\'' + + ", checkId='" + checkId + '\'' + + ", name='" + name + '\'' + + ", status=" + status + + ", notes='" + notes + '\'' + + ", output='" + output + '\'' + + ", serviceId='" + serviceId + '\'' + + ", serviceName='" + serviceName + '\'' + + ", serviceTags=" + serviceTags + + ", createIndex=" + createIndex + + ", modifyIndex=" + modifyIndex + + '}'; + } +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/ConsulURLHook.java contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/ConsulURLHook.java new file mode 100644 index 0000000000..3a63e58f4e --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/ConsulURLHook.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.contrib.metastore.hooks.consul; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.HiveMetaException; +import org.apache.hadoop.hive.metastore.hooks.URIResolverHook; +import org.apache.http.client.utils.URIBuilder; + +import javax.ws.rs.core.UriBuilder; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** + * Looks up from consul to return a list of current metastore uri's in form thrift://host:port + * + * Input uri is of the form consul://consul-host:consul-port/service-name + */ +public class ConsulURLHook implements URIResolverHook { + + static final protected Log LOG = LogFactory.getLog("hive.metastore"); + + private static final GsonBuilder GSON_BUILDER = new GsonBuilder(); + + private static final ThreadLocal GSON = new ThreadLocal() { + @Override + protected Gson initialValue() { + return GSON_BUILDER.create(); + } + }; + + public static Gson getGson() { + return GSON.get(); + } + + private String agentAddress; + private HttpTransport httpTransport = new HttpTransport(); + + @Override + public List resolveURI(URI uri) throws HiveMetaException { + String scheme = uri.getScheme(); + if (!scheme.equalsIgnoreCase("consul")) { + throw new IllegalArgumentException("Invalid scheme, please use consul://consul-host:consul-port/service-name"); + } + + LOG.info("Resolving consul uri : " + uri); + String consulHost = uri.getHost(); + String service = uri.getPath().substring(1); //strip leading slash + int consulPort = uri.getPort(); + + // check that agentHost has scheme or not + if (consulHost == "") { + throw new IllegalArgumentException("Unspecified consul host, please use consul://consul-host:consul-port/service-name"); + } + if (consulPort == -1) { + throw new IllegalArgumentException("Unspecified consul port, please use consul://consul-host:consul-port/service-name"); + } + if (service == "") { + throw new IllegalArgumentException("Unspecified consul service, please use consul://consul-host:consul-port/service-name"); + } + + String url = assembleUrl(consulHost, consulPort, service); + RawResponse rawResponse = httpTransport.makeGetRequest(url); + Response> healthyServices; + if (rawResponse.getStatusCode() == 200) { + List value = getGson().fromJson(rawResponse.getContent(), + new TypeToken>() { + }.getType()); + healthyServices = new Response>(value, rawResponse); + } else { + throw new HiveMetaException(rawResponse.getStatusMessage()); + } + + List thriftUris = new ArrayList(); + + LOG.info(String.format("Querying Consul (%s) for service %s", consulHost, service)); + for(Iterator i = healthyServices.getValue().iterator(); i.hasNext(); ) { + HealthService hn = i.next(); + HealthService.Node node = hn.getNode(); + URI thriftUri = UriBuilder.fromUri("") + .scheme("thrift") + .host(hn.getNode().getNode()) + .port(hn.getService().getPort()) + .build(); + thriftUris.add(thriftUri); + } + if(thriftUris.size() == 0) { + throw new IllegalArgumentException("There is no healthy nodes in Consul for the service: " + service); + } + Collections.shuffle(thriftUris); + return thriftUris; + } + + public static String assembleUrl(String host, int port, String service) throws HiveMetaException { + try { + URIBuilder builder = new URIBuilder("/v1/health/service/" + service); + builder.setHost(host) + .setPort(port) + .setParameter("passing", null) + .setScheme("http"); + return builder.build().toASCIIString(); + } catch (Exception e) { + throw new HiveMetaException("Can't encode url", e); + } + } +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/HealthService.java contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/HealthService.java new file mode 100644 index 0000000000..fba7be75fa --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/HealthService.java @@ -0,0 +1,280 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.contrib.metastore.hooks.consul; + +import com.google.gson.annotations.SerializedName; + +import java.util.List; +import java.util.Map; + +/** + * Represents a consul health check + */ +public class HealthService { + + public static class Node { + + @SerializedName("ID") + private String id; + + @SerializedName("Node") + private String node; + + @SerializedName("Address") + private String address; + + @SerializedName("Datacenter") + private String datacenter; + + @SerializedName("TaggedAddresses") + private Map taggedAddresses; + + @SerializedName("Meta") + private Map meta; + + @SerializedName("CreateIndex") + private Long createIndex; + + @SerializedName("ModifyIndex") + private Long modifyIndex; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getNode() { + return node; + } + + public void setNode(String node) { + this.node = node; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public String getDatacenter() { + return datacenter; + } + + public void setDatacenter(String datacenter) { + this.datacenter = datacenter; + } + + public Map getTaggedAddresses() { + return taggedAddresses; + } + + public void setTaggedAddresses(Map taggedAddresses) { + this.taggedAddresses = taggedAddresses; + } + + public Map getMeta() { + return meta; + } + + public void setMeta(Map meta) { + this.meta = meta; + } + + public Long getCreateIndex() { + return createIndex; + } + + public void setCreateIndex(Long createIndex) { + this.createIndex = createIndex; + } + + public Long getModifyIndex() { + return modifyIndex; + } + + public void setModifyIndex(Long modifyIndex) { + this.modifyIndex = modifyIndex; + } + + @Override + public String toString() { + return "Node{" + + "id='" + id + '\'' + + ", node='" + node + '\'' + + ", address='" + address + '\'' + + ", datacenter='" + datacenter + '\'' + + ", taggedAddresses=" + taggedAddresses + + ", meta=" + meta + + ", createIndex=" + createIndex + + ", modifyIndex=" + modifyIndex + + '}'; + } + } + + public static class Service { + @SerializedName("ID") + private String id; + + @SerializedName("Service") + private String service; + + @SerializedName("Tags") + private List tags; + + @SerializedName("Address") + private String address; + + @SerializedName("Port") + private Integer port; + + @SerializedName("EnableTagOverride") + private Boolean enableTagOverride; + + @SerializedName("CreateIndex") + private Long createIndex; + + @SerializedName("ModifyIndex") + private Long modifyIndex; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getService() { + return service; + } + + public void setService(String service) { + this.service = service; + } + + public List getTags() { + return tags; + } + + public void setTags(List tags) { + this.tags = tags; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + public Boolean getEnableTagOverride() { + return enableTagOverride; + } + + public void setEnableTagOverride(Boolean enableTagOverride) { + this.enableTagOverride = enableTagOverride; + } + + public Long getCreateIndex() { + return createIndex; + } + + public void setCreateIndex(Long createIndex) { + this.createIndex = createIndex; + } + + public Long getModifyIndex() { + return modifyIndex; + } + + public void setModifyIndex(Long modifyIndex) { + this.modifyIndex = modifyIndex; + } + + @Override + public String toString() { + return "Service{" + + "id='" + id + '\'' + + ", service='" + service + '\'' + + ", tags=" + tags + + ", address='" + address + '\'' + + ", port=" + port + + ", enableTagOverride=" + enableTagOverride + + ", createIndex=" + createIndex + + ", modifyIndex=" + modifyIndex + + '}'; + } + } + + @SerializedName("Node") + private Node node; + + @SerializedName("Service") + private Service service; + + @SerializedName("Checks") + private List checks; + + public Node getNode() { + return node; + } + + public void setNode(Node node) { + this.node = node; + } + + public Service getService() { + return service; + } + + public void setService(Service service) { + this.service = service; + } + + public List getChecks() { + return checks; + } + + public void setChecks(List checks) { + this.checks = checks; + } + + @Override + public String toString() { + return "HealthService{" + + "node=" + node + + ", service=" + service + + ", checks=" + checks + + '}'; + } +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/HttpTransport.java contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/HttpTransport.java new file mode 100644 index 0000000000..e6269553ed --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/HttpTransport.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.contrib.metastore.hooks.consul; + +import org.apache.hadoop.hive.metastore.HiveMetaException; +import org.apache.http.Header; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.impl.client.DefaultHttpClient; + +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.nio.charset.Charset; + +/** + * HTTP client. This class is thread safe. + */ +public class HttpTransport { + + static final int DEFAULT_MAX_CONNECTIONS = 1000; + static final int DEFAULT_MAX_PER_ROUTE_CONNECTIONS = 500; + static final int DEFAULT_CONNECTION_TIMEOUT = 10000; // 10 sec + + // 10 minutes for read timeout due to blocking queries timeout + // https://www.consul.io/api/index.html#blocking-queries + static final int DEFAULT_READ_TIMEOUT = 60000 * 10; // 10 min + + private final HttpClient httpClient; + + public HttpTransport() { + this.httpClient = new DefaultHttpClient(); + } + + public HttpTransport(HttpClient httpClient) { + this.httpClient = httpClient; + } + + + public RawResponse makeGetRequest(String url) throws HiveMetaException { + HttpGet httpGet = new HttpGet(url); + return executeRequest(httpGet); + } + + private RawResponse executeRequest(HttpUriRequest httpRequest) throws HiveMetaException { + try { + return httpClient.execute(httpRequest, new ResponseHandler() { + @Override + public RawResponse handleResponse(HttpResponse response) throws IOException { + int statusCode = response.getStatusLine().getStatusCode(); + String statusMessage = response.getStatusLine().getReasonPhrase(); + + String content = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8")); + + Long consulIndex = parseUnsignedLong(response.getFirstHeader("X-Consul-Index")); + Boolean consulKnownLeader = parseBoolean(response.getFirstHeader("X-Consul-Knownleader")); + Long consulLastContact = parseUnsignedLong(response.getFirstHeader("X-Consul-Lastcontact")); + + return new RawResponse(statusCode, statusMessage, content, consulIndex, consulKnownLeader, consulLastContact); + } + }); + } catch (IOException e) { + throw new HiveMetaException(e); + } + } + + private Long parseUnsignedLong(Header header) { + if (header == null) { + return null; + } + + String value = header.getValue(); + if (value == null) { + return null; + } + + try { + return parseUnsignedLong(value); + } catch (Exception e) { + return null; + } + } + + public static long parseUnsignedLong(String s) { + if (s.charAt(0) == '-') { + throw new NumberFormatException("An unsigned long was expected. Cannot parse negative number " + s); + } + int length = s.length(); + // Long.MAX_VALUE is 19 digits in length so anything + // shorter than that is trivial to parse. + if (length < 19) { + return Long.parseLong(s); + } + long front = Long.parseLong(s.substring(0, length - 1)); + int onesDigit = Character.digit(s.charAt(length - 1), 10); + if (onesDigit < 0) { + throw new NumberFormatException("Invalid last digit for " + onesDigit); + } + long result = front * 10 + onesDigit; + if (compareLong(result + Long.MIN_VALUE, front + Long.MIN_VALUE) < 0) { + throw new NumberFormatException("The number " + s + " is greater than 2^64"); + } + return result; + } + + private static int compareLong(long x, long y) { + return (x < y) ? -1 : ((x == y) ? 0 : 1); + } + + private Boolean parseBoolean(Header header) { + if (header == null) { + return null; + } + + if ("true".equals(header.getValue())) { + return true; + } + + if ("false".equals(header.getValue())) { + return false; + } + + return null; + } +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/RawResponse.java contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/RawResponse.java new file mode 100644 index 0000000000..ed0dda78ba --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/RawResponse.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.contrib.metastore.hooks.consul; + +/** + * Represents a consul raw response + */ +public final class RawResponse { + + private final int statusCode; + private final String statusMessage; + + private final String content; + + private final Long consulIndex; + private final Boolean consulKnownLeader; + private final Long consulLastContact; + + public RawResponse(int statusCode, String statusMessage, String content, Long consulIndex, Boolean consulKnownLeader, Long consulLastContact) { + this.statusCode = statusCode; + this.statusMessage = statusMessage; + this.content = content; + this.consulIndex = consulIndex; + this.consulKnownLeader = consulKnownLeader; + this.consulLastContact = consulLastContact; + } + + public int getStatusCode() { + return statusCode; + } + + public String getStatusMessage() { + return statusMessage; + } + + public String getContent() { + return content; + } + + public Long getConsulIndex() { + return consulIndex; + } + + public Boolean isConsulKnownLeader() { + return consulKnownLeader; + } + + public Long getConsulLastContact() { + return consulLastContact; + } +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/Response.java contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/Response.java new file mode 100644 index 0000000000..644721ca6f --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/consul/Response.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.contrib.metastore.hooks.consul; + +/** + * Represents a consul response + */ +public final class Response { + + private final T value; + + private final Long consulIndex; + private final Boolean consulKnownLeader; + private final Long consulLastContact; + + public Response(T value, Long consulIndex, Boolean consulKnownLeader, Long consulLastContact) { + this.value = value; + this.consulIndex = consulIndex; + this.consulKnownLeader = consulKnownLeader; + this.consulLastContact = consulLastContact; + } + + public Response(T value, RawResponse rawResponse) { + this(value, rawResponse.getConsulIndex(), rawResponse.isConsulKnownLeader(), rawResponse.getConsulLastContact()); + } + + public T getValue() { + return value; + } + + public Long getConsulIndex() { + return consulIndex; + } + + public Boolean isConsulKnownLeader() { + return consulKnownLeader; + } + + public Long getConsulLastContact() { + return consulLastContact; + } + + @Override + public String toString() { + return "Response{" + + "value=" + value + + ", consulIndex=" + consulIndex + + ", consulKnownLeader=" + consulKnownLeader + + ", consulLastContact=" + consulLastContact + + '}'; + } +} diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 2b6b0b64c8..4354b196cd 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -57,14 +57,14 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.hooks.URIResolverHook; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; import org.apache.hadoop.hive.metastore.txn.TxnUtils; -import org.apache.hadoop.hive.metastore.utils.SecurityUtils; -import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; -import org.apache.hadoop.hive.metastore.utils.ObjectPair; +import org.apache.hadoop.hive.metastore.utils.*; import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; @@ -113,6 +113,7 @@ private String tokenStrForm; private final boolean localMetaStore; private final MetaStoreFilterHook filterHook; + private final URIResolverHook uriResolverHook; private final int fileMetadataBatchSize; private Map currentMetaVars; @@ -146,6 +147,7 @@ public HiveMetaStoreClient(Configuration conf, HiveMetaHookLoader hookLoader, Bo } version = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) ? TEST_VERSION : VERSION; filterHook = loadFilterHooks(); + uriResolverHook = loadUriResolverHook(); fileMetadataBatchSize = MetastoreConf.getIntVar( conf, ConfVars.BATCH_RETRIEVE_OBJECTS_MAX); @@ -171,37 +173,7 @@ public HiveMetaStoreClient(Configuration conf, HiveMetaHookLoader hookLoader, Bo // user wants file store based configuration if (MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS) != null) { - String metastoreUrisString[] = MetastoreConf.getVar(conf, - ConfVars.THRIFT_URIS).split(","); - metastoreUris = new URI[metastoreUrisString.length]; - try { - int i = 0; - for (String s : metastoreUrisString) { - URI tmpUri = new URI(s); - if (tmpUri.getScheme() == null) { - throw new IllegalArgumentException("URI: " + s - + " does not have a scheme"); - } - metastoreUris[i++] = new URI( - tmpUri.getScheme(), - tmpUri.getUserInfo(), - HadoopThriftAuthBridge.getBridge().getCanonicalHostName(tmpUri.getHost()), - tmpUri.getPort(), - tmpUri.getPath(), - tmpUri.getQuery(), - tmpUri.getFragment() - ); - - } - // make metastore URIS random - List uriList = Arrays.asList(metastoreUris); - Collections.shuffle(uriList); - metastoreUris = (URI[]) uriList.toArray(); - } catch (IllegalArgumentException e) { - throw (e); - } catch (Exception e) { - MetaStoreUtils.logAndThrowMetaException(e); - } + resolveUris(); } else { LOG.error("NOT getting uris from conf"); throw new MetaException("MetaStoreURIs not found in conf file"); @@ -246,6 +218,50 @@ public Void run() throws Exception { open(); } + private void resolveUris() throws MetaException { + String metastoreUrisString[] = MetastoreConf.getVar(conf, + ConfVars.THRIFT_URIS).split(","); + + List metastoreURIArray = new ArrayList(); + try { + int i = 0; + for (String s : metastoreUrisString) { + URI tmpUri = new URI(s); + if (tmpUri.getScheme() == null) { + throw new IllegalArgumentException("URI: " + s + + " does not have a scheme"); + } + if (uriResolverHook != null) { + metastoreURIArray.addAll(uriResolverHook.resolveURI(tmpUri)); + } else { + metastoreURIArray.add(new URI( + tmpUri.getScheme(), + tmpUri.getUserInfo(), + HadoopThriftAuthBridge.getBridge().getCanonicalHostName(tmpUri.getHost()), + tmpUri.getPort(), + tmpUri.getPath(), + tmpUri.getQuery(), + tmpUri.getFragment() + )); + } + } + metastoreUris = new URI[metastoreURIArray.size()]; + for (int j = 0; j < metastoreURIArray.size(); j++) { + metastoreUris[j] = metastoreURIArray.get(j); + } + + // make metastore URIS random + List uriList = Arrays.asList(metastoreUris); + Collections.shuffle(uriList); + metastoreUris = (URI[]) uriList.toArray(); + } catch (IllegalArgumentException e) { + throw (e); + } catch (Exception e) { + MetaStoreUtils.logAndThrowMetaException(e); + } + } + + private MetaStoreFilterHook loadFilterHooks() throws IllegalStateException { Class authProviderClass = MetastoreConf. getClass(conf, ConfVars.FILTER_HOOK, DefaultMetaStoreFilterHookImpl.class, @@ -260,6 +276,26 @@ private MetaStoreFilterHook loadFilterHooks() throws IllegalStateException { } } + //multiple clients may initialize the hook at the same time + synchronized private URIResolverHook loadUriResolverHook() throws IllegalStateException { + + String uriResolverClassName = + MetastoreConf.getAsString(conf, ConfVars.URI_RESOLVER); + LOG.info("Loading uri resolver" + uriResolverClassName); + if (uriResolverClassName.equals("")) { + return null; + } else { + try { + Class uriResolverClass = Class.forName(uriResolverClassName, true, + JavaUtils.getClassLoader()); + return (URIResolverHook) ReflectionUtils.newInstance(uriResolverClass, null); + } catch (Exception e) { + LOG.error("Exception loading uri resolver hook" + e); + return null; + } + } + } + /** * Swaps the first element of the metastoreUris array with a random element from the * remainder of the array. @@ -321,6 +357,12 @@ public void reconnect() throws MetaException { " at the client level."); } else { close(); + + if (uriResolverHook != null) { + //for dynamic uris, re-lookup if there are new metastore locations + resolveUris(); + } + // Swap the first element of the metastoreUris[] with a random element from the rest // of the array. Rationale being that this method will generally be called when the default // connection has died and the default connection is likely to be the first array element. diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index b46cc38a22..3c1b308319 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -763,6 +763,8 @@ public static ConfVars getMetaConf(String name) { "class is used to store and retrieve transactions and locks"), TXN_TIMEOUT("metastore.txn.timeout", "hive.txn.timeout", 300, TimeUnit.SECONDS, "time after which transactions are declared aborted if the client has not sent a heartbeat."), + URI_RESOLVER("metastore.uri.resolver", "hive.metastore.uri.resolver", "", + "If set, fully qualified class name of resolver for hive metastore uri's"), USERS_IN_ADMIN_ROLE("metastore.users.in.admin.role", "hive.users.in.admin.role", "", false, "Comma separated list of users who are in admin role for bootstrapping.\n" + "More users can be added in ADMIN role later."), diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/hooks/URIResolverHook.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/hooks/URIResolverHook.java new file mode 100644 index 0000000000..d3be5dd0a2 --- /dev/null +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/hooks/URIResolverHook.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.hooks; + +import org.apache.hadoop.hive.metastore.HiveMetaException; + +import java.net.URI; +import java.util.List; + +/** + * Allows different metastore uris to be resolved. + */ +public interface URIResolverHook { + + /** + * Resolve to a proper thrift uri, or a list of uris, given uri of another scheme. + * @param uri + * @return + */ + public List resolveURI(URI uri) throws HiveMetaException; +}