From 9287c869f749bff0cb93cf876b892cf950468a94 Mon Sep 17 00:00:00 2001 From: ShubhamChaurasia Date: Wed, 2 Sep 2020 15:18:41 +0530 Subject: [PATCH] HIVE-24059: Llap external client - Initial changes for running in cloud environment --- .../org/apache/hadoop/hive/conf/HiveConf.java | 26 ++ .../hive/llap/ext/TestLlapInputSplit.java | 9 +- ...apExtClientWithCloudDeploymentConfigs.java | 138 ++++++ .../hadoop/hive/llap/LlapInputSplit.java | 40 +- .../hadoop/hive/llap/ext/LlapDaemonInfo.java | 96 ++++ .../llap/registry/LlapServiceInstance.java | 21 + .../impl/InactiveServiceInstance.java | 10 + .../registry/impl/LlapFixedRegistryImpl.java | 18 +- .../impl/LlapZookeeperRegistryImpl.java | 53 ++- llap-common/pom.xml | 12 + .../daemon/rpc/LlapDaemonProtocolProtos.java | 416 +++++++++++++++--- .../org/apache/hadoop/hive/llap/LlapUtil.java | 21 +- .../DefaultJwtSharedSecretProvider.java | 99 +++++ .../hive/llap/security/JwtSecretProvider.java | 72 +++ .../llap/security/LlapExtClientJwtHelper.java | 70 +++ .../src/protobuf/LlapDaemonProtocol.proto | 2 + .../hadoop/hive/llap/LlapBaseInputFormat.java | 125 +----- .../cli/service/AsyncTaskCopyLocalJars.java | 5 +- .../llap/daemon/impl/ContainerRunnerImpl.java | 46 +- .../hive/llap/daemon/impl/LlapDaemon.java | 28 +- .../daemon/impl/LlapProtocolServerImpl.java | 25 +- .../hive/llap/daemon/MiniLlapCluster.java | 10 +- .../hive/llap/daemon/impl/TestLlapDaemon.java | 2 +- .../TestLlapDaemonProtocolServerImpl.java | 6 +- pom.xml | 16 + .../ql/udf/generic/GenericUDTFGetSplits.java | 89 +++- .../clientpositive/llap/get_splits_0.q.out | 4 +- 27 files changed, 1242 insertions(+), 217 deletions(-) create mode 100644 itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestLlapExtClientWithCloudDeploymentConfigs.java create mode 100644 llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapDaemonInfo.java create mode 100644 llap-common/src/java/org/apache/hadoop/hive/llap/security/DefaultJwtSharedSecretProvider.java create mode 100644 llap-common/src/java/org/apache/hadoop/hive/llap/security/JwtSecretProvider.java create mode 100644 llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapExtClientJwtHelper.java diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index dec253f6cf..50194ffe31 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4897,6 +4897,32 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal LLAP_EXTERNAL_CLIENT_USE_HYBRID_CALENDAR("hive.llap.external.client.use.hybrid.calendar", false, "Whether to use hybrid calendar for parsing of data/timestamps."), + + // ====== confs for llap-external-client cloud deployment ====== + LLAP_EXTERNAL_CLIENT_CLOUD_DEPLOYMENT_SETUP_ENABLED( + "hive.llap.external.client.cloud.deployment.setup.enabled", false, + "Tells whether to enable additional RPC port, auth mechanism for llap external clients. This is meant" + + "for cloud based deployments. When true, it has following effects - \n" + + "1. Enables an extra RPC port on LLAP daemon to accept fragments from external clients. See" + + "hive.llap.external.client.cloud.rpc.port\n" + + "2. Uses external hostnames of LLAP in splits, so that clients can submit from outside of cloud. " + + "Env variable PUBLIC_HOSTNAME should be available on LLAP machines.\n" + + "3. Uses JWT based authentication for splits to be validated at LLAP. 
See " + + "hive.llap.external.client.cloud.jwt.shared.secret.provider"), + LLAP_EXTERNAL_CLIENT_CLOUD_RPC_PORT("hive.llap.external.client.cloud.rpc.port", 30004, + "The LLAP daemon RPC port for external clients when llap is running in cloud environment."), + LLAP_EXTERNAL_CLIENT_CLOUD_OUTPUT_SERVICE_PORT("hive.llap.external.client.cloud.output.service.port", 30005, + "LLAP output service port when llap is running in cloud environment"), + LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET_PROVIDER( + "hive.llap.external.client.cloud.jwt.shared.secret.provider", + "org.apache.hadoop.hive.llap.security.DefaultJwtSharedSecretProvider", + "Shared secret provider to be used to sign JWT"), + LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET("hive.llap.external.client.cloud.jwt.shared.secret", + "", + "The LLAP daemon RPC port for external clients when llap is running in cloud environment. " + + "Length of the secret should be >= 32 bytes"), + // ====== confs for llap-external-client cloud deployment ====== + LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", false, "Override if grace join should be allowed to run in llap."), diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java index df29dfc422..ebc9e0ff5d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java @@ -45,6 +45,11 @@ public void testWritable() throws Exception { new SplitLocationInfo("location2", false), }; + LlapDaemonInfo daemonInfo1 = new LlapDaemonInfo("host1", 30004, 15003); + LlapDaemonInfo daemonInfo2 = new LlapDaemonInfo("host2", 30004, 15003); + + LlapDaemonInfo[] llapDaemonInfos = {daemonInfo1, daemonInfo2}; + ArrayList colDescs = new ArrayList(); colDescs.add(new FieldDesc("col1", TypeInfoFactory.stringTypeInfo)); colDescs.add(new FieldDesc("col2", TypeInfoFactory.intTypeInfo)); @@ -52,7 +57,7 @@ public void testWritable() throws Exception { byte[] tokenBytes = new byte[] { 1 }; LlapInputSplit split1 = new LlapInputSplit(splitNum, planBytes, fragmentBytes, null, - locations, schema, "hive", tokenBytes); + locations, llapDaemonInfos, schema, "hive", tokenBytes, "some-dummy-jwt"); ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); DataOutputStream dataOut = new DataOutputStream(byteOutStream); split1.write(dataOut); @@ -83,6 +88,8 @@ static void checkLlapSplits(LlapInputSplit split1, LlapInputSplit split2) throws assertArrayEquals(split1.getLocations(), split2.getLocations()); assertEquals(split1.getSchema().toString(), split2.getSchema().toString()); assertEquals(split1.getLlapUser(), split2.getLlapUser()); + assertEquals(split1.getJwt(), split2.getJwt()); + assertArrayEquals(split1.getLlapDaemonInfos(), split2.getLlapDaemonInfos()); } } diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestLlapExtClientWithCloudDeploymentConfigs.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestLlapExtClientWithCloudDeploymentConfigs.java new file mode 100644 index 0000000000..cd6797b7e6 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestLlapExtClientWithCloudDeploymentConfigs.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat; +import org.apache.hadoop.hive.llap.LlapBaseInputFormat; +import org.apache.hadoop.hive.llap.LlapInputSplit; +import org.apache.hadoop.hive.llap.Row; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.junit.BeforeClass; +import org.junit.Ignore; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * TestLlapExtClientWithCloudDeploymentConfigs + */ +public class TestLlapExtClientWithCloudDeploymentConfigs extends BaseJdbcWithMiniLlap { + + @BeforeClass + public static void beforeTest() throws Exception { + System.setProperty("PUBLIC_HOSTNAME", InetAddress.getLocalHost().getHostAddress()); + + HiveConf conf = defaultConf(); + conf.set("minillap.usePortsFromConf", "true"); + + // enable setup for cloud based deployment + conf.setBoolVar(HiveConf.ConfVars.LLAP_EXTERNAL_CLIENT_CLOUD_DEPLOYMENT_SETUP_ENABLED, true); + conf.setVar(HiveConf.ConfVars.LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET, + "Three may keep a secret, if two of them are dead -- Benjamin Franklin"); + + conf.setBoolVar(HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true); + conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED, true); + BaseJdbcWithMiniLlap.beforeTest(conf); + + } + + @Override protected InputFormat getInputFormat() { + //For unit testing, no harm in hard-coding allocator ceiling to LONG.MAX_VALUE + return new LlapArrowRowInputFormat(Long.MAX_VALUE); + } + + @Override public void testDataTypes() throws Exception { + TestJdbcWithMiniLlapVectorArrow testJdbcWithMiniLlapVectorArrow = new TestJdbcWithMiniLlapVectorArrow(); + testJdbcWithMiniLlapVectorArrow.testDataTypes(); + } + + @Override + @Ignore + public void testMultipleBatchesOfComplexTypes() { + // TODO: something else has broken parent test, need to check + } + + @Override protected int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor) + throws Exception { + String url = miniHS2.getJdbcURL(); + String user = System.getProperty("user.name"); + String pwd = user; + String handleId = UUID.randomUUID().toString(); + + InputFormat inputFormat = getInputFormat(); + + // Get splits + JobConf job = new JobConf(conf); + job.set(LlapBaseInputFormat.URL_KEY, url); + job.set(LlapBaseInputFormat.USER_KEY, user); + job.set(LlapBaseInputFormat.PWD_KEY, pwd); + job.set(LlapBaseInputFormat.QUERY_KEY, query); + job.set(LlapBaseInputFormat.HANDLE_ID, handleId); + job.set(LlapBaseInputFormat.USE_NEW_SPLIT_FORMAT, "true"); + if 
(currentDatabase != null) { + job.set(LlapBaseInputFormat.DB_KEY, currentDatabase); + } + + InputSplit[] splits = inputFormat.getSplits(job, numSplits); + + if (splits.length <= 1) { + return 0; + } + + + // populate actual splits with schema and planBytes[] + LlapInputSplit schemaSplit = (LlapInputSplit) splits[0]; + LlapInputSplit planSplit = (LlapInputSplit) splits[1]; + + List actualSplits = new ArrayList<>(); + + for (int i = 2; i < splits.length; i++) { + LlapInputSplit actualSplit = (LlapInputSplit) splits[i]; + actualSplit.setSchema(schemaSplit.getSchema()); + actualSplit.setPlanBytes(planSplit.getPlanBytes()); + actualSplits.add(actualSplit); + } + + // Fetch rows from splits + int rowCount = 0; + for (InputSplit split : actualSplits) { + System.out.println("Processing split " + split.getLocations()); + RecordReader reader = inputFormat.getRecordReader(split, job, null); + Row row = reader.createValue(); + while (reader.next(NullWritable.get(), row)) { + rowProcessor.process(row); + ++rowCount; + } + //In arrow-mode this will throw exception unless all buffers have been released + //See org.apache.hadoop.hive.llap.LlapArrowBatchRecordReader + reader.close(); + } + LlapBaseInputFormat.close(handleId); + + return rowCount; + } + +} diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java index 068a9133b5..27f7e162bb 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java @@ -20,11 +20,9 @@ import java.io.DataOutput; import java.io.IOException; -import org.apache.hadoop.hive.llap.Schema; -import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; +import org.apache.hadoop.hive.llap.ext.LlapDaemonInfo; import org.apache.hadoop.mapred.InputSplitWithLocationInfo; import org.apache.hadoop.mapred.SplitLocationInfo; -import org.apache.hadoop.security.token.Token; public class LlapInputSplit implements InputSplitWithLocationInfo { @@ -32,25 +30,33 @@ private byte[] planBytes; private byte[] fragmentBytes; private SplitLocationInfo[] locations; + private LlapDaemonInfo[] llapDaemonInfos; private Schema schema; private String llapUser; private byte[] fragmentBytesSignature; private byte[] tokenBytes; + //only needed in cloud deployments for llap server to validate request from external llap clients. + //HS2 generates a JWT and populates this field while get_splits() call, this jwt gets validated at LLAP server + //when LlapInputSplit is submitted. 
+ private String jwt; public LlapInputSplit() { } public LlapInputSplit(int splitNum, byte[] planBytes, byte[] fragmentBytes, - byte[] fragmentBytesSignature, SplitLocationInfo[] locations, Schema schema, - String llapUser, byte[] tokenBytes) { + byte[] fragmentBytesSignature, SplitLocationInfo[] locations, + LlapDaemonInfo[] llapDaemonInfos, Schema schema, + String llapUser, byte[] tokenBytes, String jwt) { this.planBytes = planBytes; this.fragmentBytes = fragmentBytes; this.fragmentBytesSignature = fragmentBytesSignature; this.locations = locations; + this.llapDaemonInfos = llapDaemonInfos; this.schema = schema; this.splitNum = splitNum; this.llapUser = llapUser; this.tokenBytes = tokenBytes; + this.jwt = jwt; } public Schema getSchema() { @@ -99,6 +105,10 @@ public void setSchema(Schema schema) { this.schema = schema; } + public String getJwt() { + return jwt; + } + @Override public void write(DataOutput out) throws IOException { out.writeInt(splitNum); @@ -119,6 +129,11 @@ public void write(DataOutput out) throws IOException { out.writeUTF(locations[i].getLocation()); } + out.writeInt(llapDaemonInfos.length); + for (LlapDaemonInfo llapDaemonInfo : llapDaemonInfos) { + llapDaemonInfo.write(out); + } + schema.write(out); out.writeUTF(llapUser); if (tokenBytes != null) { @@ -127,6 +142,10 @@ public void write(DataOutput out) throws IOException { } else { out.writeInt(0); } + + if (jwt != null) { + out.writeUTF(jwt); + } } @Override @@ -152,6 +171,12 @@ public void readFields(DataInput in) throws IOException { locations[i] = new SplitLocationInfo(in.readUTF(), false); } + llapDaemonInfos = new LlapDaemonInfo[in.readInt()]; + for (int i = 0; i < llapDaemonInfos.length; i++) { + llapDaemonInfos[i] = new LlapDaemonInfo(); + llapDaemonInfos[i].readFields(in); + } + schema = new Schema(); schema.readFields(in); llapUser = in.readUTF(); @@ -160,6 +185,7 @@ public void readFields(DataInput in) throws IOException { tokenBytes = new byte[length]; in.readFully(tokenBytes); } + jwt = in.readUTF(); } @Override @@ -170,4 +196,8 @@ public void readFields(DataInput in) throws IOException { public String getLlapUser() { return llapUser; } + + public LlapDaemonInfo[] getLlapDaemonInfos() { + return llapDaemonInfos; + } } diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapDaemonInfo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapDaemonInfo.java new file mode 100644 index 0000000000..525a0d68e0 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapDaemonInfo.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.ext; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Objects; + +import org.apache.hadoop.io.Writable; + +/** + * LlapDaemonInfo - contains llap daemon information + * - host - hostname of llap daemon + * - rpcPort - rpc port of llap daemon to submit fragments + * - outputFormatPort - output port of llap daemon to read data corresponding to the submitted fragment + */ +public class LlapDaemonInfo implements Writable { + + private String host; + private int rpcPort; + private int outputFormatPort; + + public LlapDaemonInfo(String host, int rpcPort, int outputFormatPort) { + this.host = host; + this.rpcPort = rpcPort; + this.outputFormatPort = outputFormatPort; + } + + public LlapDaemonInfo() { + } + + public String getHost() { + return host; + } + + public int getRpcPort() { + return rpcPort; + } + + public int getOutputFormatPort() { + return outputFormatPort; + } + + @Override + public String toString() { + return "LlapDaemonInfo{" + + "host='" + host + '\'' + + ", rpcPort=" + rpcPort + + ", outputFormatPort=" + outputFormatPort + + '}'; + } + + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + LlapDaemonInfo that = (LlapDaemonInfo) o; + return rpcPort == that.rpcPort && outputFormatPort == that.outputFormatPort && Objects.equals(host, that.host); + } + + @Override public int hashCode() { + return Objects.hash(host, rpcPort, outputFormatPort); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(host); + out.writeInt(rpcPort); + out.writeInt(outputFormatPort); + } + + @Override + public void readFields(DataInput in) throws IOException { + host = in.readUTF(); + rpcPort = in.readInt(); + outputFormatPort = in.readInt(); + } +} diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstance.java index 942f1d0761..71e0a7f6a3 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstance.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstance.java @@ -14,6 +14,9 @@ package org.apache.hadoop.hive.llap.registry; +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.registry.ServiceInstance; import org.apache.hadoop.yarn.api.records.Resource; @@ -47,6 +50,24 @@ */ public int getOutputFormatPort(); + /** + * External host, usually needed in cloud envs where we cannot access internal host from outside + * + * @return + */ + String getExternalHostname(); + + /** + * RPC endpoint for external clients - tcp traffic on this port should be opened on cloud. 
+ * + * @return + */ + int getExternalClientsRpcPort(); + + + default void ensureCloudEnv(Configuration conf) { + Preconditions.checkState(LlapUtil.isCloudDeployment(conf), "Only supported in cloud based deployments"); + } /** * Memory and Executors available for the LLAP tasks diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java index d9c2364ee2..0febcb2b31 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java @@ -51,6 +51,16 @@ public int getShufflePort() { throw new UnsupportedOperationException(); } + @Override + public String getExternalHostname() { + throw new UnsupportedOperationException(); + } + + @Override + public int getExternalClientsRpcPort() { + throw new UnsupportedOperationException(); + } + @Override public String getServicesAddress() { throw new UnsupportedOperationException(); diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index 2bedb325fc..b78c15ec34 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; @@ -59,7 +60,9 @@ private final int shuffle; private final int mngPort; private final int webPort; + private Configuration conf; private final int outputFormatPort; + private final int externalClientsRpcPort; private final String webScheme; private final String[] hosts; private final int memory; @@ -75,9 +78,10 @@ public LlapFixedRegistryImpl(String hosts, Configuration conf) { this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, true); this.mngPort = HiveConf.getIntVar(conf, ConfVars.LLAP_MANAGEMENT_RPC_PORT); this.outputFormatPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT); - + this.externalClientsRpcPort = HiveConf.getIntVar(conf, ConfVars.LLAP_EXTERNAL_CLIENT_CLOUD_RPC_PORT); this.webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT); + this.conf = conf; boolean isSsl = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL); this.webScheme = isSsl ? 
"https" : "http"; @@ -190,6 +194,18 @@ public int getOutputFormatPort() { return LlapFixedRegistryImpl.this.outputFormatPort; } + @Override + public String getExternalHostname() { + ensureCloudEnv(LlapFixedRegistryImpl.this.conf); + return LlapUtil.getPublicHostname(); + } + + @Override + public int getExternalClientsRpcPort() { + ensureCloudEnv(LlapFixedRegistryImpl.this.conf); + return LlapFixedRegistryImpl.this.externalClientsRpcPort; + } + @Override public String getServicesAddress() { return serviceAddress; diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index cf4e7b8640..25cb89a375 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.curator.framework.recipes.atomic.AtomicValue; import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; -import org.apache.hadoop.registry.client.binding.RegistryUtils; import com.google.common.collect.Sets; import java.io.IOException; @@ -42,6 +41,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; @@ -72,6 +72,7 @@ private static final String IPC_SHUFFLE = "shuffle"; private static final String IPC_LLAP = "llap"; private static final String IPC_OUTPUTFORMAT = "llapoutputformat"; + private static final String IPC_EXTERNAL_LLAP = "externalllap"; private final static String NAMESPACE_PREFIX = "llap-"; private static final String SLOT_PREFIX = "slot-"; private static final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient"; @@ -135,6 +136,12 @@ public Endpoint getOutputFormatEndpoint() { HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT))); } + private Endpoint getExternalRpcEndpoint() { + int port = HiveConf.getIntVar(conf, ConfVars.LLAP_EXTERNAL_CLIENT_CLOUD_RPC_PORT); + String host = LlapUtil.getPublicHostname(); + return RegistryTypeUtils.ipcEndpoint(IPC_EXTERNAL_LLAP, new InetSocketAddress(host, port)); + } + @Override public String register() throws IOException { daemonZkRecord = new ServiceRecord(); @@ -144,6 +151,11 @@ public String register() throws IOException { daemonZkRecord.addInternalEndpoint(getShuffleEndpoint()); daemonZkRecord.addExternalEndpoint(getServicesEndpoint()); daemonZkRecord.addInternalEndpoint(getOutputFormatEndpoint()); + Endpoint externalRpcEndpoint = null; + if (LlapUtil.isCloudDeployment(conf)) { + externalRpcEndpoint = getExternalRpcEndpoint(); + daemonZkRecord.addExternalEndpoint(externalRpcEndpoint); + } populateConfigValues(this.conf); Map capacityValues = new HashMap<>(2); @@ -173,9 +185,16 @@ public String register() throws IOException { } registerServiceRecord(daemonZkRecord, uniqueId); - LOG.info("Registered node. 
Created a znode on ZooKeeper for LLAP instance: rpc: {}, " + - "shuffle: {}, webui: {}, mgmt: {}, znodePath: {}", rpcEndpoint, getShuffleEndpoint(), - getServicesEndpoint(), getMngEndpoint(), getRegistrationZnodePath()); + if (LlapUtil.isCloudDeployment(conf)) { + LOG.info("Registered node. Created a znode on ZooKeeper for LLAP instance: rpc: {}, external client rpc : {} " + + "shuffle: {}, webui: {}, mgmt: {}, znodePath: {}", rpcEndpoint, externalRpcEndpoint, + getShuffleEndpoint(), getServicesEndpoint(), getMngEndpoint(), getRegistrationZnodePath()); + } else { + LOG.info("Registered node. Created a znode on ZooKeeper for LLAP instance: rpc: {}, " + + "shuffle: {}, webui: {}, mgmt: {}, znodePath: {}", rpcEndpoint, getShuffleEndpoint(), + getServicesEndpoint(), getMngEndpoint(), getRegistrationZnodePath()); + } + return uniqueId; } @@ -211,6 +230,10 @@ public void unregister() throws IOException { private final int shufflePort; private final int outputFormatPort; private final String serviceAddress; + + private String externalHost; + private int externalClientsRpcPort; + private final Resource resource; public DynamicServiceInstance(ServiceRecord srv) throws IOException { @@ -232,6 +255,16 @@ public DynamicServiceInstance(ServiceRecord srv) throws IOException { AddressTypes.ADDRESS_PORT_FIELD)); this.serviceAddress = RegistryTypeUtils.getAddressField(services.addresses.get(0), AddressTypes.ADDRESS_URI); + + if (LlapUtil.isCloudDeployment(conf)) { + final Endpoint externalRpc = srv.getExternalEndpoint(IPC_EXTERNAL_LLAP); + this.externalHost = RegistryTypeUtils.getAddressField(externalRpc.addresses.get(0), + AddressTypes.ADDRESS_HOSTNAME_FIELD); + this.externalClientsRpcPort = Integer.parseInt( + RegistryTypeUtils.getAddressField(externalRpc.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); + } + String memStr = srv.get(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, ""); String coreStr = srv.get(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS, ""); try { @@ -252,6 +285,18 @@ public String getServicesAddress() { return serviceAddress; } + @Override + public String getExternalHostname() { + ensureCloudEnv(LlapZookeeperRegistryImpl.this.conf); + return externalHost; + } + + @Override + public int getExternalClientsRpcPort() { + ensureCloudEnv(LlapZookeeperRegistryImpl.this.conf); + return externalClientsRpcPort; + } + @Override public Resource getResource() { return resource; diff --git a/llap-common/pom.xml b/llap-common/pom.xml index 4b69069626..a58a720db6 100644 --- a/llap-common/pom.xml +++ b/llap-common/pom.xml @@ -46,6 +46,18 @@ + + io.jsonwebtoken + jjwt-api + + + io.jsonwebtoken + jjwt-impl + + + io.jsonwebtoken + jjwt-jackson + org.apache.commons commons-lang3 diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java index 537a362c18..6db79d029c 100644 --- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java +++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java @@ -9701,6 +9701,31 @@ public Builder clearKeyId() { * optional bool is_guaranteed = 12 [default = false]; */ boolean getIsGuaranteed(); + + // optional string jwt = 13; + /** + * optional string jwt = 13; + */ + boolean hasJwt(); + /** + * optional string jwt = 13; + */ + java.lang.String getJwt(); + /** + * optional string jwt = 
13; + */ + com.google.protobuf.ByteString + getJwtBytes(); + + // optional bool is_external_client_request = 14 [default = false]; + /** + * optional bool is_external_client_request = 14 [default = false]; + */ + boolean hasIsExternalClientRequest(); + /** + * optional bool is_external_client_request = 14 [default = false]; + */ + boolean getIsExternalClientRequest(); } /** * Protobuf type {@code SubmitWorkRequestProto} @@ -9829,6 +9854,16 @@ private SubmitWorkRequestProto( isGuaranteed_ = input.readBool(); break; } + case 106: { + bitField0_ |= 0x00001000; + jwt_ = input.readBytes(); + break; + } + case 112: { + bitField0_ |= 0x00002000; + isExternalClientRequest_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -10155,6 +10190,65 @@ public boolean getIsGuaranteed() { return isGuaranteed_; } + // optional string jwt = 13; + public static final int JWT_FIELD_NUMBER = 13; + private java.lang.Object jwt_; + /** + * optional string jwt = 13; + */ + public boolean hasJwt() { + return ((bitField0_ & 0x00001000) == 0x00001000); + } + /** + * optional string jwt = 13; + */ + public java.lang.String getJwt() { + java.lang.Object ref = jwt_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + jwt_ = s; + } + return s; + } + } + /** + * optional string jwt = 13; + */ + public com.google.protobuf.ByteString + getJwtBytes() { + java.lang.Object ref = jwt_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + jwt_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional bool is_external_client_request = 14 [default = false]; + public static final int IS_EXTERNAL_CLIENT_REQUEST_FIELD_NUMBER = 14; + private boolean isExternalClientRequest_; + /** + * optional bool is_external_client_request = 14 [default = false]; + */ + public boolean hasIsExternalClientRequest() { + return ((bitField0_ & 0x00002000) == 0x00002000); + } + /** + * optional bool is_external_client_request = 14 [default = false]; + */ + public boolean getIsExternalClientRequest() { + return isExternalClientRequest_; + } + private void initFields() { workSpec_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary.getDefaultInstance(); workSpecSignature_ = com.google.protobuf.ByteString.EMPTY; @@ -10168,6 +10262,8 @@ private void initFields() { initialEventBytes_ = com.google.protobuf.ByteString.EMPTY; initialEventSignature_ = com.google.protobuf.ByteString.EMPTY; isGuaranteed_ = false; + jwt_ = ""; + isExternalClientRequest_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -10217,6 +10313,12 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) if (((bitField0_ & 0x00000800) == 0x00000800)) { output.writeBool(12, isGuaranteed_); } + if (((bitField0_ & 0x00001000) == 0x00001000)) { + output.writeBytes(13, getJwtBytes()); + } + if (((bitField0_ & 0x00002000) == 0x00002000)) { + output.writeBool(14, isExternalClientRequest_); + } getUnknownFields().writeTo(output); } @@ -10274,6 +10376,14 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeBoolSize(12, isGuaranteed_); } + if (((bitField0_ & 0x00001000) == 0x00001000)) { + size += 
com.google.protobuf.CodedOutputStream + .computeBytesSize(13, getJwtBytes()); + } + if (((bitField0_ & 0x00002000) == 0x00002000)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(14, isExternalClientRequest_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -10357,6 +10467,16 @@ public boolean equals(final java.lang.Object obj) { result = result && (getIsGuaranteed() == other.getIsGuaranteed()); } + result = result && (hasJwt() == other.hasJwt()); + if (hasJwt()) { + result = result && getJwt() + .equals(other.getJwt()); + } + result = result && (hasIsExternalClientRequest() == other.hasIsExternalClientRequest()); + if (hasIsExternalClientRequest()) { + result = result && (getIsExternalClientRequest() + == other.getIsExternalClientRequest()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -10418,6 +10538,14 @@ public int hashCode() { hash = (37 * hash) + IS_GUARANTEED_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getIsGuaranteed()); } + if (hasJwt()) { + hash = (37 * hash) + JWT_FIELD_NUMBER; + hash = (53 * hash) + getJwt().hashCode(); + } + if (hasIsExternalClientRequest()) { + hash = (37 * hash) + IS_EXTERNAL_CLIENT_REQUEST_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getIsExternalClientRequest()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -10561,6 +10689,10 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000400); isGuaranteed_ = false; bitField0_ = (bitField0_ & ~0x00000800); + jwt_ = ""; + bitField0_ = (bitField0_ & ~0x00001000); + isExternalClientRequest_ = false; + bitField0_ = (bitField0_ & ~0x00002000); return this; } @@ -10645,6 +10777,14 @@ public Builder clone() { to_bitField0_ |= 0x00000800; } result.isGuaranteed_ = isGuaranteed_; + if (((from_bitField0_ & 0x00001000) == 0x00001000)) { + to_bitField0_ |= 0x00001000; + } + result.jwt_ = jwt_; + if (((from_bitField0_ & 0x00002000) == 0x00002000)) { + to_bitField0_ |= 0x00002000; + } + result.isExternalClientRequest_ = isExternalClientRequest_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -10701,6 +10841,14 @@ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtoc if (other.hasIsGuaranteed()) { setIsGuaranteed(other.getIsGuaranteed()); } + if (other.hasJwt()) { + bitField0_ |= 0x00001000; + jwt_ = other.jwt_; + onChanged(); + } + if (other.hasIsExternalClientRequest()) { + setIsExternalClientRequest(other.getIsExternalClientRequest()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -11454,6 +11602,113 @@ public Builder clearIsGuaranteed() { return this; } + // optional string jwt = 13; + private java.lang.Object jwt_ = ""; + /** + * optional string jwt = 13; + */ + public boolean hasJwt() { + return ((bitField0_ & 0x00001000) == 0x00001000); + } + /** + * optional string jwt = 13; + */ + public java.lang.String getJwt() { + java.lang.Object ref = jwt_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + jwt_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string jwt = 13; + */ + public com.google.protobuf.ByteString + getJwtBytes() { + java.lang.Object ref = jwt_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + jwt_ = b; + return b; + } else { + return 
(com.google.protobuf.ByteString) ref; + } + } + /** + * optional string jwt = 13; + */ + public Builder setJwt( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00001000; + jwt_ = value; + onChanged(); + return this; + } + /** + * optional string jwt = 13; + */ + public Builder clearJwt() { + bitField0_ = (bitField0_ & ~0x00001000); + jwt_ = getDefaultInstance().getJwt(); + onChanged(); + return this; + } + /** + * optional string jwt = 13; + */ + public Builder setJwtBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00001000; + jwt_ = value; + onChanged(); + return this; + } + + // optional bool is_external_client_request = 14 [default = false]; + private boolean isExternalClientRequest_ ; + /** + * optional bool is_external_client_request = 14 [default = false]; + */ + public boolean hasIsExternalClientRequest() { + return ((bitField0_ & 0x00002000) == 0x00002000); + } + /** + * optional bool is_external_client_request = 14 [default = false]; + */ + public boolean getIsExternalClientRequest() { + return isExternalClientRequest_; + } + /** + * optional bool is_external_client_request = 14 [default = false]; + */ + public Builder setIsExternalClientRequest(boolean value) { + bitField0_ |= 0x00002000; + isExternalClientRequest_ = value; + onChanged(); + return this; + } + /** + * optional bool is_external_client_request = 14 [default = false]; + */ + public Builder clearIsExternalClientRequest() { + bitField0_ = (bitField0_ & ~0x00002000); + isExternalClientRequest_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:SubmitWorkRequestProto) } @@ -22603,6 +22858,10 @@ public Builder mergeFrom( } /** * Protobuf type {@code EvictEntityRequestProto} + * + *
+   * Used for proactive eviction request. Must contain one DB name, and optionally table information.
+   *
*/ public static final class EvictEntityRequestProto extends com.google.protobuf.GeneratedMessage @@ -22960,6 +23219,10 @@ protected Builder newBuilderForType( } /** * Protobuf type {@code EvictEntityRequestProto} + * + *
+     * Used for proactive eviction request. Must contain one DB name, and optionally table information.
+     *
*/ public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder @@ -23515,6 +23778,13 @@ public Builder removeTable(int index) { } /** * Protobuf type {@code TableProto} + * + *
+   * Used in EvictEntityRequestProto, can be used for non-partitioned and partitioned tables too.
+   * For the latter part_key contains only the keys, part_val has the values for all partitions on all keys:
+   * e.g.: for partitions pk0=p00/pk1=p01/pk2=p02 and pk0=p10/pk1=p11/pk2=p12
+   * part_key: [pk0, pk1, pk2], part_val: [p00, p01, p02, p10, p11, p12]
+   *
*/ public static final class TableProto extends com.google.protobuf.GeneratedMessage @@ -23925,6 +24195,13 @@ protected Builder newBuilderForType( } /** * Protobuf type {@code TableProto} + * + *
+     * Used in EvictEntityRequestProto, can be used for non-partitioned and partitioned tables too.
+     * For the latter part_key contains only the keys, part_val has the values for all partitions on all keys:
+     * e.g.: for partitions pk0=p00/pk1=p01/pk2=p02 and pk0=p10/pk1=p11/pk2=p12
+     * part_key: [pk0, pk1, pk2], part_val: [p00, p01, p02, p10, p11, p12]
+     *
*/ public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder @@ -26113,7 +26390,7 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { " \001(\005\022\032\n\022app_attempt_number\030\003 \001(\005\"l\n\013NotT" + "ezEvent\022\037\n\027input_event_proto_bytes\030\001 \002(\014" + "\022\023\n\013vertex_name\030\002 \002(\t\022\027\n\017dest_input_name" + - "\030\003 \002(\t\022\016\n\006key_id\030\004 \001(\005\"\366\002\n\026SubmitWorkReq" + + "\030\003 \002(\t\022\016\n\006key_id\030\004 \001(\005\"\256\003\n\026SubmitWorkReq" + "uestProto\022\"\n\twork_spec\030\001 \001(\0132\017.VertexOrB" + "inary\022\033\n\023work_spec_signature\030\002 \001(\014\022\027\n\017fr" + "agment_number\030\003 \001(\005\022\026\n\016attempt_number\030\004 " + @@ -26122,73 +26399,74 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { "s_binary\030\010 \001(\014\0223\n\025fragment_runtime_info\030" + "\t \001(\0132\024.FragmentRuntimeInfo\022\033\n\023initial_e" + "vent_bytes\030\n \001(\014\022\037\n\027initial_event_signat" + - "ure\030\013 \001(\014\022\034\n\ris_guaranteed\030\014 \001(\010:\005false\"" + - "t\n\027RegisterDagRequestProto\022\014\n\004user\030\001 \001(\t" + - "\022/\n\020query_identifier\030\002 \002(\0132\025.QueryIdenti" + - "fierProto\022\032\n\022credentials_binary\030\003 \001(\014\"\032\n" + - "\030RegisterDagResponseProto\"b\n\027SubmitWorkR" + - "esponseProto\022/\n\020submission_state\030\001 \001(\0162\025", - ".SubmissionStateProto\022\026\n\016unique_node_id\030" + - "\002 \001(\t\"\205\001\n\036SourceStateUpdatedRequestProto" + - "\022/\n\020query_identifier\030\001 \001(\0132\025.QueryIdenti" + - "fierProto\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001" + - "(\0162\021.SourceStateProto\"!\n\037SourceStateUpda" + - "tedResponseProto\"e\n\031QueryCompleteRequest" + - "Proto\022/\n\020query_identifier\030\001 \001(\0132\025.QueryI" + - "dentifierProto\022\027\n\014delete_delay\030\002 \001(\003:\0010\"" + - "\034\n\032QueryCompleteResponseProto\"t\n\035Termina" + - "teFragmentRequestProto\022/\n\020query_identifi", - "er\030\001 \001(\0132\025.QueryIdentifierProto\022\"\n\032fragm" + - "ent_identifier_string\030\002 \001(\t\" \n\036Terminate" + - "FragmentResponseProto\"\210\001\n\032UpdateFragment" + - "RequestProto\022/\n\020query_identifier\030\001 \001(\0132\025" + - ".QueryIdentifierProto\022\"\n\032fragment_identi" + - "fier_string\030\002 \001(\t\022\025\n\ris_guaranteed\030\003 \001(\010" + - "\"D\n\033UpdateFragmentResponseProto\022\016\n\006resul" + - "t\030\001 \001(\010\022\025\n\ris_guaranteed\030\002 \001(\010\"&\n\024GetTok" + - "enRequestProto\022\016\n\006app_id\030\001 \001(\t\"&\n\025GetTok" + - "enResponseProto\022\r\n\005token\030\001 \001(\014\"A\n\033LlapOu", - "tputSocketInitMessage\022\023\n\013fragment_id\030\001 \002" + - "(\t\022\r\n\005token\030\002 \001(\014\"\030\n\026PurgeCacheRequestPr" + - "oto\"6\n\027PurgeCacheResponseProto\022\033\n\023purged" + - "_memory_bytes\030\001 \001(\003\"&\n\010MapEntry\022\013\n\003key\030\001" + - " \001(\t\022\r\n\005value\030\002 \001(\003\"\036\n\034GetDaemonMetricsR" + - "equestProto\";\n\035GetDaemonMetricsResponseP" + - "roto\022\032\n\007metrics\030\001 \003(\0132\t.MapEntry\"A\n\027SetC" + - "apacityRequestProto\022\023\n\013executorNum\030\001 \001(\005" + - "\022\021\n\tqueueSize\030\002 \001(\005\"\032\n\030SetCapacityRespon" + - "seProto\"F\n\027EvictEntityRequestProto\022\017\n\007db", - "_name\030\001 
\002(\t\022\032\n\005table\030\002 \003(\0132\013.TableProto\"" + - "D\n\nTableProto\022\022\n\ntable_name\030\001 \002(\t\022\020\n\010par" + - "t_key\030\002 \003(\t\022\020\n\010part_val\030\003 \003(\t\"1\n\030EvictEn" + - "tityResponseProto\022\025\n\revicted_bytes\030\001 \002(\003" + - "*2\n\020SourceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n" + - "\tS_RUNNING\020\002*E\n\024SubmissionStateProto\022\014\n\010" + - "ACCEPTED\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHE" + - "R\020\0032\337\003\n\022LlapDaemonProtocol\022B\n\013registerDa" + - "g\022\030.RegisterDagRequestProto\032\031.RegisterDa" + - "gResponseProto\022?\n\nsubmitWork\022\027.SubmitWor", - "kRequestProto\032\030.SubmitWorkResponseProto\022" + - "W\n\022sourceStateUpdated\022\037.SourceStateUpdat" + - "edRequestProto\032 .SourceStateUpdatedRespo" + - "nseProto\022H\n\rqueryComplete\022\032.QueryComplet" + - "eRequestProto\032\033.QueryCompleteResponsePro" + - "to\022T\n\021terminateFragment\022\036.TerminateFragm" + - "entRequestProto\032\037.TerminateFragmentRespo" + - "nseProto\022K\n\016updateFragment\022\033.UpdateFragm" + - "entRequestProto\032\034.UpdateFragmentResponse" + - "Proto2\371\002\n\026LlapManagementProtocol\022C\n\022getD", - "elegationToken\022\025.GetTokenRequestProto\032\026." + - "GetTokenResponseProto\022?\n\npurgeCache\022\027.Pu" + - "rgeCacheRequestProto\032\030.PurgeCacheRespons" + - "eProto\022Q\n\020getDaemonMetrics\022\035.GetDaemonMe" + - "tricsRequestProto\032\036.GetDaemonMetricsResp" + - "onseProto\022B\n\013setCapacity\022\030.SetCapacityRe" + - "questProto\032\031.SetCapacityResponseProto\022B\n" + - "\013evictEntity\022\030.EvictEntityRequestProto\032\031" + - ".EvictEntityResponseProtoBH\n&org.apache." + - "hadoop.hive.llap.daemon.rpcB\030LlapDaemonP", - "rotocolProtos\210\001\001\240\001\001" + "ure\030\013 \001(\014\022\034\n\ris_guaranteed\030\014 \001(\010:\005false\022" + + "\013\n\003jwt\030\r \001(\t\022)\n\032is_external_client_reque" + + "st\030\016 \001(\010:\005false\"t\n\027RegisterDagRequestPro" + + "to\022\014\n\004user\030\001 \001(\t\022/\n\020query_identifier\030\002 \002" + + "(\0132\025.QueryIdentifierProto\022\032\n\022credentials" + + "_binary\030\003 \001(\014\"\032\n\030RegisterDagResponseProt", + "o\"b\n\027SubmitWorkResponseProto\022/\n\020submissi" + + "on_state\030\001 \001(\0162\025.SubmissionStateProto\022\026\n" + + "\016unique_node_id\030\002 \001(\t\"\205\001\n\036SourceStateUpd" + + "atedRequestProto\022/\n\020query_identifier\030\001 \001" + + "(\0132\025.QueryIdentifierProto\022\020\n\010src_name\030\002 " + + "\001(\t\022 \n\005state\030\003 \001(\0162\021.SourceStateProto\"!\n" + + "\037SourceStateUpdatedResponseProto\"e\n\031Quer" + + "yCompleteRequestProto\022/\n\020query_identifie" + + "r\030\001 \001(\0132\025.QueryIdentifierProto\022\027\n\014delete" + + "_delay\030\002 \001(\003:\0010\"\034\n\032QueryCompleteResponse", + "Proto\"t\n\035TerminateFragmentRequestProto\022/" + + "\n\020query_identifier\030\001 \001(\0132\025.QueryIdentifi" + + "erProto\022\"\n\032fragment_identifier_string\030\002 " + + "\001(\t\" \n\036TerminateFragmentResponseProto\"\210\001" + + "\n\032UpdateFragmentRequestProto\022/\n\020query_id" + + "entifier\030\001 \001(\0132\025.QueryIdentifierProto\022\"\n" + + "\032fragment_identifier_string\030\002 \001(\t\022\025\n\ris_" + + "guaranteed\030\003 \001(\010\"D\n\033UpdateFragmentRespon" + + "seProto\022\016\n\006result\030\001 \001(\010\022\025\n\ris_guaranteed" + + "\030\002 
\001(\010\"&\n\024GetTokenRequestProto\022\016\n\006app_id", + "\030\001 \001(\t\"&\n\025GetTokenResponseProto\022\r\n\005token" + + "\030\001 \001(\014\"A\n\033LlapOutputSocketInitMessage\022\023\n" + + "\013fragment_id\030\001 \002(\t\022\r\n\005token\030\002 \001(\014\"\030\n\026Pur" + + "geCacheRequestProto\"6\n\027PurgeCacheRespons" + + "eProto\022\033\n\023purged_memory_bytes\030\001 \001(\003\"&\n\010M" + + "apEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\003\"\036\n\034G" + + "etDaemonMetricsRequestProto\";\n\035GetDaemon" + + "MetricsResponseProto\022\032\n\007metrics\030\001 \003(\0132\t." + + "MapEntry\"A\n\027SetCapacityRequestProto\022\023\n\013e" + + "xecutorNum\030\001 \001(\005\022\021\n\tqueueSize\030\002 \001(\005\"\032\n\030S", + "etCapacityResponseProto\"F\n\027EvictEntityRe" + + "questProto\022\017\n\007db_name\030\001 \002(\t\022\032\n\005table\030\002 \003" + + "(\0132\013.TableProto\"D\n\nTableProto\022\022\n\ntable_n" + + "ame\030\001 \002(\t\022\020\n\010part_key\030\002 \003(\t\022\020\n\010part_val\030" + + "\003 \003(\t\"1\n\030EvictEntityResponseProto\022\025\n\revi" + + "cted_bytes\030\001 \002(\003*2\n\020SourceStateProto\022\017\n\013" + + "S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024Submissi" + + "onStateProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJECTED\020\002" + + "\022\021\n\rEVICTED_OTHER\020\0032\337\003\n\022LlapDaemonProtoc" + + "ol\022B\n\013registerDag\022\030.RegisterDagRequestPr", + "oto\032\031.RegisterDagResponseProto\022?\n\nsubmit" + + "Work\022\027.SubmitWorkRequestProto\032\030.SubmitWo" + + "rkResponseProto\022W\n\022sourceStateUpdated\022\037." + + "SourceStateUpdatedRequestProto\032 .SourceS" + + "tateUpdatedResponseProto\022H\n\rqueryComplet" + + "e\022\032.QueryCompleteRequestProto\032\033.QueryCom" + + "pleteResponseProto\022T\n\021terminateFragment\022" + + "\036.TerminateFragmentRequestProto\032\037.Termin" + + "ateFragmentResponseProto\022K\n\016updateFragme" + + "nt\022\033.UpdateFragmentRequestProto\032\034.Update", + "FragmentResponseProto2\371\002\n\026LlapManagement" + + "Protocol\022C\n\022getDelegationToken\022\025.GetToke" + + "nRequestProto\032\026.GetTokenResponseProto\022?\n" + + "\npurgeCache\022\027.PurgeCacheRequestProto\032\030.P" + + "urgeCacheResponseProto\022Q\n\020getDaemonMetri" + + "cs\022\035.GetDaemonMetricsRequestProto\032\036.GetD" + + "aemonMetricsResponseProto\022B\n\013setCapacity" + + "\022\030.SetCapacityRequestProto\032\031.SetCapacity" + + "ResponseProto\022B\n\013evictEntity\022\030.EvictEnti" + + "tyRequestProto\032\031.EvictEntityResponseProt", + "oBH\n&org.apache.hadoop.hive.llap.daemon." 
+ + "rpcB\030LlapDaemonProtocolProtos\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -26254,7 +26532,7 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { internal_static_SubmitWorkRequestProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SubmitWorkRequestProto_descriptor, - new java.lang.String[] { "WorkSpec", "WorkSpecSignature", "FragmentNumber", "AttemptNumber", "ContainerIdString", "AmHost", "AmPort", "CredentialsBinary", "FragmentRuntimeInfo", "InitialEventBytes", "InitialEventSignature", "IsGuaranteed", }); + new java.lang.String[] { "WorkSpec", "WorkSpecSignature", "FragmentNumber", "AttemptNumber", "ContainerIdString", "AmHost", "AmPort", "CredentialsBinary", "FragmentRuntimeInfo", "InitialEventBytes", "InitialEventSignature", "IsGuaranteed", "Jwt", "IsExternalClientRequest", }); internal_static_RegisterDagRequestProto_descriptor = getDescriptor().getMessageTypes().get(10); internal_static_RegisterDagRequestProto_fieldAccessorTable = new diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java index c26ab629e9..006634dd72 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java @@ -30,7 +30,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.io.DataInputBuffer; @@ -401,4 +400,24 @@ public static Credentials credentialsFromByteArray(byte[] binaryCredentials) credentials.readTokenStorageStream(dib); return credentials; } + + /** + * @return returns the value of LLAP_EXTERNAL_CLIENT_CLOUD_DEPLOYMENT_SETUP_ENABLED + * @param conf + */ + public static boolean isCloudDeployment(Configuration conf) { + return HiveConf.getBoolVar(conf, ConfVars.LLAP_EXTERNAL_CLIENT_CLOUD_DEPLOYMENT_SETUP_ENABLED, false); + } + + /** + * @return returns the value of PUBLIC_HOSTNAME from either environment variable or system properties + */ + public static String getPublicHostname() { + String publicHostname = System.getenv("PUBLIC_HOSTNAME"); + if (publicHostname == null) { + publicHostname = System.getProperty("PUBLIC_HOSTNAME"); + } + return publicHostname; + } + } diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/DefaultJwtSharedSecretProvider.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/DefaultJwtSharedSecretProvider.java new file mode 100644 index 0000000000..dca8afc2c6 --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/DefaultJwtSharedSecretProvider.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.security; + +import com.google.common.base.Preconditions; +import io.jsonwebtoken.security.Keys; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.StandardCharsets; +import java.security.Key; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.LLAP_EXTERNAL_CLIENT_CLOUD_DEPLOYMENT_SETUP_ENABLED; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET; + +/** + * Default implementation of {@link JwtSecretProvider}. + * + * 1. It first tries to get shared secret from conf {@link HiveConf.ConfVars#LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET} + * using {@link Configuration#getPassword(String)}. + * + * 2. If not found, it tries to read from env var {@link #LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET_ENV_VAR}. + * + * If secret is not found even after 1) and 2), {@link #init(Configuration)} methods throws {@link IllegalStateException}. + * + * Length of shared secret provided in 1) or 2) should be > 32 bytes. + * + * It uses the same encryption and decryption secret which can be used to sign and verify JWT. + */ +public class DefaultJwtSharedSecretProvider implements JwtSecretProvider { + + public static final String LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET_ENV_VAR = + "LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET_ENV_VAR"; + + private Key jwtEncryptionKey; + + @Override public Key getEncryptionSecret() { + return jwtEncryptionKey; + } + + @Override public Key getDecryptionSecret() { + return jwtEncryptionKey; + } + + @Override public void init(final Configuration conf) { + char[] sharedSecret; + byte[] sharedSecretBytes = null; + + // try getting secret from conf first + // if not found, get from env var - LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET_ENV_VAR + try { + sharedSecret = conf.getPassword(LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET.varname); + } catch (IOException e) { + throw new RuntimeException("Unable to get password [hive.llap.external.client.cloud.jwt.shared.secret] - " + + e.getMessage(), e); + } + if (sharedSecret != null) { + ByteBuffer bb = StandardCharsets.UTF_8.encode(CharBuffer.wrap(sharedSecret)); + sharedSecretBytes = new byte[bb.remaining()]; + bb.get(sharedSecretBytes); + } else { + String sharedSecredFromEnv = System.getenv(LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET_ENV_VAR); + if (StringUtils.isNotBlank(sharedSecredFromEnv)) { + sharedSecretBytes = sharedSecredFromEnv.getBytes(); + } + } + + Preconditions.checkState(sharedSecretBytes != null, + "With: " + LLAP_EXTERNAL_CLIENT_CLOUD_DEPLOYMENT_SETUP_ENABLED.varname + " = true, \n" + + "To use: org.apache.hadoop.hive.llap.security.DefaultJwtSharedSecretProvider, \n" + + "1. a non-null value of 'hive.llap.external.client.cloud.jwt.shared.secret' must be provided OR \n" + + "2. alternatively environment variable " + + "LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET_ENV_VAR can also be set. 
\n" + + "Length of the secret provided in 1) or 2) should be > 32 bytes."); + + this.jwtEncryptionKey = Keys.hmacShaKeyFor(sharedSecretBytes); + } + +} diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/JwtSecretProvider.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/JwtSecretProvider.java new file mode 100644 index 0000000000..49e4952655 --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/JwtSecretProvider.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.security; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; + +import java.security.Key; + +/** + * JwtSecretProvider + * + * - provides encryption and decryption secrets for generating and parsing JWTs. + * + * - Hive internally uses method initAndGet() which initializes providers based on the value of config + * {@link HiveConf.ConfVars#LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET_PROVIDER}. + * It expects implementations to provide default constructor and {@link #init(Configuration)} method. + */ +public interface JwtSecretProvider { + + /** + * returns secret for signing JWT. + */ + Key getEncryptionSecret(); + + /** + * returns secret for parsing JWT. + */ + Key getDecryptionSecret(); + + /** + * Initializes the provider. + * Should also contain any validations that we want to put on secret, helps us to fail fast. + * @param conf configuration + */ + void init(Configuration conf); + + /** + * Hive internally uses this method to obtain instance of {@link JwtSecretProvider} + * + * @param conf configuration + * @return implementation of {@link HiveConf.ConfVars#LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET_PROVIDER} + */ + static JwtSecretProvider initAndGet(Configuration conf) { + final String providerClass = + HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET_PROVIDER); + JwtSecretProvider provider; + try { + provider = (JwtSecretProvider) Class.forName(providerClass).newInstance(); + provider.init(conf); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + throw new RuntimeException("Unable to instantiate provider: " + providerClass, e); + } + return provider; + } +} diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapExtClientJwtHelper.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapExtClientJwtHelper.java new file mode 100644 index 0000000000..68e81a4cdd --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapExtClientJwtHelper.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.security; + +import io.jsonwebtoken.Claims; +import io.jsonwebtoken.Jws; +import io.jsonwebtoken.Jwts; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import java.util.Date; +import java.util.UUID; + +/** + * Contains helper methods for generating and verifying JWTs for external llap clients. + * Initializes and uses {@link JwtSecretProvider} to obtain encryption and decryption secret. + */ +public class LlapExtClientJwtHelper { + + public static final String LLAP_JWT_SUBJECT = "llap"; + public static final String LLAP_EXT_CLIENT_APP_ID = "llap_ext_client_app_id"; + private final JwtSecretProvider jwtSecretProvider; + + public LlapExtClientJwtHelper(Configuration conf) { + this.jwtSecretProvider = JwtSecretProvider.initAndGet(conf); + } + + /** + * @param extClientAppId application Id - application Id injected by get_splits + * @return JWT signed with {@link JwtSecretProvider#getEncryptionSecret()}. + * As of now this JWT contains extClientAppId in claims. + */ + public String buildJwtForLlap(ApplicationId extClientAppId) { + return Jwts.builder() + .setSubject(LLAP_JWT_SUBJECT) + .setIssuedAt(new Date()) + .setId(UUID.randomUUID().toString()) + .claim(LLAP_EXT_CLIENT_APP_ID, extClientAppId.toString()) + .signWith(jwtSecretProvider.getEncryptionSecret()) + .compact(); + } + + /** + * + * @param jwt signed JWT String + * @return claims present in JWT, this method parses jwt using {@link JwtSecretProvider#getDecryptionSecret()} + */ + public Jws parseClaims(String jwt) { + return Jwts.parser() + .setSigningKey(jwtSecretProvider.getDecryptionSecret()) + .parseClaimsJws(jwt); + } + +} diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto index 9f0d2f3d58..ca0a99b734 100644 --- a/llap-common/src/protobuf/LlapDaemonProtocol.proto +++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto @@ -132,6 +132,8 @@ message SubmitWorkRequestProto { optional bytes initial_event_signature = 11; optional bool is_guaranteed = 12 [default = false]; + optional string jwt = 13; + optional bool is_external_client_request = 14 [default = false]; } message RegisterDagRequestProto { diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 6bf7f33f64..7b04e2c455 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -20,7 +20,6 @@ import java.io.DataInputStream; import java.io.IOException; import java.io.OutputStream; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; 
@@ -29,13 +28,11 @@ import java.sql.ResultSet; import java.sql.Statement; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.Set; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Pattern; @@ -48,14 +45,13 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary; +import org.apache.hadoop.hive.llap.ext.LlapDaemonInfo; import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient; import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; -import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable; -import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; @@ -122,7 +118,6 @@ public static final Pattern SET_QUERY_PATTERN = Pattern.compile("^\\s*set\\s+.*=.+$", Pattern.CASE_INSENSITIVE); public static final String SPLIT_QUERY = "select get_llap_splits(\"%s\",%d)"; - public static final LlapServiceInstance[] serviceInstanceArray = new LlapServiceInstance[0]; public LlapBaseInputFormat(String url, String user, String pwd, String query) { this.url = url; @@ -157,12 +152,13 @@ public LlapBaseInputFormat() { HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser()); SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes()); - LlapServiceInstance serviceInstance = getServiceInstance(job, llapSplit); - String host = serviceInstance.getHost(); - int llapSubmitPort = serviceInstance.getRpcPort(); + // llapSplit.getLlapDaemonInfos() will never be empty as of now, also validated this in GenericUDTFGetSplits while populating. 
+ final LlapDaemonInfo llapDaemonInfo = llapSplit.getLlapDaemonInfos()[0]; + final String host = llapDaemonInfo.getHost(); + final int outputPort = llapDaemonInfo.getOutputFormatPort(); + final int llapSubmitPort = llapDaemonInfo.getRpcPort(); - LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort - + " and outputformat port " + serviceInstance.getOutputFormatPort()); + LOG.info("Will try to submit request to first Llap Daemon in the split - {}", llapDaemonInfo); byte[] llapTokenBytes = llapSplit.getTokenBytes(); Token llapToken = null; @@ -196,24 +192,28 @@ public LlapBaseInputFormat() { SubmitWorkRequestProto request = constructSubmitWorkRequestProto( submitWorkInfo, taskNum, attemptNum, llapClient.getAddress(), - submitWorkInfo.getToken(), llapSplit.getFragmentBytes(), - llapSplit.getFragmentBytesSignature(), job); - llapClient.submitWork(request, host, llapSubmitPort); - - Socket socket = new Socket(host, serviceInstance.getOutputFormatPort()); + submitWorkInfo.getToken(), llapSplit, job); - LOG.debug("Socket connected"); SignableVertexSpec vertex = SignableVertexSpec.parseFrom(submitWorkInfo.getVertexBinary()); - String fragmentId = Converters.createTaskAttemptId(vertex.getQueryIdentifier(), vertex.getVertexIndex(), request.getFragmentNumber(), request.getAttemptNumber()).toString(); + + LOG.info("Submitting fragment:{} to llap [host = {}, port = {}] ", fragmentId, host, llapSubmitPort); + + llapClient.submitWork(request, host, llapSubmitPort); + + Socket socket = new Socket(host, outputPort); + OutputStream socketStream = socket.getOutputStream(); LlapOutputSocketInitMessage.Builder builder = LlapOutputSocketInitMessage.newBuilder().setFragmentId(fragmentId); if (llapSplit.getTokenBytes() != null) { builder.setToken(ByteString.copyFrom(llapSplit.getTokenBytes())); } + + LOG.info("Registering fragment:{} to llap [host = {}, output port = {}] to read output", + fragmentId, host, outputPort); builder.build().writeDelimitedTo(socketStream); socketStream.flush(); @@ -418,91 +418,12 @@ public void run() { }); } - private LlapServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException { - LlapRegistryService registryService = LlapRegistryService.getClient(job); - LlapServiceInstance serviceInstance = null; - String[] hosts = llapSplit.getLocations(); - if (hosts != null && hosts.length > 0) { - String host = llapSplit.getLocations()[0]; - serviceInstance = getServiceInstanceForHost(registryService, host); - if (serviceInstance == null) { - LOG.info("No service instances found for " + host + " in registry."); - } - } - - if (serviceInstance == null) { - serviceInstance = getServiceInstanceRandom(registryService); - if (serviceInstance == null) { - throw new IOException("No service instances found in registry"); - } - } - - return serviceInstance; - } - - private LlapServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException { - InetAddress address = InetAddress.getByName(host); - ServiceInstanceSet instanceSet = registryService.getInstances(); - LlapServiceInstance serviceInstance = null; - - // The name used in the service registry may not match the host name we're using. 
- // Try hostname/canonical hostname/host address - - String name = address.getHostName(); - LOG.info("Searching service instance by hostname " + name); - serviceInstance = selectServiceInstance(instanceSet.getByHost(name)); - if (serviceInstance != null) { - return serviceInstance; - } - - name = address.getCanonicalHostName(); - LOG.info("Searching service instance by canonical hostname " + name); - serviceInstance = selectServiceInstance(instanceSet.getByHost(name)); - if (serviceInstance != null) { - return serviceInstance; - } - - name = address.getHostAddress(); - LOG.info("Searching service instance by address " + name); - serviceInstance = selectServiceInstance(instanceSet.getByHost(name)); - if (serviceInstance != null) { - return serviceInstance; - } - - return serviceInstance; - } - - - private LlapServiceInstance getServiceInstanceRandom(LlapRegistryService registryService) throws IOException { - ServiceInstanceSet instanceSet = registryService.getInstances(); - LlapServiceInstance serviceInstance = null; - - LOG.info("Finding random live service instance"); - Collection allInstances = instanceSet.getAll(); - if (allInstances.size() > 0) { - int randIdx = rand.nextInt(allInstances.size());; - serviceInstance = allInstances.toArray(serviceInstanceArray)[randIdx]; - } - return serviceInstance; - } - - private LlapServiceInstance selectServiceInstance(Set serviceInstances) { - if (serviceInstances == null || serviceInstances.isEmpty()) { - return null; - } - - // Get the first live service instance - for (LlapServiceInstance serviceInstance : serviceInstances) { - return serviceInstance; - } - - LOG.info("No live service instances were found"); - return null; - } - private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo, int taskNum, int attemptNum, InetSocketAddress address, Token token, - byte[] fragmentBytes, byte[] fragmentBytesSignature, JobConf job) throws IOException { + LlapInputSplit llapInputSplit, JobConf job) throws IOException { + byte[] fragmentBytes = llapInputSplit.getFragmentBytes(); + byte[] fragmentBytesSignature = llapInputSplit.getFragmentBytesSignature(); + ApplicationId appId = submitWorkInfo.getFakeAppId(); // This works, assuming the executor is running within YARN. 
@@ -545,6 +466,8 @@ private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo su if (fragmentBytesSignature != null) { builder.setInitialEventSignature(ByteString.copyFrom(fragmentBytesSignature)); } + builder.setJwt(llapInputSplit.getJwt()); + builder.setIsExternalClientRequest(true); return builder.build(); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyLocalJars.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyLocalJars.java index 90f9b2c7d5..8333d58f11 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyLocalJars.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyLocalJars.java @@ -68,7 +68,10 @@ public Void call() throws Exception { org.apache.arrow.memory.BaseAllocator.class, //arrow-memory org.apache.arrow.flatbuf.Schema.class, //arrow-format com.google.flatbuffers.Table.class, //flatbuffers - com.carrotsearch.hppc.ByteArrayDeque.class //hppc + com.carrotsearch.hppc.ByteArrayDeque.class, //hppc + io.jsonwebtoken.security.Keys.class, //jjwt-api + io.jsonwebtoken.impl.DefaultJws.class, //jjwt-impl + io.jsonwebtoken.io.JacksonSerializer.class, //jjwt-jackson }; for (Class c : dependencies) { diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index a4de3d9d39..99f3c15b45 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -17,7 +17,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,6 +34,11 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; + +import io.jsonwebtoken.Claims; +import io.jsonwebtoken.Jws; +import io.jsonwebtoken.JwtException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.UgiFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -74,6 +78,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; +import org.apache.hadoop.hive.llap.security.LlapExtClientJwtHelper; import org.apache.hadoop.hive.llap.security.LlapSignerImpl; import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; @@ -230,11 +235,15 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws Converters.createTaskAttemptId(vertex.getQueryIdentifier(), vertex.getVertexIndex(), request.getFragmentNumber(), request.getAttemptNumber()); String fragmentIdString = attemptId.toString(); + + QueryIdentifierProto qIdProto = vertex.getQueryIdentifier(); + + verifyJwtForExternalClient(request, qIdProto.getApplicationIdString(), fragmentIdString); + if (LOG.isInfoEnabled()) { LOG.info("Queueing container for execution: fragemendId={}, {}", fragmentIdString, stringifySubmitRequest(request, vertex)); } - QueryIdentifierProto qIdProto = vertex.getQueryIdentifier(); HistoryLogger.logFragmentStart(qIdProto.getApplicationIdString(), 
request.getContainerIdString(), localAddress.get().getHostName(), @@ -342,6 +351,39 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws .build(); } + // if request is coming from llap external client, verify the JWT + // as of now, JWT contains applicationId + private void verifyJwtForExternalClient(SubmitWorkRequestProto request, String extClientAppIdFromSplit, + String fragmentIdString) { + LOG.info("Checking if request[{}] is from llap external client in a cloud based deployment", + extClientAppIdFromSplit); + if (request.getIsExternalClientRequest() && LlapUtil.isCloudDeployment(getConfig())) { + LOG.info("Llap external client request - {}, verifying JWT", extClientAppIdFromSplit); + Preconditions.checkState(request.hasJwt(), "JWT not found in request, fragmentId: " + fragmentIdString); + + LlapExtClientJwtHelper llapExtClientJwtHelper = new LlapExtClientJwtHelper(getConfig()); + Jws claimsJws; + try { + claimsJws = llapExtClientJwtHelper.parseClaims(request.getJwt()); + } catch (JwtException e) { + LOG.error("Cannot verify JWT provided with the request, fragmentId: {}, {}", fragmentIdString, e); + throw e; + } + + String extClientAppIdFromJwt = (String) claimsJws.getBody().get(LlapExtClientJwtHelper.LLAP_EXT_CLIENT_APP_ID); + + // this should never happen ideally. + // extClientAppId is injected in JWT and fragment request by initial get_splits() call. + // so both of these - extClientAppIdFromJwt and extClientAppIdFromSplit should be equal eventually if the signed JWT is valid for this request. + // In get_splits, this extClientAppId is obtained via LlapCoordinator#createExtClientAppId which generates a + // application Id to be used by external clients. + Preconditions.checkState(extClientAppIdFromJwt.equals(extClientAppIdFromSplit), + String.format("applicationId[%s] in request does not match to applicationId[%s] in JWT", + extClientAppIdFromSplit, extClientAppIdFromJwt)); + LOG.info("Llap external client request - {}, JWT verification successful", extClientAppIdFromSplit); + } + } + private SignableVertexSpec extractVertexSpec(SubmitWorkRequestProto request, LlapTokenInfo tokenInfo) throws InvalidProtocolBufferException, IOException { VertexOrBinary vob = request.getWorkSpec(); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index abf28f2bed..69c6456b99 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.security.LlapExtClientJwtHelper; import org.apache.hadoop.hive.llap.security.LlapUgiFactoryFactory; import org.apache.hadoop.hive.llap.security.SecretManager; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; @@ -129,7 +130,8 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes, boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort, - int mngPort, int shufflePort, int webPort, String appName) { + boolean externalClientCloudSetupEnabled, int externalClientsRpcPort, + int mngPort, int shufflePort, int webPort, String appName) { super("LlapDaemon"); printAsciiArt(); @@ -137,6 +139,12 
@@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor Preconditions.checkArgument(numExecutors > 0); Preconditions.checkArgument(srvPort == 0 || (srvPort > 1024 && srvPort < 65536), "Server RPC Port must be between 1025 and 65535, or 0 automatic selection"); + if (externalClientCloudSetupEnabled) { + Preconditions.checkArgument( + externalClientsRpcPort == 0 || (externalClientsRpcPort > 1024 && externalClientsRpcPort < 65536), + "Server RPC port for external clients must be between 1025 and 65535, or 0 automatic selection"); + } + Preconditions.checkArgument(mngPort == 0 || (mngPort > 1024 && mngPort < 65536), "Management RPC Port must be between 1025 and 65535, or 0 automatic selection"); Preconditions.checkArgument(localDirs != null && localDirs.length > 0, @@ -215,6 +223,8 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor ", llapIoEnabled=" + ioEnabled + ", llapIoCacheIsDirect=" + isDirectCache + ", rpcListenerPort=" + srvPort + + ", externalClientCloudSetupEnabled=" + externalClientCloudSetupEnabled + + ", rpcListenerPortForExternalClients=" + externalClientsRpcPort + ", mngListenerPort=" + mngPort + ", webPort=" + webPort + ", outputFormatSvcPort=" + outputFormatServicePort + @@ -308,7 +318,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor } this.secretManager = sm; this.server = new LlapProtocolServerImpl(secretManager, - numHandlers, this, srvAddress, mngAddress, srvPort, mngPort, daemonId, metrics); + numHandlers, this, srvAddress, mngAddress, srvPort, externalClientsRpcPort, mngPort, daemonId, metrics); UgiFactory fsUgiFactory = null; try { @@ -474,6 +484,15 @@ public void serviceStart() throws Exception { getConfig().setInt(ConfVars.LLAP_DAEMON_WEB_PORT.varname, webServices.getPort()); } getConfig().setInt(ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT.varname, LlapOutputFormatService.get().getPort()); + if (LlapUtil.isCloudDeployment(getConfig())) { + + // this invokes JWT secret provider and tries to get shared secret. + // meant to validate shared secret as well. + new LlapExtClientJwtHelper(getConfig()); + + getConfig().setInt(ConfVars.LLAP_EXTERNAL_CLIENT_CLOUD_RPC_PORT.varname, + server.getExternalClientsRpcServerBindAddress().getPort()); + } // Ensure this is set in the config so that the AM can read it. getConfig() @@ -568,6 +587,8 @@ public static void main(String[] args) throws Exception { String[] localDirs = (localDirList == null || localDirList.isEmpty()) ? 
new String[0] : StringUtils.getTrimmedStrings(localDirList); int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT); + int externalClientCloudRpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_EXTERNAL_CLIENT_CLOUD_RPC_PORT); + boolean externalClientCloudSetupEnabled = LlapUtil.isCloudDeployment(daemonConf); int mngPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_MANAGEMENT_RPC_PORT); int shufflePort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT); int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT); @@ -583,7 +604,8 @@ public static void main(String[] args) throws Exception { LlapDaemon.initializeLogging(daemonConf); llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo, isDirectCache, - ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort, appName); + ioMemoryBytes, localDirs, rpcPort, externalClientCloudSetupEnabled, + externalClientCloudRpcPort, mngPort, shufflePort, webPort, appName); LOG.info("Adding shutdown hook for LlapDaemon"); ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java index 5509f8a5d7..03976c641a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java @@ -75,8 +75,8 @@ private final int numHandlers; private final ContainerRunner containerRunner; - private final int srvPort, mngPort; - private RPC.Server server, mngServer; + private final int srvPort, mngPort, externalClientsRpcPort; + private RPC.Server server, mngServer, externalClientsRpcServer; private final AtomicReference srvAddress, mngAddress; private final SecretManager secretManager; private String clusterUser = null; @@ -87,7 +87,8 @@ public LlapProtocolServerImpl(SecretManager secretManager, int numHandlers, ContainerRunner containerRunner, AtomicReference srvAddress, - AtomicReference mngAddress, int srvPort, int mngPort, DaemonId daemonId, + AtomicReference mngAddress, int srvPort, int externalClientsRpcPort, + int mngPort, DaemonId daemonId, LlapDaemonExecutorMetrics executorMetrics) { super("LlapDaemonProtocolServerImpl"); this.numHandlers = numHandlers; @@ -96,6 +97,7 @@ public LlapProtocolServerImpl(SecretManager secretManager, int numHandlers, this.srvAddress = srvAddress; this.srvPort = srvPort; this.mngAddress = mngAddress; + this.externalClientsRpcPort = externalClientsRpcPort; this.mngPort = mngPort; this.daemonId = daemonId; this.executorMetrics = executorMetrics; @@ -232,6 +234,15 @@ private void startProtocolServers( server = LlapUtil.startProtocolServer(srvPort, numHandlers, srvAddress, conf, daemonImpl, LlapProtocolBlockingPB.class, secretManager, pp, ConfVars.LLAP_SECURITY_ACL, ConfVars.LLAP_SECURITY_ACL_DENY); + // for cloud deployments, start a separate RPC server on the port + // which we can open to accept requests from external clients. 
+ if (LlapUtil.isCloudDeployment(conf)) { + externalClientsRpcServer = LlapUtil.startProtocolServer(externalClientsRpcPort, numHandlers, null, conf, daemonImpl, + LlapProtocolBlockingPB.class, secretManager, pp, ConfVars.LLAP_SECURITY_ACL, + ConfVars.LLAP_SECURITY_ACL_DENY); + + LOG.info("Started externalClientsRpcServer for cloud based deployments : {}, {}", externalClientsRpcServer.getListenerAddress(), externalClientsRpcServer); + } mngServer = LlapUtil.startProtocolServer(mngPort, 2, mngAddress, conf, managementImpl, LlapManagementProtocolPB.class, secretManager, pp, ConfVars.LLAP_MANAGEMENT_ACL, ConfVars.LLAP_MANAGEMENT_ACL_DENY); @@ -243,6 +254,9 @@ public void serviceStop() { if (server != null) { server.stop(); } + if (externalClientsRpcServer != null) { + externalClientsRpcServer.stop(); + } if (mngServer != null) { mngServer.stop(); } @@ -258,6 +272,11 @@ InetSocketAddress getManagementBindAddress() { return mngAddress.get(); } + @InterfaceAudience.Private + InetSocketAddress getExternalClientsRpcServerBindAddress() { + return externalClientsRpcServer.getListenerAddress(); + } + @Override public GetTokenResponseProto getDelegationToken(RpcController controller, GetTokenRequestProto request) throws ServiceException { diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java index be51bf8c91..fb92806d82 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; +import org.apache.hadoop.hive.llap.LlapUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -136,6 +137,7 @@ private MiniLlapCluster(String clusterName, @Nullable MiniZooKeeperCluster miniZ @Override public void serviceInit(Configuration conf) throws IOException, InterruptedException { int rpcPort = 0; + int externalClientCloudRpcPort = 0; int mngPort = 0; int shufflePort = 0; int webPort = 0; @@ -144,6 +146,7 @@ public void serviceInit(Configuration conf) throws IOException, InterruptedExcep LOG.info("MiniLlap configured to use ports from conf: {}", usePortsFromConf); if (usePortsFromConf) { rpcPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT); + externalClientCloudRpcPort = HiveConf.getIntVar(conf, ConfVars.LLAP_EXTERNAL_CLIENT_CLOUD_RPC_PORT); mngPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT); shufflePort = conf.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT); webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT); @@ -165,11 +168,14 @@ public void serviceInit(Configuration conf) throws IOException, InterruptedExcep clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + clusterNameTrimmed); clusterSpecificConfiguration.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, "localhost"); clusterSpecificConfiguration.setInt(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, miniZooKeeperCluster.getClientPort()); - + + boolean externalClientCloudSetupEnabled = LlapUtil.isCloudDeployment(conf); + LOG.info("Initializing {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); for (int i = 0 ;i < numInstances ; i++) { llapDaemons[i] = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled, - ioIsDirect, ioBytesPerService, 
localDirs, rpcPort, mngPort, shufflePort, webPort, clusterNameTrimmed); + ioIsDirect, ioBytesPerService, localDirs, rpcPort, externalClientCloudSetupEnabled, externalClientCloudRpcPort, + mngPort, shufflePort, webPort, clusterNameTrimmed); llapDaemons[i].init(new Configuration(conf)); } LOG.info("Initialized {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemon.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemon.java index d3817e960e..4827677aa5 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemon.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemon.java @@ -74,7 +74,7 @@ public void setUp() { String[] localDirs = new String[1]; LlapDaemonInfo.initialize("testDaemon", hiveConf); daemon = new LlapDaemon(hiveConf, 1, LlapDaemon.getTotalHeapSize(), false, false, - -1, localDirs, 0, 0, 0, -1, "TestLlapDaemon"); + -1, localDirs, 0, false, 0,0, 0, -1, "TestLlapDaemon"); } @After diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java index b5fdf9d704..3233fc67e3 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java @@ -55,7 +55,7 @@ public void testSimpleCall() throws ServiceException, IOException { LlapProtocolServerImpl server = new LlapProtocolServerImpl(null, numHandlers, containerRunnerMock, new AtomicReference(), new AtomicReference(), - 0, 0, null, null); + 0, 0, 0, null, null); when(containerRunnerMock.submitWork(any(SubmitWorkRequestProto.class))).thenReturn( SubmitWorkResponseProto .newBuilder() @@ -90,7 +90,7 @@ public void testGetDaemonMetrics() throws ServiceException, IOException { LlapProtocolServerImpl server = new LlapProtocolServerImpl(null, numHandlers, null, new AtomicReference(), new AtomicReference(), - 0, 0, null, executorMetrics); + 0, 0, 0, null, executorMetrics); executorMetrics.addMetricsFallOffFailedTimeLost(10); executorMetrics.addMetricsFallOffKilledTimeLost(11); executorMetrics.addMetricsFallOffSuccessTimeLost(12); @@ -161,7 +161,7 @@ public void testSetCapacity() throws ServiceException, IOException { LlapProtocolServerImpl server = new LlapProtocolServerImpl(null, numHandlers, containerRunnerMock, new AtomicReference(), new AtomicReference(), - 0, 0, null, executorMetrics); + 0, 0, 0, null, executorMetrics); try { server.init(new Configuration()); diff --git a/pom.xml b/pom.xml index ddb59b7c55..887f3547d8 100644 --- a/pom.xml +++ b/pom.xml @@ -210,6 +210,7 @@ 3.0.0 0.6.0 2.2.4 + 0.10.5 1.2 2.0.1 2.4.0 @@ -352,6 +353,21 @@ commons-dbcp2 ${commons-dbcp2.version}
+ + io.jsonwebtoken + jjwt-api + ${jjwt.version} + + + io.jsonwebtoken + jjwt-impl + ${jjwt.version} + + + io.jsonwebtoken + jjwt-jackson + ${jjwt.version} + io.netty netty-all diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java index ebd041ba9c..5e1be4ee4e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java @@ -28,6 +28,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Set; import java.util.UUID; @@ -35,6 +36,7 @@ import javax.security.auth.login.LoginException; import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -46,6 +48,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.FieldDesc; import org.apache.hadoop.hive.llap.LlapInputSplit; +import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.NotTezEventHelper; import org.apache.hadoop.hive.llap.Schema; import org.apache.hadoop.hive.llap.SubmitWorkInfo; @@ -53,6 +56,10 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.ext.LlapDaemonInfo; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; +import org.apache.hadoop.hive.llap.security.LlapExtClientJwtHelper; import org.apache.hadoop.hive.llap.security.LlapSigner; import org.apache.hadoop.hive.llap.security.LlapSigner.Signable; import org.apache.hadoop.hive.llap.security.LlapSigner.SignedMessage; @@ -226,29 +233,29 @@ protected void initArgs(Object[] arguments) { protected SplitResult getSplitResult(boolean generateLightWeightSplits) throws HiveException, IOException { - // Generate applicationId for the LLAP splits + // Generate extClientAppId for the LLAP splits LlapCoordinator coordinator = LlapCoordinator.getInstance(); if (coordinator == null) { throw new HiveException("LLAP coordinator is not initialized; must be running in HS2 with " + ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled"); } - ApplicationId applicationId = coordinator.createExtClientAppId(); + ApplicationId extClientAppId = coordinator.createExtClientAppId(); String externalDagName = SessionState.get().getConf().getVar(ConfVars.HIVEQUERYNAME); StringBuilder sb = new StringBuilder(); - sb.append("Generated appID ").append(applicationId.toString()).append(" for LLAP splits"); + sb.append("Generated appID ").append(extClientAppId.toString()).append(" for LLAP splits"); if (externalDagName != null) { sb.append(", with externalID ").append(externalDagName); } LOG.info(sb.toString()); - PlanFragment fragment = createPlanFragment(inputArgQuery, applicationId); + PlanFragment fragment = createPlanFragment(inputArgQuery, extClientAppId); TezWork tezWork = fragment.work; Schema schema = fragment.schema; boolean generateSingleSplit = forceSingleSplit && orderByQuery; - SplitResult splitResult = getSplits(jc, 
tezWork, schema, applicationId, generateSingleSplit, + SplitResult splitResult = getSplits(jc, tezWork, schema, extClientAppId, generateSingleSplit, generateLightWeightSplits); validateSplitResult(splitResult, generateLightWeightSplits, generateSingleSplit); return splitResult; @@ -427,14 +434,14 @@ private PlanFragment createPlanFragment(String query, ApplicationId splitsAppId) // generateLightWeightSplits - if true then // 1) schema and planBytes[] in each LlapInputSplit are not populated // 2) schemaSplit(contains only schema) and planSplit(contains only planBytes[]) are populated in SplitResult - private SplitResult getSplits(JobConf job, TezWork work, Schema schema, ApplicationId applicationId, + private SplitResult getSplits(JobConf job, TezWork work, Schema schema, ApplicationId extClientAppId, final boolean generateSingleSplit, boolean generateLightWeightSplits) throws IOException { SplitResult splitResult = new SplitResult(); splitResult.schemaSplit = new LlapInputSplit( 0, new byte[0], new byte[0], new byte[0], - new SplitLocationInfo[0], schema, "", new byte[0]); + new SplitLocationInfo[0], new LlapDaemonInfo[0], schema, "", new byte[0], ""); if (schemaSplitOnly) { // schema only return splitResult; @@ -460,9 +467,9 @@ private SplitResult getSplits(JobConf job, TezWork work, Schema schema, Applicat + ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled"); } - // Update the queryId to use the generated applicationId. See comment below about + // Update the queryId to use the generated extClientAppId. See comment below about // why this is done. - HiveConf.setVar(wxConf, HiveConf.ConfVars.HIVEQUERYID, applicationId.toString()); + HiveConf.setVar(wxConf, HiveConf.ConfVars.HIVEQUERYID, extClientAppId.toString()); Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, work, DagUtils.createTezLrMap(appJarLr, null)); String vertexName = wx.getName(); @@ -511,7 +518,7 @@ private SplitResult getSplits(JobConf job, TezWork work, Schema schema, Applicat LlapTokenLocalClient tokenClient = coordinator.getLocalTokenClient(job, llapUser); // We put the query user, not LLAP user, into the message and token. Token token = tokenClient.createToken( - applicationId.toString(), queryUser, true); + extClientAppId.toString(), queryUser, true); LOG.info("Created the token for remote user: {}", token); bos.reset(); token.write(dos); @@ -521,7 +528,7 @@ private SplitResult getSplits(JobConf job, TezWork work, Schema schema, Applicat } // Generate umbilical token (applies to all splits) - Token umbilicalToken = JobTokenCreator.createJobToken(applicationId); + Token umbilicalToken = JobTokenCreator.createJobToken(extClientAppId); LOG.info("Number of splits: " + numGroupedSplitsGenerated); SignedMessage signedSvs = null; @@ -530,7 +537,7 @@ private SplitResult getSplits(JobConf job, TezWork work, Schema schema, Applicat final Schema emptySchema = new Schema(); for (int i = 0; i < numGroupedSplitsGenerated; i++) { TaskSpec taskSpec = new TaskSpecBuilder().constructTaskSpec(dag, vertexName, - numGroupedSplitsGenerated, applicationId, i); + numGroupedSplitsGenerated, extClientAppId, i); // 2. Generate the vertex/submit information for all events. if (i == 0) { @@ -540,16 +547,16 @@ private SplitResult getSplits(JobConf job, TezWork work, Schema schema, Applicat // is moved out of a UDTF into a proper API. // Setting this to the generated AppId which is unique. // Despite the differences in TaskSpec, the vertex spec should be the same. 
- signedSvs = createSignedVertexSpec(signer, taskSpec, applicationId, queryUser, - applicationId.toString()); - SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(applicationId, + signedSvs = createSignedVertexSpec(signer, taskSpec, extClientAppId, queryUser, + extClientAppId.toString()); + SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(extClientAppId, System.currentTimeMillis(), numGroupedSplitsGenerated, signedSvs.message, signedSvs.signature, umbilicalToken); submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo); if (generateLightWeightSplits) { splitResult.planSplit = new LlapInputSplit( 0, submitWorkBytes, new byte[0], new byte[0], - new SplitLocationInfo[0], new Schema(), "", new byte[0]); + new SplitLocationInfo[0], new LlapDaemonInfo[0], new Schema(), "", new byte[0], ""); } } @@ -559,12 +566,25 @@ private SplitResult getSplits(JobConf job, TezWork work, Schema schema, Applicat // 4. Make location hints. SplitLocationInfo[] locations = makeLocationHints(hints.get(i)); + // 5. populate info about llap daemons(to help client submit request and read data) + LlapDaemonInfo[] llapDaemonInfos = populateLlapDaemonInfos(job, locations); + + // 6. Generate JWT for external clients if it's a cloud deployment + // we inject extClientAppId in JWT which is same as what fragment contains. + // extClientAppId in JWT and in fragment are compared on LLAP when a fragment is submitted. + // see method ContainerRunnerImpl#verifyJwtForExternalClient + String jwt = ""; + if (LlapUtil.isCloudDeployment(job)) { + LlapExtClientJwtHelper llapExtClientJwtHelper = new LlapExtClientJwtHelper(job); + jwt = llapExtClientJwtHelper.buildJwtForLlap(extClientAppId); + } + if (generateLightWeightSplits) { result[i] = new LlapInputSplit(i, emptySubmitWorkBytes, eventBytes.message, - eventBytes.signature, locations, emptySchema, llapUser, tokenBytes); + eventBytes.signature, locations, llapDaemonInfos, emptySchema, llapUser, tokenBytes, jwt); } else { result[i] = new LlapInputSplit(i, submitWorkBytes, eventBytes.message, - eventBytes.signature, locations, schema, llapUser, tokenBytes); + eventBytes.signature, locations, llapDaemonInfos, schema, llapUser, tokenBytes, jwt); } } splitResult.actualSplits = result; @@ -640,6 +660,39 @@ public String toString() { return locations; } + private LlapDaemonInfo[] populateLlapDaemonInfos(JobConf job, SplitLocationInfo[] locations) throws IOException { + LlapRegistryService registryService = LlapRegistryService.getClient(job); + LlapServiceInstanceSet instanceSet = registryService.getInstances(); + Collection llapServiceInstances = null; + + //this means a valid location, see makeLocationHints() + if (locations.length == 1 && locations[0].getLocation() != null) { + llapServiceInstances = instanceSet.getByHost(locations[0].getLocation()); + } + + //okay, so we were unable to find any llap instance by hostname + //let's populate them all so that we can fetch data from any of them. 
+ if (CollectionUtils.isEmpty(llapServiceInstances)) { + llapServiceInstances = instanceSet.getAll(); + } + + Preconditions.checkState(llapServiceInstances.size() > 0, + "Unable to find any of the llap instances in zk registry"); + + LlapDaemonInfo[] llapDaemonInfos = new LlapDaemonInfo[llapServiceInstances.size()]; + int count = 0; + for (LlapServiceInstance inst : llapServiceInstances) { + LlapDaemonInfo info; + if (LlapUtil.isCloudDeployment(job)) { + info = new LlapDaemonInfo(inst.getExternalHostname(), inst.getExternalClientsRpcPort(), inst.getOutputFormatPort()); + } else { + info = new LlapDaemonInfo(inst.getHost(), inst.getRpcPort(), inst.getOutputFormatPort()); + } + llapDaemonInfos[count++] = info; + } + return llapDaemonInfos; + } + private SignedMessage makeEventBytes(Vertex wx, String vertexName, Event event, LlapSigner signer) throws IOException { assert event instanceof InputDataInformationEvent; diff --git a/ql/src/test/results/clientpositive/llap/get_splits_0.q.out b/ql/src/test/results/clientpositive/llap/get_splits_0.q.out index e1ebe95297..48533e1f48 100644 --- a/ql/src/test/results/clientpositive/llap/get_splits_0.q.out +++ b/ql/src/test/results/clientpositive/llap/get_splits_0.q.out @@ -6,7 +6,7 @@ POSTHOOK: query: select get_splits("SELECT * FROM src WHERE value in (SELECT val POSTHOOK: type: QUERY POSTHOOK: Input: _dummy_database@_dummy_table #### A masked pattern was here #### -src.keystring src.valuestring +src.keystring src.valuestring PREHOOK: query: select get_splits("SELECT key AS `key 1`, value AS `value 1` FROM src",0) PREHOOK: type: QUERY PREHOOK: Input: _dummy_database@_dummy_table @@ -15,4 +15,4 @@ POSTHOOK: query: select get_splits("SELECT key AS `key 1`, value AS `value 1` FR POSTHOOK: type: QUERY POSTHOOK: Input: _dummy_database@_dummy_table #### A masked pattern was here #### -key 1stringvalue 1string +key 1stringvalue 1string -- 2.28.0
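
A minimal, self-contained sketch of the JWT round trip this patch wires up: GenericUDTFGetSplits signs a token carrying the external client application id (LlapExtClientJwtHelper#buildJwtForLlap) and ContainerRunnerImpl#verifyJwtForExternalClient parses it on the daemon and compares the id. This is not part of the patch; it assumes the jjwt 0.10.x artifacts added to the pom (jjwt-api, jjwt-impl, jjwt-jackson) are on the classpath, and the shared secret and application id below are placeholder values. The subject and claim names mirror the constants in LlapExtClientJwtHelper.

import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jws;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.security.Keys;

import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.util.Date;
import java.util.UUID;

public class JwtRoundTripSketch {
  public static void main(String[] args) {
    // Placeholder shared secret; hmacShaKeyFor requires at least 32 bytes (256 bits).
    byte[] secret = "0123456789abcdef0123456789abcdef".getBytes(StandardCharsets.UTF_8);
    Key key = Keys.hmacShaKeyFor(secret);

    // HS2 side (buildJwtForLlap): sign a token that carries the external client application id.
    String extClientAppId = "application_1599990000000_0001"; // placeholder id
    String jwt = Jwts.builder()
        .setSubject("llap")                                   // LLAP_JWT_SUBJECT
        .setIssuedAt(new Date())
        .setId(UUID.randomUUID().toString())
        .claim("llap_ext_client_app_id", extClientAppId)      // LLAP_EXT_CLIENT_APP_ID
        .signWith(key)
        .compact();

    // Daemon side (verifyJwtForExternalClient): parse with the same shared secret and compare the id
    // from the JWT claims against the id carried in the fragment.
    Jws<Claims> claims = Jwts.parser().setSigningKey(key).parseClaimsJws(jwt);
    String appIdFromJwt = (String) claims.getBody().get("llap_ext_client_app_id");
    if (!extClientAppId.equals(appIdFromJwt)) {
      throw new IllegalStateException("applicationId in request does not match applicationId in JWT");
    }
    System.out.println("JWT verified for " + appIdFromJwt);
  }
}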
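
The JwtSecretProvider interface introduced above is the extension point for sourcing that shared secret; a deployment that does not want DefaultJwtSharedSecretProvider could supply its own implementation and select it through LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET_PROVIDER (with LLAP_EXTERNAL_CLIENT_CLOUD_DEPLOYMENT_SETUP_ENABLED set to true). The sketch below is illustrative only; the class name and environment variable are hypothetical and not defined anywhere in Hive.

import com.google.common.base.Preconditions;
import io.jsonwebtoken.security.Keys;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.security.JwtSecretProvider;

import java.nio.charset.StandardCharsets;
import java.security.Key;

/**
 * Illustrative only: sources the JWT shared secret from a single, deployment-specific
 * environment variable instead of Hive configuration.
 */
public class EnvOnlyJwtSecretProvider implements JwtSecretProvider {

  // Hypothetical variable name chosen for this sketch.
  private static final String SECRET_ENV_VAR = "MY_LLAP_JWT_SHARED_SECRET";

  private Key key;

  @Override public Key getEncryptionSecret() { return key; }

  @Override public Key getDecryptionSecret() { return key; }

  @Override public void init(final Configuration conf) {
    String secret = System.getenv(SECRET_ENV_VAR);
    // Fail fast, as the interface javadoc asks; hmacShaKeyFor also rejects keys shorter than 32 bytes.
    Preconditions.checkState(StringUtils.isNotBlank(secret),
        SECRET_ENV_VAR + " must be set to a secret of at least 32 bytes");
    this.key = Keys.hmacShaKeyFor(secret.getBytes(StandardCharsets.UTF_8));
  }
}

Since JwtSecretProvider#initAndGet instantiates the configured class reflectively through its default constructor and then calls init(conf), the same provider class name would have to be configured on both HS2 (where splits are signed) and the LLAP daemons (where fragments are verified).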