diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 88b4387..ef32b2b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2591,6 +2591,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "ZooKeeper for ZooKeeper SecretManager."),
     LLAP_ZKSM_ZK_CONNECTION_STRING("hive.llap.zk.sm.connectionString", "",
         "ZooKeeper connection string for ZooKeeper SecretManager."),
+    LLAP_ZK_REGISTRY_USER("hive.llap.zk.registry.user", "",
+        "In the LLAP ZooKeeper-based registry, specifies the username in the ZooKeeper path.\n" +
+        "This should be the hive user or whichever user is running the LLAP daemon."),
     // Note: do not rename to ..service.acl; Hadoop generates .hosts setting name from this,
     // resulting in a collision with existing hive.llap.daemon.service.hosts and bizarre errors.
     LLAP_SECURITY_ACL("hive.llap.daemon.acl", "*", "The ACL for LLAP daemon."),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
index b8b69a7..338930e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
@@ -46,7 +46,8 @@ public void testWritable() throws Exception {
         planBytes,
         fragmentBytes,
         locations,
-        schema);
+        schema,
+        "hive");
     ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
     DataOutputStream dataOut = new DataOutputStream(byteOutStream);
     split1.write(dataOut);
@@ -94,6 +95,7 @@ static void checkLlapSplits(
     }
     assertArrayEquals(split1.getLocations(), split2.getLocations());
     assertEquals(split1.getSchema(), split2.getSchema());
+    assertEquals(split1.getLlapUser(), split2.getLlapUser());
   }
 }
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index c440e1e..0536589 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -176,7 +176,7 @@ public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) {
     // worker does not respond due to communication interruptions it will retain the same sequence
     // number when it returns back. If session timeout expires, the node will be deleted and new
     // addition of the same node (restart) will get next sequence number
-    this.pathPrefix = "/" + RegistryUtils.currentUser() + "/" + instanceName + "/workers/worker-";
+    this.pathPrefix = "/" + getZkPathUser(this.conf) + "/" + instanceName + "/workers/worker-";
     this.instancesCache = null;
     this.instances = null;
     this.stateChangeListeners = new HashSet<>();
@@ -210,6 +210,13 @@ private String getQuorumServers(Configuration conf) {
     return quorum.toString();
   }
 
+  private String getZkPathUser(Configuration conf) {
+    // External LLAP clients would need to set LLAP_ZK_REGISTRY_USER to the LLAP daemon user (hive),
+    // rather than relying on RegistryUtils.currentUser().
+    String user = HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser());
+    return user;
+  }
+
   public Endpoint getRpcEndpoint() {
     final int rpcPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT);
     return RegistryTypeUtils.ipcEndpoint(IPC_LLAP, new InetSocketAddress(hostname, rpcPort));
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
index 7f11e1d..aaca7d6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -88,6 +88,9 @@ public LlapInputFormat() {
       Reporter reporter) throws IOException {
 
     LlapInputSplit llapSplit = (LlapInputSplit) split;
+
+    // Set conf to use LLAP user rather than current user for LLAP Zk registry.
+    HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser());
 
     SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
     ServiceInstance serviceInstance = getServiceInstance(job, llapSplit);
diff --git a/ql/pom.xml b/ql/pom.xml
index ebb9599..e0de1fb 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -286,6 +286,12 @@
       <version>${hadoop.version}</version>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-registry</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+    </dependency>
     <dependency>
       <groupId>org.apache.ivy</groupId>
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
index 2ac0ccd..17a0d2d 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
@@ -33,16 +33,18 @@
   byte[] fragmentBytes;
   SplitLocationInfo[] locations;
   Schema schema;
+  String llapUser;
 
   public LlapInputSplit() {
   }
 
-  public LlapInputSplit(int splitNum, byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema) {
+  public LlapInputSplit(int splitNum, byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema, String llapUser) {
     this.planBytes = planBytes;
     this.fragmentBytes = fragmentBytes;
     this.locations = locations;
     this.schema = schema;
     this.splitNum = splitNum;
+    this.llapUser = llapUser;
   }
 
   public Schema getSchema() {
@@ -102,7 +104,7 @@ public void write(DataOutput out) throws IOException {
       throw new IOException(e);
     }
 
-
+    out.writeUTF(llapUser);
   }
 
   @Override
@@ -134,10 +136,15 @@ public void readFields(DataInput in) throws IOException {
     } catch (Exception e) {
       throw new IOException(e);
     }
+    llapUser = in.readUTF();
   }
 
   @Override
   public SplitLocationInfo[] getLocationInfo() throws IOException {
     return locations;
   }
+
+  public String getLlapUser() {
+    return llapUser;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index ebb0ca5..6267324 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -75,6 +75,7 @@
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -319,6 +320,7 @@ public PlanFragment createPlanFragment(String query, int num) throws HiveExcepti
       ApplicationId fakeApplicationId = ApplicationId.newInstance(Math.abs(new Random().nextInt()), 0);
+      String llapUser = RegistryUtils.currentUser();
 
       LOG.info("Number of splits: " + (eventList.size() - 1));
       for (int i = 0; i < eventList.size() - 1; i++) {
@@ -355,7 +357,7 @@ public PlanFragment createPlanFragment(String query, int num) throws HiveExcepti
         byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);
 
-        result[i] = new LlapInputSplit(i, submitWorkBytes, dob.getData(), locations, schema);
+        result[i] = new LlapInputSplit(i, submitWorkBytes, dob.getData(), locations, schema, llapUser);
       }
       return result;
     } catch (Exception e) {
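
For context, a minimal sketch of the client-side effect of this patch: an external LLAP client running as some user other than the daemon user can point the ZooKeeper registry lookup at the daemon's path by setting the new hive.llap.zk.registry.user property, instead of the path being derived from RegistryUtils.currentUser(). LlapInputFormat.getRecordReader() now does this automatically from the split's llapUser field, as in the hunk above; note also that LlapInputSplit serializes llapUser last via writeUTF/readUTF, so splits written before this patch cannot be read by the new code. The standalone class and the literal "hive" value below are illustrative assumptions, not part of the patch.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical sketch (not in this patch): configuring an external client so the
// LLAP ZooKeeper registry resolves "/hive/<instanceName>/workers/worker-..."
// rather than a path under the client's own username.
public class ExternalClientSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // Assumed deployment: LLAP daemons run as "hive". Without this setting,
    // LlapZookeeperRegistryImpl.getZkPathUser() falls back to
    // RegistryUtils.currentUser() and would build the registry path from the
    // client user, missing the daemon's registration.
    HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, "hive");
    // ... hand 'job' to the LlapInputFormat/getSplits machinery as usual.
  }
}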