diff --git llap-client/pom.xml llap-client/pom.xml
index f6a5629..50c06a4 100644
--- llap-client/pom.xml
+++ llap-client/pom.xml
@@ -32,7 +32,7 @@
-
+
org.apache.hive
@@ -41,41 +41,26 @@
org.apache.hive
- hive-serde
+ hive-llap-common
${project.version}
- commons-codec
- commons-codec
- ${commons-codec.version}
-
-
- commons-lang
- commons-lang
- ${commons-lang.version}
-
-
- org.apache.thrift
- libthrift
- ${libthrift.version}
-
-
org.apache.hadoop
hadoop-common
${hadoop.version}
true
-
-
- org.slf4j
- slf4j-log4j12
-
-
- commmons-logging
- commons-logging
-
-
-
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
org.apache.hadoop
hadoop-mapreduce-client-core
@@ -96,15 +81,15 @@
test
- org.mockito
- mockito-all
- ${mockito-all.version}
+ org.apache.commons
+ commons-lang3
+ ${commons-lang3.version}
test
- com.sun.jersey
- jersey-servlet
- ${jersey.version}
+ org.mockito
+ mockito-all
+ ${mockito-all.version}
test
@@ -113,32 +98,18 @@
${hadoop.version}
tests
test
-
-
- org.slf4j
- slf4j-log4j12
-
-
- commmons-logging
- commons-logging
-
-
-
-
- org.apache.hadoop
- hadoop-hdfs
- ${hadoop.version}
- test
-
-
- org.apache.hadoop
- hadoop-hdfs
- ${hadoop.version}
- tests
- test
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
-
${basedir}/src/java
${basedir}/src/test
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
similarity index 94%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java
rename to llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
index 2884e40..5b0674a 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java
+++ llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
@@ -12,7 +12,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hive.llap.tezplugins;
+package org.apache.hadoop.hive.llap.tez;
import javax.net.SocketFactory;
@@ -47,8 +47,8 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapNodeId;
-import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl;
+import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
@@ -68,11 +68,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class LlapDaemonProtocolClientProxy extends AbstractService {
+public class LlapProtocolClientProxy extends AbstractService {
- private static final Logger LOG = LoggerFactory.getLogger(LlapDaemonProtocolClientProxy.class);
+ private static final Logger LOG = LoggerFactory.getLogger(LlapProtocolClientProxy.class);
- private final ConcurrentMap hostProxies;
+ private final ConcurrentMap hostProxies;
private final RequestManager requestManager;
private final RetryPolicy retryPolicy;
@@ -82,9 +82,9 @@
private volatile ListenableFuture requestManagerFuture;
private final Token llapToken;
- public LlapDaemonProtocolClientProxy(
+ public LlapProtocolClientProxy(
int numThreads, Configuration conf, Token llapToken) {
- super(LlapDaemonProtocolClientProxy.class.getSimpleName());
+ super(LlapProtocolClientProxy.class.getSimpleName());
this.hostProxies = new ConcurrentHashMap<>();
this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
this.llapToken = llapToken;
@@ -466,13 +466,13 @@ public TerminateFragmentResponseProto call() throws Exception {
void indicateError(Throwable t);
}
- private LlapDaemonProtocolBlockingPB getProxy(final LlapNodeId nodeId) {
+ private LlapProtocolBlockingPB getProxy(final LlapNodeId nodeId) {
String hostId = getHostIdentifier(nodeId.getHostname(), nodeId.getPort());
- LlapDaemonProtocolBlockingPB proxy = hostProxies.get(hostId);
+ LlapProtocolBlockingPB proxy = hostProxies.get(hostId);
if (proxy == null) {
if (llapToken == null) {
- proxy = new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
+ proxy = new LlapProtocolClientImpl(getConfig(), nodeId.getHostname(),
nodeId.getPort(), retryPolicy, socketFactory);
} else {
UserGroupInformation ugi;
@@ -485,16 +485,16 @@ private LlapDaemonProtocolBlockingPB getProxy(final LlapNodeId nodeId) {
SecurityUtil.setTokenService(nodeToken, NetUtils.createSocketAddrForHost(
nodeId.getHostname(), nodeId.getPort()));
ugi.addToken(nodeToken);
- proxy = ugi.doAs(new PrivilegedAction() {
+ proxy = ugi.doAs(new PrivilegedAction() {
@Override
- public LlapDaemonProtocolBlockingPB run() {
- return new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
+ public LlapProtocolBlockingPB run() {
+ return new LlapProtocolClientImpl(getConfig(), nodeId.getHostname(),
nodeId.getPort(), retryPolicy, socketFactory);
}
});
}
- LlapDaemonProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy);
+ LlapProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy);
if (proxyOld != null) {
// TODO Shutdown the new proxy.
proxy = proxyOld;
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java llap-client/src/test/org/apache/hadoop/hive/llap/tez/TestLlapDaemonProtocolClientProxy.java
similarity index 78%
rename from llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java
rename to llap-client/src/test/org/apache/hadoop/hive/llap/tez/TestLlapDaemonProtocolClientProxy.java
index a6af8c2..850b67a 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java
+++ llap-client/src/test/org/apache/hadoop/hive/llap/tez/TestLlapDaemonProtocolClientProxy.java
@@ -12,7 +12,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hive.llap.tezplugins;
+package org.apache.hadoop.hive.llap.tez;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -26,6 +26,7 @@
import com.google.protobuf.Message;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.junit.Assert;
import org.junit.Test;
public class TestLlapDaemonProtocolClientProxy {
@@ -38,8 +39,8 @@ public void testMultipleNodes() {
LlapNodeId nodeId2 = LlapNodeId.getInstance("host2", 1025);
Message mockMessage = mock(Message.class);
- LlapDaemonProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock(
- LlapDaemonProtocolClientProxy.ExecuteRequestCallback.class);
+ LlapProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock(
+ LlapProtocolClientProxy.ExecuteRequestCallback.class);
// Request two messages
requestManager.queueRequest(
@@ -52,8 +53,8 @@ public void testMultipleNodes() {
assertEquals(2, requestManager.numSubmissionsCounters);
assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
assertNotNull(requestManager.numInvocationsPerNode.get(nodeId2));
- assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
- assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId2).getValue().intValue());
+ Assert.assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+ Assert.assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId2).getValue().intValue());
assertEquals(0, requestManager.currentLoopSkippedRequests.size());
assertEquals(0, requestManager.currentLoopSkippedRequests.size());
assertEquals(0, requestManager.currentLoopDisabledNodes.size());
@@ -66,8 +67,8 @@ public void testSingleInvocationPerNode() {
LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025);
Message mockMessage = mock(Message.class);
- LlapDaemonProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock(
- LlapDaemonProtocolClientProxy.ExecuteRequestCallback.class);
+ LlapProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock(
+ LlapProtocolClientProxy.ExecuteRequestCallback.class);
// First request for host.
requestManager.queueRequest(
@@ -75,7 +76,7 @@ public void testSingleInvocationPerNode() {
requestManager.process();
assertEquals(1, requestManager.numSubmissionsCounters);
assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
- assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+ Assert.assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
assertEquals(0, requestManager.currentLoopSkippedRequests.size());
// Second request for host. Single invocation since the last has not completed.
@@ -84,7 +85,7 @@ public void testSingleInvocationPerNode() {
requestManager.process();
assertEquals(1, requestManager.numSubmissionsCounters);
assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
- assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+ Assert.assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
assertEquals(1, requestManager.currentLoopSkippedRequests.size());
assertEquals(1, requestManager.currentLoopDisabledNodes.size());
assertTrue(requestManager.currentLoopDisabledNodes.contains(nodeId1));
@@ -94,14 +95,14 @@ public void testSingleInvocationPerNode() {
requestManager.process();
assertEquals(2, requestManager.numSubmissionsCounters);
assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
- assertEquals(2, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+ Assert.assertEquals(2, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
assertEquals(0, requestManager.currentLoopSkippedRequests.size());
assertEquals(0, requestManager.currentLoopDisabledNodes.size());
assertFalse(requestManager.currentLoopDisabledNodes.contains(nodeId1));
}
- static class RequestManagerForTest extends LlapDaemonProtocolClientProxy.RequestManager {
+ static class RequestManagerForTest extends LlapProtocolClientProxy.RequestManager {
int numSubmissionsCounters = 0;
private Map numInvocationsPerNode = new HashMap<>();
@@ -110,7 +111,7 @@ public RequestManagerForTest(int numThreads) {
super(numThreads);
}
- protected void submitToExecutor(LlapDaemonProtocolClientProxy.CallableRequest request, LlapNodeId nodeId) {
+ protected void submitToExecutor(LlapProtocolClientProxy.CallableRequest request, LlapNodeId nodeId) {
numSubmissionsCounters++;
MutableInt nodeCount = numInvocationsPerNode.get(nodeId);
if (nodeCount == null) {
@@ -127,10 +128,10 @@ void reset() {
}
- static class CallableRequestForTest extends LlapDaemonProtocolClientProxy.CallableRequest {
+ static class CallableRequestForTest extends LlapProtocolClientProxy.CallableRequest {
protected CallableRequestForTest(LlapNodeId nodeId, Message message,
- LlapDaemonProtocolClientProxy.ExecuteRequestCallback callback) {
+ LlapProtocolClientProxy.ExecuteRequestCallback callback) {
super(nodeId, message, callback);
}
diff --git llap-common/pom.xml llap-common/pom.xml
new file mode 100644
index 0000000..5343479
--- /dev/null
+++ llap-common/pom.xml
@@ -0,0 +1,235 @@
+
+
+
+ 4.0.0
+
+ org.apache.hive
+ hive
+ 2.1.0-SNAPSHOT
+ ../pom.xml
+
+
+ hive-llap-common
+ jar
+ Hive Llap Common
+
+
+ ..
+
+
+
+
+
+
+ org.apache.hive
+ hive-common
+ ${project.version}
+
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+ true
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
+
+ org.apache.tez
+ tez-api
+ ${tez.version}
+ true
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
+
+ org.apache.tez
+ tez-common
+ ${tez.version}
+ true
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
+
+ org.apache.tez
+ tez-runtime-internals
+ ${tez.version}
+ true
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+ tests
+ test
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
+
+ junit
+ junit
+ ${junit.version}
+ test
+
+
+ org.mockito
+ mockito-all
+ ${mockito-all.version}
+ test
+
+
+
+
+
+ protobuf
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+
+
+ generate-protobuf-sources
+ generate-sources
+
+
+
+
+ Building LLAP Protobuf
+
+
+
+
+
+
+
+
+
+ run
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+
+
+
+
+
+
+
+ ${basedir}/src/java
+ ${basedir}/src/test
+
+
+ src/main/resources
+
+ *.py
+ *.pyc
+
+ false
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+
+
+ add-source
+ generate-sources
+
+ add-source
+
+
+
+ src/gen/protobuf/gen-java
+ src/gen/thrift/gen-javabean
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+
+
+
+
diff --git llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
similarity index 100%
rename from llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
rename to llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java llap-common/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java
similarity index 100%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java
rename to llap-common/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
similarity index 81%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java
rename to llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
index e293a95..cd11bdb 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java
+++ llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
@@ -12,7 +12,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hive.llap.daemon.impl;
+package org.apache.hadoop.hive.llap.impl;
import javax.annotation.Nullable;
import javax.net.SocketFactory;
@@ -27,18 +27,18 @@
import org.apache.hadoop.ipc.ProtocolProxy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto;
import org.apache.hadoop.security.UserGroupInformation;
-public class LlapManagementProtocolClientImpl implements LlapManagementProtocolBlockingPB {
+public class LlapManagementProtocolClientImpl implements LlapManagementProtocolPB {
private final Configuration conf;
private final InetSocketAddress serverAddr;
private final RetryPolicy retryPolicy;
private final SocketFactory socketFactory;
- LlapManagementProtocolBlockingPB proxy;
+ LlapManagementProtocolPB proxy;
public LlapManagementProtocolClientImpl(Configuration conf, String hostname, int port,
@@ -54,17 +54,17 @@ public LlapManagementProtocolClientImpl(Configuration conf, String hostname, int
}
}
- public LlapManagementProtocolBlockingPB getProxy() throws IOException {
+ public LlapManagementProtocolPB getProxy() throws IOException {
if (proxy == null) {
proxy = createProxy();
}
return proxy;
}
- public LlapManagementProtocolBlockingPB createProxy() throws IOException {
- RPC.setProtocolEngine(conf, LlapManagementProtocolBlockingPB.class, ProtobufRpcEngine.class);
- ProtocolProxy proxy =
- RPC.getProtocolProxy(LlapManagementProtocolBlockingPB.class, 0, serverAddr,
+ public LlapManagementProtocolPB createProxy() throws IOException {
+ RPC.setProtocolEngine(conf, LlapManagementProtocolPB.class, ProtobufRpcEngine.class);
+ ProtocolProxy proxy =
+ RPC.getProtocolProxy(LlapManagementProtocolPB.class, 0, serverAddr,
UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), 0,
retryPolicy);
return proxy.getProxy();
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapProtocolClientImpl.java
similarity index 83%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
rename to llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapProtocolClientImpl.java
index 9c7d2e2..9234b7d 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
+++ llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapProtocolClientImpl.java
@@ -12,7 +12,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hive.llap.daemon.impl;
+package org.apache.hadoop.hive.llap.impl;
import javax.annotation.Nullable;
import javax.net.SocketFactory;
@@ -35,24 +35,24 @@
import org.apache.hadoop.ipc.ProtocolProxy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
import org.apache.hadoop.security.UserGroupInformation;
// TODO Change all this to be based on a regular interface instead of relying on the Proto service - Exception signatures cannot be controlled without this for the moment.
-public class LlapDaemonProtocolClientImpl implements LlapDaemonProtocolBlockingPB {
+public class LlapProtocolClientImpl implements LlapProtocolBlockingPB {
private final Configuration conf;
private final InetSocketAddress serverAddr;
private final RetryPolicy retryPolicy;
private final SocketFactory socketFactory;
- LlapDaemonProtocolBlockingPB proxy;
+ LlapProtocolBlockingPB proxy;
- public LlapDaemonProtocolClientImpl(Configuration conf, String hostname, int port,
- @Nullable RetryPolicy retryPolicy,
- @Nullable SocketFactory socketFactory) {
+ public LlapProtocolClientImpl(Configuration conf, String hostname, int port,
+ @Nullable RetryPolicy retryPolicy,
+ @Nullable SocketFactory socketFactory) {
this.conf = conf;
this.serverAddr = NetUtils.createSocketAddr(hostname, port);
this.retryPolicy = retryPolicy;
@@ -107,17 +107,17 @@ public TerminateFragmentResponseProto terminateFragment(
}
}
- public LlapDaemonProtocolBlockingPB getProxy() throws IOException {
+ public LlapProtocolBlockingPB getProxy() throws IOException {
if (proxy == null) {
proxy = createProxy();
}
return proxy;
}
- public LlapDaemonProtocolBlockingPB createProxy() throws IOException {
- RPC.setProtocolEngine(conf, LlapDaemonProtocolBlockingPB.class, ProtobufRpcEngine.class);
- ProtocolProxy proxy =
- RPC.getProtocolProxy(LlapDaemonProtocolBlockingPB.class, 0, serverAddr,
+ public LlapProtocolBlockingPB createProxy() throws IOException {
+ RPC.setProtocolEngine(conf, LlapProtocolBlockingPB.class, ProtobufRpcEngine.class);
+ ProtocolProxy proxy =
+ RPC.getProtocolProxy(LlapProtocolBlockingPB.class, 0, serverAddr,
UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), 0,
retryPolicy);
return proxy.getProxy();
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapManagementProtocolPB.java
similarity index 70%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java
rename to llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapManagementProtocolPB.java
index 4efadac..ff215d4 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java
+++ llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapManagementProtocolPB.java
@@ -11,14 +11,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hive.llap.daemon;
+package org.apache.hadoop.hive.llap.protocol;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
-@ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB", protocolVersion = 1)
+@ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB", protocolVersion = 1)
@KerberosInfo(serverPrincipal = HiveConf.HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME)
-public interface LlapManagementProtocolBlockingPB extends LlapDaemonProtocolProtos.LlapManagementProtocol.BlockingInterface {
+@InterfaceAudience.Private
+public interface LlapManagementProtocolPB extends LlapDaemonProtocolProtos.LlapManagementProtocol.BlockingInterface {
}
\ No newline at end of file
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapProtocolBlockingPB.java
similarity index 73%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java
rename to llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapProtocolBlockingPB.java
index 4c09941..2bd56ca 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java
+++ llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapProtocolBlockingPB.java
@@ -12,8 +12,9 @@
* limitations under the License.
*/
-package org.apache.hadoop.hive.llap.daemon;
+package org.apache.hadoop.hive.llap.protocol;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.token.TokenInfo;
@@ -21,8 +22,9 @@
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.security.LlapTokenSelector;
-@ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB", protocolVersion = 1)
+@ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB", protocolVersion = 1)
@KerberosInfo(serverPrincipal = HiveConf.HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME)
@TokenInfo(LlapTokenSelector.class)
-public interface LlapDaemonProtocolBlockingPB extends LlapDaemonProtocolProtos.LlapDaemonProtocol.BlockingInterface {
+@InterfaceAudience.Private
+public interface LlapProtocolBlockingPB extends LlapDaemonProtocolProtos.LlapDaemonProtocol.BlockingInterface {
}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
similarity index 100%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
rename to llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
similarity index 100%
rename from llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
rename to llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
similarity index 100%
rename from llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
rename to llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java
similarity index 100%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java
rename to llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
similarity index 99%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
rename to llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
index f61d62f..a5c3631 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
+++ llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
@@ -12,7 +12,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hive.llap.tezplugins;
+package org.apache.hadoop.hive.llap.tez;
import java.io.IOException;
import java.util.ArrayList;
diff --git llap-server/src/protobuf/LlapDaemonProtocol.proto llap-common/src/protobuf/LlapDaemonProtocol.proto
similarity index 100%
rename from llap-server/src/protobuf/LlapDaemonProtocol.proto
rename to llap-common/src/protobuf/LlapDaemonProtocol.proto
diff --git llap-common/src/test/org/apache/hadoop/hive/llap/testhelpers/ControlledClock.java llap-common/src/test/org/apache/hadoop/hive/llap/testhelpers/ControlledClock.java
new file mode 100644
index 0000000..fec1c35
--- /dev/null
+++ llap-common/src/test/org/apache/hadoop/hive/llap/testhelpers/ControlledClock.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.testhelpers;
+
+import org.apache.hadoop.yarn.util.Clock;
+
+public class ControlledClock implements Clock {
+ private long time = -1;
+ private final Clock actualClock;
+ public ControlledClock(Clock actualClock) {
+ this.actualClock = actualClock;
+ }
+ public synchronized void setTime(long time) {
+ this.time = time;
+ }
+ public synchronized void reset() {
+ time = -1;
+ }
+
+ @Override
+ public synchronized long getTime() {
+ if (time != -1) {
+ return time;
+ }
+ return actualClock.getTime();
+ }
+
+}
diff --git llap-server/pom.xml llap-server/pom.xml
index 916fb5c..c81bdb2 100644
--- llap-server/pom.xml
+++ llap-server/pom.xml
@@ -32,7 +32,7 @@
-
+
org.apache.hive
@@ -46,11 +46,21 @@
org.apache.hive
+ hive-llap-common
+ ${project.version}
+
+
+ org.apache.hive
hive-llap-client
${project.version}
org.apache.hive
+ hive-llap-tez
+ ${project.version}
+
+
+ org.apache.hive
hive-shims
${project.version}
@@ -90,77 +100,66 @@
hadoop-common
${hadoop.version}
true
-
-
- org.slf4j
- slf4j-log4j12
-
-
- commmons-logging
- commons-logging
-
-
-
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
org.apache.hadoop
hadoop-mapreduce-client-core
${hadoop.version}
true
-
-
- org.slf4j
- slf4j-log4j12
-
-
- commmons-logging
- commons-logging
-
-
-
-
- org.apache.tez
- tez-runtime-internals
- ${tez.version}
- true
-
-
- org.slf4j
- slf4j-log4j12
-
-
- commmons-logging
- commons-logging
-
-
-
-
- org.apache.tez
- tez-runtime-library
- ${tez.version}
- true
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
org.apache.tez
- tez-mapreduce
+ tez-runtime-internals
${tez.version}
true
-
-
- org.slf4j
- slf4j-log4j12
-
-
- commmons-logging
- commons-logging
-
-
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
org.apache.tez
- tez-dag
+ tez-runtime-library
${tez.version}
true
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
org.apache.hadoop
@@ -168,17 +167,17 @@
${hadoop.version}
tests
test
-
-
- org.slf4j
- slf4j-log4j12
-
-
- commmons-logging
- commons-logging
-
-
-
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
org.apache.hadoop
hadoop-hdfs
@@ -225,15 +224,15 @@
org.apache.hbase
hbase-server
${hbase.version}
-
-
- org.slf4j
- slf4j-log4j12
-
-
- commmons-logging
- commons-logging
-
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
@@ -248,54 +247,6 @@
-
-
- protobuf
-
-
-
- org.apache.maven.plugins
- maven-antrun-plugin
-
-
- generate-protobuf-sources
- generate-sources
-
-
-
-
- Building LLAP-Server Protobuf
-
-
-
-
-
-
-
-
-
- run
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
-
-
- test-jar
-
-
-
-
-
-
-
-
-
-
${basedir}/src/java
${basedir}/src/test
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index 544af09..0399798 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -25,6 +25,8 @@
import java.util.Map.Entry;
import java.util.Collection;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -217,7 +219,13 @@ private void run(String[] args) throws Exception {
CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(), libDir.toString(), true);
lfs.delete(new Path(libDir, "tez.tar.gz"), false);
+ // llap-common
+ lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(LlapDaemonProtocolProtos.class)), libDir);
+ // llap-tez
+ lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(LlapTezUtils.class)), libDir);
+ // llap-server
lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(LlapInputFormat.class)), libDir);
+ // hive-exec
lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(HiveInputFormat.class)), libDir);
// copy default aux classes (json/hbase)
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 7d7fa00..94b3b41 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -19,7 +19,6 @@
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.net.InetSocketAddress;
-import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -48,7 +47,6 @@
import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ExitUtil;
@@ -67,7 +65,7 @@
private static final Logger LOG = LoggerFactory.getLogger(LlapDaemon.class);
private final Configuration shuffleHandlerConf;
- private final LlapDaemonProtocolServerImpl server;
+ private final LlapProtocolServerImpl server;
private final ContainerRunnerImpl containerRunner;
private final AMReporter amReporter;
private final LlapRegistryService registry;
@@ -166,7 +164,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf);
- this.server = new LlapDaemonProtocolServerImpl(
+ this.server = new LlapProtocolServerImpl(
numHandlers, this, srvAddress, mngAddress, srvPort, mngPort);
ClassLoader executorClassLoader = null;
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
similarity index 91%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
rename to llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index 45ca906..c386d77 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -55,14 +55,14 @@
import org.apache.hadoop.hive.llap.security.SecretManager;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
-import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
-import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB;
import org.apache.hadoop.hive.llap.security.LlapDaemonPolicyProvider;
-public class LlapDaemonProtocolServerImpl extends AbstractService
- implements LlapDaemonProtocolBlockingPB, LlapManagementProtocolBlockingPB {
+public class LlapProtocolServerImpl extends AbstractService
+ implements LlapProtocolBlockingPB, LlapManagementProtocolPB {
- private static final Logger LOG = LoggerFactory.getLogger(LlapDaemonProtocolServerImpl.class);
+ private static final Logger LOG = LoggerFactory.getLogger(LlapProtocolServerImpl.class);
private final int numHandlers;
private final ContainerRunner containerRunner;
@@ -71,12 +71,12 @@
private final AtomicReference srvAddress, mngAddress;
private SecretManager zkSecretManager;
- public LlapDaemonProtocolServerImpl(int numHandlers,
- ContainerRunner containerRunner,
- AtomicReference srvAddress,
- AtomicReference mngAddress,
- int srvPort,
- int mngPort) {
+ public LlapProtocolServerImpl(int numHandlers,
+ ContainerRunner containerRunner,
+ AtomicReference srvAddress,
+ AtomicReference mngAddress,
+ int srvPort,
+ int mngPort) {
super("LlapDaemonProtocolServerImpl");
this.numHandlers = numHandlers;
this.containerRunner = containerRunner;
@@ -84,7 +84,7 @@ public LlapDaemonProtocolServerImpl(int numHandlers,
this.srvPort = srvPort;
this.mngAddress = mngAddress;
this.mngPort = mngPort;
- LOG.info("Creating: " + LlapDaemonProtocolServerImpl.class.getSimpleName() +
+ LOG.info("Creating: " + LlapProtocolServerImpl.class.getSimpleName() +
" with port configured to: " + srvPort);
}
@@ -152,9 +152,9 @@ public Void run() {
private void startProtocolServers(
Configuration conf, BlockingService daemonImpl, BlockingService managementImpl) {
server = startProtocolServer(srvPort, numHandlers, srvAddress, conf, daemonImpl,
- LlapDaemonProtocolBlockingPB.class);
+ LlapProtocolBlockingPB.class);
mngServer = startProtocolServer(mngPort, 2, mngAddress, conf, managementImpl,
- LlapManagementProtocolBlockingPB.class);
+ LlapManagementProtocolPB.class);
}
private RPC.Server startProtocolServer(int srvPort, int numHandlers,
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
index aa065a9..480a394 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
@@ -23,7 +23,7 @@
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
-import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,7 +89,7 @@ public boolean canFinish() {
boolean canFinish = true;
if (inputSpecList != null && !inputSpecList.isEmpty()) {
for (IOSpecProto inputSpec : inputSpecList) {
- if (isSourceOfInterest(inputSpec)) {
+ if (LlapTezUtils.isSourceOfInterest(inputSpec.getIoDescriptor().getClassName())) {
// Lookup the state in the map.
LlapDaemonProtocolProtos.SourceStateProto state = queryInfo.getSourceStateMap()
.get(inputSpec.getConnectedVertexName());
@@ -129,7 +129,7 @@ public boolean registerForFinishableStateUpdates(FinishableStateUpdateHandler ha
List inputSpecList = fragmentSpec.getInputSpecsList();
if (inputSpecList != null && !inputSpecList.isEmpty()) {
for (IOSpecProto inputSpec : inputSpecList) {
- if (isSourceOfInterest(inputSpec)) {
+ if (LlapTezUtils.isSourceOfInterest(inputSpec.getIoDescriptor().getClassName())) {
sourcesOfInterest.add(inputSpec.getConnectedVertexName());
}
}
@@ -143,13 +143,6 @@ public void unregisterForFinishableStateUpdates(FinishableStateUpdateHandler han
queryInfo.unregisterFinishableStateUpdate(handler);
}
- private boolean isSourceOfInterest(IOSpecProto inputSpec) {
- String inputClassName = inputSpec.getIoDescriptor().getClassName();
- // MRInput is not of interest since it'll always be ready.
- return !inputClassName.equals(MRInputLegacy.class.getName());
- }
-
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index ede2a03..d88d82a 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -38,7 +38,7 @@
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
-import org.apache.hadoop.hive.llap.tezplugins.Converters;
+import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.hive.ql.io.IOContextMap;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
index d67647b..bedd265 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
@@ -18,17 +18,17 @@
package org.apache.hadoop.hive.llap.security;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
-import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
public class LlapDaemonPolicyProvider extends PolicyProvider {
private static final Service[] services = new Service[] {
new Service(HiveConf.ConfVars.LLAP_SECURITY_ACL.varname,
- LlapDaemonProtocolBlockingPB.class),
+ LlapProtocolBlockingPB.class),
new Service(HiveConf.ConfVars.LLAP_MANAGEMENT_ACL.varname,
- LlapManagementProtocolBlockingPB.class)
+ LlapManagementProtocolPB.class)
};
@Override
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
index a00b631..aa8745d 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
@@ -27,7 +27,7 @@
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
index 4dca2ce..eb514f2 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
@@ -20,8 +20,8 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
-import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -36,8 +36,8 @@ public KerberosInfo getKerberosInfo(Class> protocol, Configuration conf) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to get KerberosInfo for " + protocol);
}
- if (!LlapDaemonProtocolBlockingPB.class.isAssignableFrom(protocol)
- && !LlapManagementProtocolBlockingPB.class.isAssignableFrom(protocol)) return null;
+ if (!LlapProtocolBlockingPB.class.isAssignableFrom(protocol)
+ && !LlapManagementProtocolPB.class.isAssignableFrom(protocol)) return null;
return new KerberosInfo() {
@Override
public Class extends Annotation> annotationType() {
@@ -62,7 +62,7 @@ public TokenInfo getTokenInfo(Class> protocol, Configuration conf) {
LOG.debug("Trying to get TokenInfo for " + protocol);
}
// Tokens cannot be used for the management protocol (for now).
- if (!LlapDaemonProtocolBlockingPB.class.isAssignableFrom(protocol)) return null;
+ if (!LlapProtocolBlockingPB.class.isAssignableFrom(protocol)) return null;
return new TokenInfo() {
@Override
public Class extends Annotation> annotationType() {
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
index 44c958d..1cef218 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
@@ -27,10 +27,11 @@
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
-import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto;
+import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl;
import org.junit.Test;
public class TestLlapDaemonProtocolServerImpl {
@@ -42,8 +43,8 @@ public void test() throws ServiceException, IOException {
int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT);
int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);
ContainerRunner containerRunnerMock = mock(ContainerRunner.class);
- LlapDaemonProtocolServerImpl server =
- new LlapDaemonProtocolServerImpl(numHandlers, containerRunnerMock,
+ LlapProtocolServerImpl server =
+ new LlapProtocolServerImpl(numHandlers, containerRunnerMock,
new AtomicReference(), new AtomicReference(),
rpcPort, rpcPort + 1);
when(containerRunnerMock.submitWork(any(SubmitWorkRequestProto.class))).thenReturn(
@@ -56,8 +57,8 @@ public void test() throws ServiceException, IOException {
server.start();
InetSocketAddress serverAddr = server.getBindAddress();
- LlapDaemonProtocolBlockingPB client =
- new LlapDaemonProtocolClientImpl(new Configuration(), serverAddr.getHostName(),
+ LlapProtocolBlockingPB client =
+ new LlapProtocolClientImpl(new Configuration(), serverAddr.getHostName(),
serverAddr.getPort(), null, null);
SubmitWorkResponseProto responseProto = client.submitWork(null,
SubmitWorkRequestProto.newBuilder()
diff --git llap-tez/pom.xml llap-tez/pom.xml
new file mode 100644
index 0000000..ce020da
--- /dev/null
+++ llap-tez/pom.xml
@@ -0,0 +1,200 @@
+
+
+
+ 4.0.0
+
+ org.apache.hive
+ hive
+ 2.1.0-SNAPSHOT
+ ../pom.xml
+
+
+ hive-llap-tez
+ jar
+ Hive Llap Tez
+
+
+ ..
+
+
+
+
+
+
+ org.apache.hive
+ hive-common
+ ${project.version}
+
+
+ org.apache.hive
+ hive-llap-client
+ ${project.version}
+
+
+
+ org.apache.commons
+ commons-lang3
+ ${commons-lang3.version}
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+ true
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
+
+ org.apache.tez
+ tez-api
+ ${tez.version}
+ true
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
+
+ org.apache.tez
+ tez-runtime-internals
+ ${tez.version}
+ true
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
+
+ org.apache.tez
+ tez-mapreduce
+ ${tez.version}
+ true
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
+
+ org.apache.tez
+ tez-dag
+ ${tez.version}
+ true
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
+
+
+ org.apache.hive
+ hive-llap-common
+ ${project.version}
+ tests
+ test
+
+
+
+
+ junit
+ junit
+ ${junit.version}
+ test
+
+
+ org.mockito
+ mockito-all
+ ${mockito-all.version}
+ test
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+ tests
+ test
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
+
+
+
+ ${basedir}/src/java
+ ${basedir}/src/test
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+
+
+ add-source
+ generate-sources
+
+ add-source
+
+
+
+ src/gen/protobuf/gen-java
+ src/gen/thrift/gen-javabean
+
+
+
+
+
+
+
+
+
diff --git llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
similarity index 97%
rename from llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java
rename to llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
index 89b7198..a314391 100644
--- llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java
+++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
@@ -12,7 +12,7 @@
* limitations under the License.
*/
-package org.apache.tez.dag.app.rm;
+package org.apache.hadoop.hive.llap.tezplugins;
import java.util.concurrent.atomic.AtomicLong;
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
similarity index 100%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
rename to llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
similarity index 97%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
rename to llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index eb6384f..76d095a 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -48,6 +48,8 @@
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
@@ -95,7 +97,7 @@
private final SourceStateTracker sourceStateTracker;
private final Set nodesForQuery = new HashSet<>();
- private LlapDaemonProtocolClientProxy communicator;
+ private LlapProtocolClientProxy communicator;
private long deleteDelayOnDagComplete;
private final LlapTaskUmbilicalProtocol umbilical;
private final Token token;
@@ -145,7 +147,7 @@ public void initialize() throws Exception {
super.initialize();
Configuration conf = getConf();
int numThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS);
- this.communicator = new LlapDaemonProtocolClientProxy(numThreads, conf, token);
+ this.communicator = new LlapProtocolClientProxy(numThreads, conf, token);
this.deleteDelayOnDagComplete = HiveConf.getTimeVar(
conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
LOG.info("Running LlapTaskCommunicator with "
@@ -270,7 +272,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task
getContext()
.taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
communicator.sendSubmitWork(requestProto, host, port,
- new LlapDaemonProtocolClientProxy.ExecuteRequestCallback() {
+ new LlapProtocolClientProxy.ExecuteRequestCallback() {
@Override
public void setResponse(SubmitWorkResponseProto response) {
if (response.hasSubmissionState()) {
@@ -356,7 +358,7 @@ private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId,
TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifierProto)
.setFragmentIdentifierString(taskAttemptId.toString()).build();
communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(),
- new LlapDaemonProtocolClientProxy.ExecuteRequestCallback() {
+ new LlapProtocolClientProxy.ExecuteRequestCallback() {
@Override
public void setResponse(TerminateFragmentResponseProto response) {
}
@@ -385,7 +387,7 @@ public void dagComplete(final int dagIdentifier) {
for (final LlapNodeId llapNodeId : nodesForQuery) {
LOG.info("Sending dagComplete message for {}, to {}", dagIdentifier, llapNodeId);
communicator.sendQueryComplete(request, llapNodeId.getHostname(), llapNodeId.getPort(),
- new LlapDaemonProtocolClientProxy.ExecuteRequestCallback() {
+ new LlapProtocolClientProxy.ExecuteRequestCallback() {
@Override
public void setResponse(LlapDaemonProtocolProtos.QueryCompleteResponseProto response) {
}
@@ -411,7 +413,7 @@ public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
public void sendStateUpdate(final String host, final int port,
final SourceStateUpdatedRequestProto request) {
communicator.sendSourceStateUpdate(request, host, port,
- new LlapDaemonProtocolClientProxy.ExecuteRequestCallback() {
+ new LlapProtocolClientProxy.ExecuteRequestCallback() {
@Override
public void setResponse(SourceStateUpdatedResponseProto response) {
}
diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
similarity index 99%
rename from llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
rename to llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 6beb4f8..3bca0da 100644
--- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -12,7 +12,7 @@
* limitations under the License.
*/
-package org.apache.tez.dag.app.rm;
+package org.apache.hadoop.hive.llap.tezplugins;
import java.io.IOException;
import java.util.Arrays;
diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
new file mode 100644
index 0000000..2c3e53c
--- /dev/null
+++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.input.MultiMRInput;
+
+@InterfaceAudience.Private
+public class LlapTezUtils {
+ public static boolean isSourceOfInterest(String inputClassName) {
+ // MRInput is not of interest since it'll always be ready.
+ return !(inputClassName.equals(MRInputLegacy.class.getName()) || inputClassName.equals(
+ MultiMRInput.class.getName()) || inputClassName.equals(MRInput.class.getName()));
+ }
+}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
similarity index 100%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
rename to llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
similarity index 94%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
rename to llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
index fded9bf..d8f7574 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
+++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
@@ -25,18 +25,16 @@
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
-import org.apache.hadoop.hive.llap.tezplugins.Converters;
+import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator;
import org.apache.tez.dag.api.event.VertexState;
-import org.apache.tez.mapreduce.input.MRInput;
-import org.apache.tez.mapreduce.input.MRInputLegacy;
-import org.apache.tez.mapreduce.input.MultiMRInput;
import org.apache.tez.runtime.api.impl.InputSpec;
public class SourceStateTracker {
@@ -258,7 +256,7 @@ private synchronized NodeInfo getNodeInfo(LlapNodeId llapNodeId) {
if (inputSpecList != null) {
boolean alreadyFound = false;
for (InputSpec inputSpec : inputSpecList) {
- if (isSourceOfInterest(inputSpec)) {
+ if (LlapTezUtils.isSourceOfInterest(inputSpec.getInputDescriptor().getClassName())) {
if (!alreadyFound) {
alreadyFound = true;
sourcesOfInterest = new LinkedList<>();
@@ -279,12 +277,7 @@ private void maybeRegisterForVertexUpdates(String sourceName) {
}
}
- private boolean isSourceOfInterest(InputSpec inputSpec) {
- String inputClassName = inputSpec.getInputDescriptor().getClassName();
- // MRInput is not of interest since it'll always be ready.
- return !(inputClassName.equals(MRInputLegacy.class.getName()) || inputClassName.equals(
- MultiMRInput.class.getName()) || inputClassName.equals(MRInput.class.getName()));
- }
+
void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState state) {
taskCommunicator.sendStateUpdate(nodeId.getHostname(), nodeId.getPort(),
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
similarity index 100%
rename from llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
rename to llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
diff --git llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
similarity index 99%
rename from llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
rename to llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index 4c1cbb3..36d8ffd 100644
--- llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -12,7 +12,7 @@
* limitations under the License.
*/
-package org.apache.tez.dag.app.rm;
+package org.apache.hadoop.hive.llap.tezplugins;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -35,8 +35,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
+import org.apache.hadoop.hive.llap.testhelpers.ControlledClock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -47,7 +47,6 @@
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.app.ControlledClock;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.junit.Test;
diff --git pom.xml pom.xml
index 802d3d4..8c2257f 100644
--- pom.xml
+++ pom.xml
@@ -49,8 +49,10 @@
serde
service-rpc
service
- llap-server
+ llap-common
llap-client
+ llap-tez
+ llap-server
shims
spark-client
storage-api
diff --git ql/pom.xml ql/pom.xml
index f19a225..330e449 100644
--- ql/pom.xml
+++ ql/pom.xml
@@ -75,6 +75,11 @@
org.apache.hive
+ hive-llap-tez
+ ${project.version}
+
+
+ org.apache.hive
hive-shims
${project.version}
@@ -603,6 +608,55 @@
+ org.apache.tez
+ tez-dag
+ ${tez.version}
+ true
+ test
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+ org.apache.hadoop
+ hadoop-common
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-core
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-jobclient
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-common
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+
+
+ org.apache.hadoop
+ hadoop-yarn-client
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ commmons-logging
+ commons-logging
+
+
+
+
org.apache.spark
spark-core_${scala.binary.version}
${spark.version}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 0bc6e2a..6a7d035 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -20,8 +20,6 @@
import java.util.Collection;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -44,7 +42,6 @@
import javax.security.auth.login.LoginException;
-import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.fs.FileStatus;
@@ -53,9 +50,14 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.security.LlapTokenProvider;
+import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
+import org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -88,9 +90,9 @@
private static final Logger LOG = LoggerFactory.getLogger(TezSessionState.class.getName());
private static final String TEZ_DIR = "_tez_session_dir";
public static final String LLAP_SERVICE = "LLAP";
- private static final String LLAP_SCHEDULER = "org.apache.tez.dag.app.rm.LlapTaskSchedulerService";
- private static final String LLAP_LAUNCHER = "org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher";
- private static final String LLAP_TASK_COMMUNICATOR = "org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator";
+ private static final String LLAP_SCHEDULER = LlapTaskSchedulerService.class.getName();
+ private static final String LLAP_LAUNCHER = LlapContainerLauncher.class.getName();
+ private static final String LLAP_TASK_COMMUNICATOR = LlapTaskCommunicator.class.getName();
private HiveConf conf;
private Path tezScratchDir;
@@ -251,27 +253,11 @@ protected void openInternal(final HiveConf conf, Collection additionalFi
// add configs for llap-daemon-site.xml + localize llap jars
// they cannot be referred to directly as it would be a circular depedency
conf.addResource("llap-daemon-site.xml");
- try {
- final File daemonJar =
- new File(Utilities.jarFinderGetJar(Class
- .forName("org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat")));
- final LocalResource daemonLr =
- createJarLocalResource(daemonJar.toURI().toURL().toExternalForm());
- commonLocalResources.put(utils.getBaseName(daemonLr), daemonLr);
- } catch (ClassNotFoundException ce) {
- throw new IOException("Cannot find LlapInputFormat in the classpath", ce);
- }
- try {
- final File registryJar =
- new File(Utilities.jarFinderGetJar(Class
- .forName("org.apache.hadoop.registry.client.api.RegistryOperations")));
- final LocalResource registryLr =
- createJarLocalResource(registryJar.toURI().toURL().toExternalForm());
- commonLocalResources.put(utils.getBaseName(registryLr), registryLr);
- } catch (ClassNotFoundException ce) {
- throw new IOException("Cannot find Hadoop Registry in the classpath", ce);
- }
+ addJarLRByClass(LlapTaskSchedulerService.class, commonLocalResources);
+ addJarLRByClass(LlapProtocolClientImpl.class, commonLocalResources);
+ addJarLRByClass(LlapProtocolClientProxy.class, commonLocalResources);
+ addJarLRByClassName("org.apache.hadoop.registry.client.api.RegistryOperations", commonLocalResources);
}
// Create environment for AM.
@@ -587,6 +573,26 @@ private LocalResource createJarLocalResource(String localJarPath)
return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf);
}
+ private void addJarLRByClassName(String className, final Map lrMap) throws
+ IOException, LoginException {
+ Class> clazz;
+ try {
+ clazz = Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Cannot find " + className + " in classpath", e);
+ }
+ addJarLRByClass(clazz, lrMap);
+ }
+
+ private void addJarLRByClass(Class> clazz, final Map lrMap) throws IOException,
+ LoginException {
+ final File jar =
+ new File(Utilities.jarFinderGetJar(clazz));
+ final LocalResource jarLr =
+ createJarLocalResource(jar.toURI().toURL().toExternalForm());
+ lrMap.put(utils.getBaseName(jarLr), jarLr);
+ }
+
private String getSha(Path localFile) throws IOException, IllegalArgumentException {
InputStream is = null;
try {