diff --git llap-client/pom.xml llap-client/pom.xml index f6a5629..50c06a4 100644 --- llap-client/pom.xml +++ llap-client/pom.xml @@ -32,7 +32,7 @@ - + org.apache.hive @@ -41,41 +41,26 @@ org.apache.hive - hive-serde + hive-llap-common ${project.version} - commons-codec - commons-codec - ${commons-codec.version} - - - commons-lang - commons-lang - ${commons-lang.version} - - - org.apache.thrift - libthrift - ${libthrift.version} - - org.apache.hadoop hadoop-common ${hadoop.version} true - - - org.slf4j - slf4j-log4j12 - - - commmons-logging - commons-logging - - - + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + org.apache.hadoop hadoop-mapreduce-client-core @@ -96,15 +81,15 @@ test - org.mockito - mockito-all - ${mockito-all.version} + org.apache.commons + commons-lang3 + ${commons-lang3.version} test - com.sun.jersey - jersey-servlet - ${jersey.version} + org.mockito + mockito-all + ${mockito-all.version} test @@ -113,32 +98,18 @@ ${hadoop.version} tests test - - - org.slf4j - slf4j-log4j12 - - - commmons-logging - commons-logging - - - - - org.apache.hadoop - hadoop-hdfs - ${hadoop.version} - test - - - org.apache.hadoop - hadoop-hdfs - ${hadoop.version} - tests - test + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + - ${basedir}/src/java ${basedir}/src/test diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java similarity index 94% rename from llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java rename to llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java index 2884e40..5b0674a 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java @@ -12,7 +12,7 @@ * limitations under the License. */ -package org.apache.hadoop.hive.llap.tezplugins; +package org.apache.hadoop.hive.llap.tez; import javax.net.SocketFactory; @@ -47,8 +47,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.LlapNodeId; -import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; -import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl; +import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB; +import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; @@ -68,11 +68,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class LlapDaemonProtocolClientProxy extends AbstractService { +public class LlapProtocolClientProxy extends AbstractService { - private static final Logger LOG = LoggerFactory.getLogger(LlapDaemonProtocolClientProxy.class); + private static final Logger LOG = LoggerFactory.getLogger(LlapProtocolClientProxy.class); - private final ConcurrentMap hostProxies; + private final ConcurrentMap hostProxies; private final RequestManager requestManager; private final RetryPolicy retryPolicy; @@ -82,9 +82,9 @@ private volatile ListenableFuture requestManagerFuture; private final Token llapToken; - public LlapDaemonProtocolClientProxy( + public LlapProtocolClientProxy( int numThreads, Configuration conf, Token llapToken) { - super(LlapDaemonProtocolClientProxy.class.getSimpleName()); + super(LlapProtocolClientProxy.class.getSimpleName()); this.hostProxies = new ConcurrentHashMap<>(); this.socketFactory = NetUtils.getDefaultSocketFactory(conf); this.llapToken = llapToken; @@ -466,13 +466,13 @@ public TerminateFragmentResponseProto call() throws Exception { void indicateError(Throwable t); } - private LlapDaemonProtocolBlockingPB getProxy(final LlapNodeId nodeId) { + private LlapProtocolBlockingPB getProxy(final LlapNodeId nodeId) { String hostId = getHostIdentifier(nodeId.getHostname(), nodeId.getPort()); - LlapDaemonProtocolBlockingPB proxy = hostProxies.get(hostId); + LlapProtocolBlockingPB proxy = hostProxies.get(hostId); if (proxy == null) { if (llapToken == null) { - proxy = new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(), + proxy = new LlapProtocolClientImpl(getConfig(), nodeId.getHostname(), nodeId.getPort(), retryPolicy, socketFactory); } else { UserGroupInformation ugi; @@ -485,16 +485,16 @@ private LlapDaemonProtocolBlockingPB getProxy(final LlapNodeId nodeId) { SecurityUtil.setTokenService(nodeToken, NetUtils.createSocketAddrForHost( nodeId.getHostname(), nodeId.getPort())); ugi.addToken(nodeToken); - proxy = ugi.doAs(new PrivilegedAction() { + proxy = ugi.doAs(new PrivilegedAction() { @Override - public LlapDaemonProtocolBlockingPB run() { - return new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(), + public LlapProtocolBlockingPB run() { + return new LlapProtocolClientImpl(getConfig(), nodeId.getHostname(), nodeId.getPort(), retryPolicy, socketFactory); } }); } - LlapDaemonProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy); + LlapProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy); if (proxyOld != null) { // TODO Shutdown the new proxy. proxy = proxyOld; diff --git llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java llap-client/src/test/org/apache/hadoop/hive/llap/tez/TestLlapDaemonProtocolClientProxy.java similarity index 78% rename from llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java rename to llap-client/src/test/org/apache/hadoop/hive/llap/tez/TestLlapDaemonProtocolClientProxy.java index a6af8c2..850b67a 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java +++ llap-client/src/test/org/apache/hadoop/hive/llap/tez/TestLlapDaemonProtocolClientProxy.java @@ -12,7 +12,7 @@ * limitations under the License. */ -package org.apache.hadoop.hive.llap.tezplugins; +package org.apache.hadoop.hive.llap.tez; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -26,6 +26,7 @@ import com.google.protobuf.Message; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.hive.llap.LlapNodeId; +import org.junit.Assert; import org.junit.Test; public class TestLlapDaemonProtocolClientProxy { @@ -38,8 +39,8 @@ public void testMultipleNodes() { LlapNodeId nodeId2 = LlapNodeId.getInstance("host2", 1025); Message mockMessage = mock(Message.class); - LlapDaemonProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock( - LlapDaemonProtocolClientProxy.ExecuteRequestCallback.class); + LlapProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock( + LlapProtocolClientProxy.ExecuteRequestCallback.class); // Request two messages requestManager.queueRequest( @@ -52,8 +53,8 @@ public void testMultipleNodes() { assertEquals(2, requestManager.numSubmissionsCounters); assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1)); assertNotNull(requestManager.numInvocationsPerNode.get(nodeId2)); - assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); - assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId2).getValue().intValue()); + Assert.assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); + Assert.assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId2).getValue().intValue()); assertEquals(0, requestManager.currentLoopSkippedRequests.size()); assertEquals(0, requestManager.currentLoopSkippedRequests.size()); assertEquals(0, requestManager.currentLoopDisabledNodes.size()); @@ -66,8 +67,8 @@ public void testSingleInvocationPerNode() { LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025); Message mockMessage = mock(Message.class); - LlapDaemonProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock( - LlapDaemonProtocolClientProxy.ExecuteRequestCallback.class); + LlapProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock( + LlapProtocolClientProxy.ExecuteRequestCallback.class); // First request for host. requestManager.queueRequest( @@ -75,7 +76,7 @@ public void testSingleInvocationPerNode() { requestManager.process(); assertEquals(1, requestManager.numSubmissionsCounters); assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1)); - assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); + Assert.assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); assertEquals(0, requestManager.currentLoopSkippedRequests.size()); // Second request for host. Single invocation since the last has not completed. @@ -84,7 +85,7 @@ public void testSingleInvocationPerNode() { requestManager.process(); assertEquals(1, requestManager.numSubmissionsCounters); assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1)); - assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); + Assert.assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); assertEquals(1, requestManager.currentLoopSkippedRequests.size()); assertEquals(1, requestManager.currentLoopDisabledNodes.size()); assertTrue(requestManager.currentLoopDisabledNodes.contains(nodeId1)); @@ -94,14 +95,14 @@ public void testSingleInvocationPerNode() { requestManager.process(); assertEquals(2, requestManager.numSubmissionsCounters); assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1)); - assertEquals(2, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); + Assert.assertEquals(2, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); assertEquals(0, requestManager.currentLoopSkippedRequests.size()); assertEquals(0, requestManager.currentLoopDisabledNodes.size()); assertFalse(requestManager.currentLoopDisabledNodes.contains(nodeId1)); } - static class RequestManagerForTest extends LlapDaemonProtocolClientProxy.RequestManager { + static class RequestManagerForTest extends LlapProtocolClientProxy.RequestManager { int numSubmissionsCounters = 0; private Map numInvocationsPerNode = new HashMap<>(); @@ -110,7 +111,7 @@ public RequestManagerForTest(int numThreads) { super(numThreads); } - protected void submitToExecutor(LlapDaemonProtocolClientProxy.CallableRequest request, LlapNodeId nodeId) { + protected void submitToExecutor(LlapProtocolClientProxy.CallableRequest request, LlapNodeId nodeId) { numSubmissionsCounters++; MutableInt nodeCount = numInvocationsPerNode.get(nodeId); if (nodeCount == null) { @@ -127,10 +128,10 @@ void reset() { } - static class CallableRequestForTest extends LlapDaemonProtocolClientProxy.CallableRequest { + static class CallableRequestForTest extends LlapProtocolClientProxy.CallableRequest { protected CallableRequestForTest(LlapNodeId nodeId, Message message, - LlapDaemonProtocolClientProxy.ExecuteRequestCallback callback) { + LlapProtocolClientProxy.ExecuteRequestCallback callback) { super(nodeId, message, callback); } diff --git llap-common/pom.xml llap-common/pom.xml new file mode 100644 index 0000000..5343479 --- /dev/null +++ llap-common/pom.xml @@ -0,0 +1,235 @@ + + + + 4.0.0 + + org.apache.hive + hive + 2.1.0-SNAPSHOT + ../pom.xml + + + hive-llap-common + jar + Hive Llap Common + + + .. + + + + + + + org.apache.hive + hive-common + ${project.version} + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + true + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + + org.apache.tez + tez-api + ${tez.version} + true + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + + org.apache.tez + tez-common + ${tez.version} + true + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + + org.apache.tez + tez-runtime-internals + ${tez.version} + true + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + tests + test + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + + junit + junit + ${junit.version} + test + + + org.mockito + mockito-all + ${mockito-all.version} + test + + + + + + protobuf + + + + org.apache.maven.plugins + maven-antrun-plugin + + + generate-protobuf-sources + generate-sources + + + + + Building LLAP Protobuf + + + + + + + + + + run + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + + + + + + ${basedir}/src/java + ${basedir}/src/test + + + src/main/resources + + *.py + *.pyc + + false + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + src/gen/protobuf/gen-java + src/gen/thrift/gen-javabean + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + + diff --git llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java similarity index 100% rename from llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java rename to llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java diff --git llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java llap-common/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java similarity index 100% rename from llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java rename to llap-common/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java similarity index 81% rename from llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java rename to llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java index e293a95..cd11bdb 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java +++ llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java @@ -12,7 +12,7 @@ * limitations under the License. */ -package org.apache.hadoop.hive.llap.daemon.impl; +package org.apache.hadoop.hive.llap.impl; import javax.annotation.Nullable; import javax.net.SocketFactory; @@ -27,18 +27,18 @@ import org.apache.hadoop.ipc.ProtocolProxy; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB; +import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto; import org.apache.hadoop.security.UserGroupInformation; -public class LlapManagementProtocolClientImpl implements LlapManagementProtocolBlockingPB { +public class LlapManagementProtocolClientImpl implements LlapManagementProtocolPB { private final Configuration conf; private final InetSocketAddress serverAddr; private final RetryPolicy retryPolicy; private final SocketFactory socketFactory; - LlapManagementProtocolBlockingPB proxy; + LlapManagementProtocolPB proxy; public LlapManagementProtocolClientImpl(Configuration conf, String hostname, int port, @@ -54,17 +54,17 @@ public LlapManagementProtocolClientImpl(Configuration conf, String hostname, int } } - public LlapManagementProtocolBlockingPB getProxy() throws IOException { + public LlapManagementProtocolPB getProxy() throws IOException { if (proxy == null) { proxy = createProxy(); } return proxy; } - public LlapManagementProtocolBlockingPB createProxy() throws IOException { - RPC.setProtocolEngine(conf, LlapManagementProtocolBlockingPB.class, ProtobufRpcEngine.class); - ProtocolProxy proxy = - RPC.getProtocolProxy(LlapManagementProtocolBlockingPB.class, 0, serverAddr, + public LlapManagementProtocolPB createProxy() throws IOException { + RPC.setProtocolEngine(conf, LlapManagementProtocolPB.class, ProtobufRpcEngine.class); + ProtocolProxy proxy = + RPC.getProtocolProxy(LlapManagementProtocolPB.class, 0, serverAddr, UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), 0, retryPolicy); return proxy.getProxy(); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapProtocolClientImpl.java similarity index 83% rename from llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java rename to llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapProtocolClientImpl.java index 9c7d2e2..9234b7d 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java +++ llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapProtocolClientImpl.java @@ -12,7 +12,7 @@ * limitations under the License. */ -package org.apache.hadoop.hive.llap.daemon.impl; +package org.apache.hadoop.hive.llap.impl; import javax.annotation.Nullable; import javax.net.SocketFactory; @@ -35,24 +35,24 @@ import org.apache.hadoop.ipc.ProtocolProxy; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; +import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB; import org.apache.hadoop.security.UserGroupInformation; // TODO Change all this to be based on a regular interface instead of relying on the Proto service - Exception signatures cannot be controlled without this for the moment. -public class LlapDaemonProtocolClientImpl implements LlapDaemonProtocolBlockingPB { +public class LlapProtocolClientImpl implements LlapProtocolBlockingPB { private final Configuration conf; private final InetSocketAddress serverAddr; private final RetryPolicy retryPolicy; private final SocketFactory socketFactory; - LlapDaemonProtocolBlockingPB proxy; + LlapProtocolBlockingPB proxy; - public LlapDaemonProtocolClientImpl(Configuration conf, String hostname, int port, - @Nullable RetryPolicy retryPolicy, - @Nullable SocketFactory socketFactory) { + public LlapProtocolClientImpl(Configuration conf, String hostname, int port, + @Nullable RetryPolicy retryPolicy, + @Nullable SocketFactory socketFactory) { this.conf = conf; this.serverAddr = NetUtils.createSocketAddr(hostname, port); this.retryPolicy = retryPolicy; @@ -107,17 +107,17 @@ public TerminateFragmentResponseProto terminateFragment( } } - public LlapDaemonProtocolBlockingPB getProxy() throws IOException { + public LlapProtocolBlockingPB getProxy() throws IOException { if (proxy == null) { proxy = createProxy(); } return proxy; } - public LlapDaemonProtocolBlockingPB createProxy() throws IOException { - RPC.setProtocolEngine(conf, LlapDaemonProtocolBlockingPB.class, ProtobufRpcEngine.class); - ProtocolProxy proxy = - RPC.getProtocolProxy(LlapDaemonProtocolBlockingPB.class, 0, serverAddr, + public LlapProtocolBlockingPB createProxy() throws IOException { + RPC.setProtocolEngine(conf, LlapProtocolBlockingPB.class, ProtobufRpcEngine.class); + ProtocolProxy proxy = + RPC.getProtocolProxy(LlapProtocolBlockingPB.class, 0, serverAddr, UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), 0, retryPolicy); return proxy.getProxy(); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapManagementProtocolPB.java similarity index 70% rename from llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java rename to llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapManagementProtocolPB.java index 4efadac..ff215d4 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java +++ llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapManagementProtocolPB.java @@ -11,14 +11,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hive.llap.daemon; +package org.apache.hadoop.hive.llap.protocol; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.ipc.ProtocolInfo; import org.apache.hadoop.security.KerberosInfo; -@ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB", protocolVersion = 1) +@ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB", protocolVersion = 1) @KerberosInfo(serverPrincipal = HiveConf.HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME) -public interface LlapManagementProtocolBlockingPB extends LlapDaemonProtocolProtos.LlapManagementProtocol.BlockingInterface { +@InterfaceAudience.Private +public interface LlapManagementProtocolPB extends LlapDaemonProtocolProtos.LlapManagementProtocol.BlockingInterface { } \ No newline at end of file diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapProtocolBlockingPB.java similarity index 73% rename from llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java rename to llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapProtocolBlockingPB.java index 4c09941..2bd56ca 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java +++ llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapProtocolBlockingPB.java @@ -12,8 +12,9 @@ * limitations under the License. */ -package org.apache.hadoop.hive.llap.daemon; +package org.apache.hadoop.hive.llap.protocol; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.ipc.ProtocolInfo; import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.token.TokenInfo; @@ -21,8 +22,9 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.security.LlapTokenSelector; -@ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB", protocolVersion = 1) +@ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB", protocolVersion = 1) @KerberosInfo(serverPrincipal = HiveConf.HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME) @TokenInfo(LlapTokenSelector.class) -public interface LlapDaemonProtocolBlockingPB extends LlapDaemonProtocolProtos.LlapDaemonProtocol.BlockingInterface { +@InterfaceAudience.Private +public interface LlapProtocolBlockingPB extends LlapDaemonProtocolProtos.LlapDaemonProtocol.BlockingInterface { } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java similarity index 100% rename from llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java rename to llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java diff --git llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java similarity index 100% rename from llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java rename to llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java diff --git llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java similarity index 100% rename from llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java rename to llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java diff --git llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java similarity index 100% rename from llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java rename to llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java similarity index 99% rename from llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java rename to llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java index f61d62f..a5c3631 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java +++ llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java @@ -12,7 +12,7 @@ * limitations under the License. */ -package org.apache.hadoop.hive.llap.tezplugins; +package org.apache.hadoop.hive.llap.tez; import java.io.IOException; import java.util.ArrayList; diff --git llap-server/src/protobuf/LlapDaemonProtocol.proto llap-common/src/protobuf/LlapDaemonProtocol.proto similarity index 100% rename from llap-server/src/protobuf/LlapDaemonProtocol.proto rename to llap-common/src/protobuf/LlapDaemonProtocol.proto diff --git llap-common/src/test/org/apache/hadoop/hive/llap/testhelpers/ControlledClock.java llap-common/src/test/org/apache/hadoop/hive/llap/testhelpers/ControlledClock.java new file mode 100644 index 0000000..fec1c35 --- /dev/null +++ llap-common/src/test/org/apache/hadoop/hive/llap/testhelpers/ControlledClock.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.testhelpers; + +import org.apache.hadoop.yarn.util.Clock; + +public class ControlledClock implements Clock { + private long time = -1; + private final Clock actualClock; + public ControlledClock(Clock actualClock) { + this.actualClock = actualClock; + } + public synchronized void setTime(long time) { + this.time = time; + } + public synchronized void reset() { + time = -1; + } + + @Override + public synchronized long getTime() { + if (time != -1) { + return time; + } + return actualClock.getTime(); + } + +} diff --git llap-server/pom.xml llap-server/pom.xml index 916fb5c..c81bdb2 100644 --- llap-server/pom.xml +++ llap-server/pom.xml @@ -32,7 +32,7 @@ - + org.apache.hive @@ -46,11 +46,21 @@ org.apache.hive + hive-llap-common + ${project.version} + + + org.apache.hive hive-llap-client ${project.version} org.apache.hive + hive-llap-tez + ${project.version} + + + org.apache.hive hive-shims ${project.version} @@ -90,77 +100,66 @@ hadoop-common ${hadoop.version} true - - - org.slf4j - slf4j-log4j12 - - - commmons-logging - commons-logging - - - + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + org.apache.hadoop hadoop-mapreduce-client-core ${hadoop.version} true - - - org.slf4j - slf4j-log4j12 - - - commmons-logging - commons-logging - - - - - org.apache.tez - tez-runtime-internals - ${tez.version} - true - - - org.slf4j - slf4j-log4j12 - - - commmons-logging - commons-logging - - - - - org.apache.tez - tez-runtime-library - ${tez.version} - true + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + org.apache.tez - tez-mapreduce + tez-runtime-internals ${tez.version} true - - - org.slf4j - slf4j-log4j12 - - - commmons-logging - commons-logging - - + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + org.apache.tez - tez-dag + tez-runtime-library ${tez.version} true + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + org.apache.hadoop @@ -168,17 +167,17 @@ ${hadoop.version} tests test - - - org.slf4j - slf4j-log4j12 - - - commmons-logging - commons-logging - - - + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + org.apache.hadoop hadoop-hdfs @@ -225,15 +224,15 @@ org.apache.hbase hbase-server ${hbase.version} - - - org.slf4j - slf4j-log4j12 - - - commmons-logging - commons-logging - + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + @@ -248,54 +247,6 @@ - - - protobuf - - - - org.apache.maven.plugins - maven-antrun-plugin - - - generate-protobuf-sources - generate-sources - - - - - Building LLAP-Server Protobuf - - - - - - - - - - run - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - - test-jar - - - - - - - - - - ${basedir}/src/java ${basedir}/src/test diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java index 544af09..0399798 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java @@ -25,6 +25,8 @@ import java.util.Map.Entry; import java.util.Collection; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -217,7 +219,13 @@ private void run(String[] args) throws Exception { CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(), libDir.toString(), true); lfs.delete(new Path(libDir, "tez.tar.gz"), false); + // llap-common + lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(LlapDaemonProtocolProtos.class)), libDir); + // llap-tez + lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(LlapTezUtils.class)), libDir); + // llap-server lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(LlapInputFormat.class)), libDir); + // hive-exec lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(HiveInputFormat.class)), libDir); // copy default aux classes (json/hbase) diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 7d7fa00..94b3b41 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -19,7 +19,6 @@ import java.lang.management.MemoryPoolMXBean; import java.lang.management.MemoryType; import java.net.InetSocketAddress; -import java.net.URLClassLoader; import java.util.Arrays; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -48,7 +47,6 @@ import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ExitUtil; @@ -67,7 +65,7 @@ private static final Logger LOG = LoggerFactory.getLogger(LlapDaemon.class); private final Configuration shuffleHandlerConf; - private final LlapDaemonProtocolServerImpl server; + private final LlapProtocolServerImpl server; private final ContainerRunnerImpl containerRunner; private final AMReporter amReporter; private final LlapRegistryService registry; @@ -166,7 +164,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf); - this.server = new LlapDaemonProtocolServerImpl( + this.server = new LlapProtocolServerImpl( numHandlers, this, srvAddress, mngAddress, srvPort, mngPort); ClassLoader executorClassLoader = null; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java similarity index 91% rename from llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java rename to llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java index 45ca906..c386d77 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java @@ -55,14 +55,14 @@ import org.apache.hadoop.hive.llap.security.SecretManager; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; -import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; -import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB; +import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB; +import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB; import org.apache.hadoop.hive.llap.security.LlapDaemonPolicyProvider; -public class LlapDaemonProtocolServerImpl extends AbstractService - implements LlapDaemonProtocolBlockingPB, LlapManagementProtocolBlockingPB { +public class LlapProtocolServerImpl extends AbstractService + implements LlapProtocolBlockingPB, LlapManagementProtocolPB { - private static final Logger LOG = LoggerFactory.getLogger(LlapDaemonProtocolServerImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(LlapProtocolServerImpl.class); private final int numHandlers; private final ContainerRunner containerRunner; @@ -71,12 +71,12 @@ private final AtomicReference srvAddress, mngAddress; private SecretManager zkSecretManager; - public LlapDaemonProtocolServerImpl(int numHandlers, - ContainerRunner containerRunner, - AtomicReference srvAddress, - AtomicReference mngAddress, - int srvPort, - int mngPort) { + public LlapProtocolServerImpl(int numHandlers, + ContainerRunner containerRunner, + AtomicReference srvAddress, + AtomicReference mngAddress, + int srvPort, + int mngPort) { super("LlapDaemonProtocolServerImpl"); this.numHandlers = numHandlers; this.containerRunner = containerRunner; @@ -84,7 +84,7 @@ public LlapDaemonProtocolServerImpl(int numHandlers, this.srvPort = srvPort; this.mngAddress = mngAddress; this.mngPort = mngPort; - LOG.info("Creating: " + LlapDaemonProtocolServerImpl.class.getSimpleName() + + LOG.info("Creating: " + LlapProtocolServerImpl.class.getSimpleName() + " with port configured to: " + srvPort); } @@ -152,9 +152,9 @@ public Void run() { private void startProtocolServers( Configuration conf, BlockingService daemonImpl, BlockingService managementImpl) { server = startProtocolServer(srvPort, numHandlers, srvAddress, conf, daemonImpl, - LlapDaemonProtocolBlockingPB.class); + LlapProtocolBlockingPB.class); mngServer = startProtocolServer(mngPort, 2, mngAddress, conf, managementImpl, - LlapManagementProtocolBlockingPB.class); + LlapManagementProtocolPB.class); } private RPC.Server startProtocolServer(int srvPort, int numHandlers, diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java index aa065a9..480a394 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java @@ -23,7 +23,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; -import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,7 +89,7 @@ public boolean canFinish() { boolean canFinish = true; if (inputSpecList != null && !inputSpecList.isEmpty()) { for (IOSpecProto inputSpec : inputSpecList) { - if (isSourceOfInterest(inputSpec)) { + if (LlapTezUtils.isSourceOfInterest(inputSpec.getIoDescriptor().getClassName())) { // Lookup the state in the map. LlapDaemonProtocolProtos.SourceStateProto state = queryInfo.getSourceStateMap() .get(inputSpec.getConnectedVertexName()); @@ -129,7 +129,7 @@ public boolean registerForFinishableStateUpdates(FinishableStateUpdateHandler ha List inputSpecList = fragmentSpec.getInputSpecsList(); if (inputSpecList != null && !inputSpecList.isEmpty()) { for (IOSpecProto inputSpec : inputSpecList) { - if (isSourceOfInterest(inputSpec)) { + if (LlapTezUtils.isSourceOfInterest(inputSpec.getIoDescriptor().getClassName())) { sourcesOfInterest.add(inputSpec.getConnectedVertexName()); } } @@ -143,13 +143,6 @@ public void unregisterForFinishableStateUpdates(FinishableStateUpdateHandler han queryInfo.unregisterFinishableStateUpdate(handler); } - private boolean isSourceOfInterest(IOSpecProto inputSpec) { - String inputClassName = inputSpec.getIoDescriptor().getClassName(); - // MRInput is not of interest since it'll always be ready. - return !inputClassName.equals(MRInputLegacy.class.getName()); - } - - @Override public boolean equals(Object o) { if (this == o) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index ede2a03..d88d82a 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -38,7 +38,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; -import org.apache.hadoop.hive.llap.tezplugins.Converters; +import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.ql.io.IOContextMap; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java index d67647b..bedd265 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java @@ -18,17 +18,17 @@ package org.apache.hadoop.hive.llap.security; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; -import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB; +import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB; +import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.Service; public class LlapDaemonPolicyProvider extends PolicyProvider { private static final Service[] services = new Service[] { new Service(HiveConf.ConfVars.LLAP_SECURITY_ACL.varname, - LlapDaemonProtocolBlockingPB.class), + LlapProtocolBlockingPB.class), new Service(HiveConf.ConfVars.LLAP_MANAGEMENT_ACL.varname, - LlapManagementProtocolBlockingPB.class) + LlapManagementProtocolPB.class) }; @Override diff --git llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java index a00b631..aa8745d 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java @@ -27,7 +27,7 @@ import javax.net.SocketFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.daemon.impl.LlapManagementProtocolClientImpl; +import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.registry.ServiceInstance; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java index 4dca2ce..eb514f2 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java @@ -20,8 +20,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; -import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB; +import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB; +import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB; import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.SecurityInfo; import org.apache.hadoop.security.token.TokenIdentifier; @@ -36,8 +36,8 @@ public KerberosInfo getKerberosInfo(Class protocol, Configuration conf) { if (LOG.isDebugEnabled()) { LOG.debug("Trying to get KerberosInfo for " + protocol); } - if (!LlapDaemonProtocolBlockingPB.class.isAssignableFrom(protocol) - && !LlapManagementProtocolBlockingPB.class.isAssignableFrom(protocol)) return null; + if (!LlapProtocolBlockingPB.class.isAssignableFrom(protocol) + && !LlapManagementProtocolPB.class.isAssignableFrom(protocol)) return null; return new KerberosInfo() { @Override public Class annotationType() { @@ -62,7 +62,7 @@ public TokenInfo getTokenInfo(Class protocol, Configuration conf) { LOG.debug("Trying to get TokenInfo for " + protocol); } // Tokens cannot be used for the management protocol (for now). - if (!LlapDaemonProtocolBlockingPB.class.isAssignableFrom(protocol)) return null; + if (!LlapProtocolBlockingPB.class.isAssignableFrom(protocol)) return null; return new TokenInfo() { @Override public Class annotationType() { diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java index 44c958d..1cef218 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java @@ -27,10 +27,11 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; -import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; +import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto; +import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl; import org.junit.Test; public class TestLlapDaemonProtocolServerImpl { @@ -42,8 +43,8 @@ public void test() throws ServiceException, IOException { int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT); int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS); ContainerRunner containerRunnerMock = mock(ContainerRunner.class); - LlapDaemonProtocolServerImpl server = - new LlapDaemonProtocolServerImpl(numHandlers, containerRunnerMock, + LlapProtocolServerImpl server = + new LlapProtocolServerImpl(numHandlers, containerRunnerMock, new AtomicReference(), new AtomicReference(), rpcPort, rpcPort + 1); when(containerRunnerMock.submitWork(any(SubmitWorkRequestProto.class))).thenReturn( @@ -56,8 +57,8 @@ public void test() throws ServiceException, IOException { server.start(); InetSocketAddress serverAddr = server.getBindAddress(); - LlapDaemonProtocolBlockingPB client = - new LlapDaemonProtocolClientImpl(new Configuration(), serverAddr.getHostName(), + LlapProtocolBlockingPB client = + new LlapProtocolClientImpl(new Configuration(), serverAddr.getHostName(), serverAddr.getPort(), null, null); SubmitWorkResponseProto responseProto = client.submitWork(null, SubmitWorkRequestProto.newBuilder() diff --git llap-tez/pom.xml llap-tez/pom.xml new file mode 100644 index 0000000..ce020da --- /dev/null +++ llap-tez/pom.xml @@ -0,0 +1,200 @@ + + + + 4.0.0 + + org.apache.hive + hive + 2.1.0-SNAPSHOT + ../pom.xml + + + hive-llap-tez + jar + Hive Llap Tez + + + .. + + + + + + + org.apache.hive + hive-common + ${project.version} + + + org.apache.hive + hive-llap-client + ${project.version} + + + + org.apache.commons + commons-lang3 + ${commons-lang3.version} + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + true + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + + org.apache.tez + tez-api + ${tez.version} + true + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + + org.apache.tez + tez-runtime-internals + ${tez.version} + true + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + + org.apache.tez + tez-mapreduce + ${tez.version} + true + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + + org.apache.tez + tez-dag + ${tez.version} + true + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + + + org.apache.hive + hive-llap-common + ${project.version} + tests + test + + + + + junit + junit + ${junit.version} + test + + + org.mockito + mockito-all + ${mockito-all.version} + test + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + tests + test + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + + + + ${basedir}/src/java + ${basedir}/src/test + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + src/gen/protobuf/gen-java + src/gen/thrift/gen-javabean + + + + + + + + + diff --git llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java similarity index 97% rename from llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java rename to llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java index 89b7198..a314391 100644 --- llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java @@ -12,7 +12,7 @@ * limitations under the License. */ -package org.apache.tez.dag.app.rm; +package org.apache.hadoop.hive.llap.tezplugins; import java.util.concurrent.atomic.AtomicLong; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java similarity index 100% rename from llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java rename to llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java similarity index 97% rename from llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java rename to llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index eb6384f..76d095a 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -48,6 +48,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; +import org.apache.hadoop.hive.llap.tez.Converters; +import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy; import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; @@ -95,7 +97,7 @@ private final SourceStateTracker sourceStateTracker; private final Set nodesForQuery = new HashSet<>(); - private LlapDaemonProtocolClientProxy communicator; + private LlapProtocolClientProxy communicator; private long deleteDelayOnDagComplete; private final LlapTaskUmbilicalProtocol umbilical; private final Token token; @@ -145,7 +147,7 @@ public void initialize() throws Exception { super.initialize(); Configuration conf = getConf(); int numThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS); - this.communicator = new LlapDaemonProtocolClientProxy(numThreads, conf, token); + this.communicator = new LlapProtocolClientProxy(numThreads, conf, token); this.deleteDelayOnDagComplete = HiveConf.getTimeVar( conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS); LOG.info("Running LlapTaskCommunicator with " @@ -270,7 +272,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task getContext() .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId); communicator.sendSubmitWork(requestProto, host, port, - new LlapDaemonProtocolClientProxy.ExecuteRequestCallback() { + new LlapProtocolClientProxy.ExecuteRequestCallback() { @Override public void setResponse(SubmitWorkResponseProto response) { if (response.hasSubmissionState()) { @@ -356,7 +358,7 @@ private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId, TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifierProto) .setFragmentIdentifierString(taskAttemptId.toString()).build(); communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(), - new LlapDaemonProtocolClientProxy.ExecuteRequestCallback() { + new LlapProtocolClientProxy.ExecuteRequestCallback() { @Override public void setResponse(TerminateFragmentResponseProto response) { } @@ -385,7 +387,7 @@ public void dagComplete(final int dagIdentifier) { for (final LlapNodeId llapNodeId : nodesForQuery) { LOG.info("Sending dagComplete message for {}, to {}", dagIdentifier, llapNodeId); communicator.sendQueryComplete(request, llapNodeId.getHostname(), llapNodeId.getPort(), - new LlapDaemonProtocolClientProxy.ExecuteRequestCallback() { + new LlapProtocolClientProxy.ExecuteRequestCallback() { @Override public void setResponse(LlapDaemonProtocolProtos.QueryCompleteResponseProto response) { } @@ -411,7 +413,7 @@ public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) { public void sendStateUpdate(final String host, final int port, final SourceStateUpdatedRequestProto request) { communicator.sendSourceStateUpdate(request, host, port, - new LlapDaemonProtocolClientProxy.ExecuteRequestCallback() { + new LlapProtocolClientProxy.ExecuteRequestCallback() { @Override public void setResponse(SourceStateUpdatedResponseProto response) { } diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java similarity index 99% rename from llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java rename to llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 6beb4f8..3bca0da 100644 --- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -12,7 +12,7 @@ * limitations under the License. */ -package org.apache.tez.dag.app.rm; +package org.apache.hadoop.hive.llap.tezplugins; import java.io.IOException; import java.util.Arrays; diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java new file mode 100644 index 0000000..2c3e53c --- /dev/null +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.tez.mapreduce.input.MRInput; +import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; + +@InterfaceAudience.Private +public class LlapTezUtils { + public static boolean isSourceOfInterest(String inputClassName) { + // MRInput is not of interest since it'll always be ready. + return !(inputClassName.equals(MRInputLegacy.class.getName()) || inputClassName.equals( + MultiMRInput.class.getName()) || inputClassName.equals(MRInput.class.getName())); + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java similarity index 100% rename from llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java rename to llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java similarity index 94% rename from llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java rename to llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java index fded9bf..d8f7574 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java @@ -25,18 +25,16 @@ import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; +import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; -import org.apache.hadoop.hive.llap.tezplugins.Converters; +import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator; import org.apache.tez.dag.api.event.VertexState; -import org.apache.tez.mapreduce.input.MRInput; -import org.apache.tez.mapreduce.input.MRInputLegacy; -import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.runtime.api.impl.InputSpec; public class SourceStateTracker { @@ -258,7 +256,7 @@ private synchronized NodeInfo getNodeInfo(LlapNodeId llapNodeId) { if (inputSpecList != null) { boolean alreadyFound = false; for (InputSpec inputSpec : inputSpecList) { - if (isSourceOfInterest(inputSpec)) { + if (LlapTezUtils.isSourceOfInterest(inputSpec.getInputDescriptor().getClassName())) { if (!alreadyFound) { alreadyFound = true; sourcesOfInterest = new LinkedList<>(); @@ -279,12 +277,7 @@ private void maybeRegisterForVertexUpdates(String sourceName) { } } - private boolean isSourceOfInterest(InputSpec inputSpec) { - String inputClassName = inputSpec.getInputDescriptor().getClassName(); - // MRInput is not of interest since it'll always be ready. - return !(inputClassName.equals(MRInputLegacy.class.getName()) || inputClassName.equals( - MultiMRInput.class.getName()) || inputClassName.equals(MRInput.class.getName())); - } + void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState state) { taskCommunicator.sendStateUpdate(nodeId.getHostname(), nodeId.getPort(), diff --git llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java similarity index 100% rename from llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java rename to llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java diff --git llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java similarity index 99% rename from llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java rename to llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java index 4c1cbb3..36d8ffd 100644 --- llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java +++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ -12,7 +12,7 @@ * limitations under the License. */ -package org.apache.tez.dag.app.rm; +package org.apache.hadoop.hive.llap.tezplugins; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -35,8 +35,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl; +import org.apache.hadoop.hive.llap.testhelpers.ControlledClock; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.dag.app.ControlledClock; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.junit.Test; diff --git pom.xml pom.xml index 802d3d4..8c2257f 100644 --- pom.xml +++ pom.xml @@ -49,8 +49,10 @@ serde service-rpc service - llap-server + llap-common llap-client + llap-tez + llap-server shims spark-client storage-api diff --git ql/pom.xml ql/pom.xml index f19a225..330e449 100644 --- ql/pom.xml +++ ql/pom.xml @@ -75,6 +75,11 @@ org.apache.hive + hive-llap-tez + ${project.version} + + + org.apache.hive hive-shims ${project.version} @@ -603,6 +608,55 @@ + org.apache.tez + tez-dag + ${tez.version} + true + test + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + + + org.apache.hadoop + hadoop-mapreduce-client-common + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hadoop + hadoop-yarn-client + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + org.apache.spark spark-core_${scala.binary.version} ${spark.version} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 0bc6e2a..6a7d035 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -20,8 +20,6 @@ import java.util.Collection; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -44,7 +42,6 @@ import javax.security.auth.login.LoginException; -import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.fs.FileStatus; @@ -53,9 +50,14 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.hive.llap.security.LlapTokenProvider; +import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy; +import org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher; +import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator; +import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; @@ -88,9 +90,9 @@ private static final Logger LOG = LoggerFactory.getLogger(TezSessionState.class.getName()); private static final String TEZ_DIR = "_tez_session_dir"; public static final String LLAP_SERVICE = "LLAP"; - private static final String LLAP_SCHEDULER = "org.apache.tez.dag.app.rm.LlapTaskSchedulerService"; - private static final String LLAP_LAUNCHER = "org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher"; - private static final String LLAP_TASK_COMMUNICATOR = "org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator"; + private static final String LLAP_SCHEDULER = LlapTaskSchedulerService.class.getName(); + private static final String LLAP_LAUNCHER = LlapContainerLauncher.class.getName(); + private static final String LLAP_TASK_COMMUNICATOR = LlapTaskCommunicator.class.getName(); private HiveConf conf; private Path tezScratchDir; @@ -251,27 +253,11 @@ protected void openInternal(final HiveConf conf, Collection additionalFi // add configs for llap-daemon-site.xml + localize llap jars // they cannot be referred to directly as it would be a circular depedency conf.addResource("llap-daemon-site.xml"); - try { - final File daemonJar = - new File(Utilities.jarFinderGetJar(Class - .forName("org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat"))); - final LocalResource daemonLr = - createJarLocalResource(daemonJar.toURI().toURL().toExternalForm()); - commonLocalResources.put(utils.getBaseName(daemonLr), daemonLr); - } catch (ClassNotFoundException ce) { - throw new IOException("Cannot find LlapInputFormat in the classpath", ce); - } - try { - final File registryJar = - new File(Utilities.jarFinderGetJar(Class - .forName("org.apache.hadoop.registry.client.api.RegistryOperations"))); - final LocalResource registryLr = - createJarLocalResource(registryJar.toURI().toURL().toExternalForm()); - commonLocalResources.put(utils.getBaseName(registryLr), registryLr); - } catch (ClassNotFoundException ce) { - throw new IOException("Cannot find Hadoop Registry in the classpath", ce); - } + addJarLRByClass(LlapTaskSchedulerService.class, commonLocalResources); + addJarLRByClass(LlapProtocolClientImpl.class, commonLocalResources); + addJarLRByClass(LlapProtocolClientProxy.class, commonLocalResources); + addJarLRByClassName("org.apache.hadoop.registry.client.api.RegistryOperations", commonLocalResources); } // Create environment for AM. @@ -587,6 +573,26 @@ private LocalResource createJarLocalResource(String localJarPath) return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf); } + private void addJarLRByClassName(String className, final Map lrMap) throws + IOException, LoginException { + Class clazz; + try { + clazz = Class.forName(className); + } catch (ClassNotFoundException e) { + throw new IOException("Cannot find " + className + " in classpath", e); + } + addJarLRByClass(clazz, lrMap); + } + + private void addJarLRByClass(Class clazz, final Map lrMap) throws IOException, + LoginException { + final File jar = + new File(Utilities.jarFinderGetJar(clazz)); + final LocalResource jarLr = + createJarLocalResource(jar.toURI().toURL().toExternalForm()); + lrMap.put(utils.getBaseName(jarLr), jarLr); + } + private String getSha(Path localFile) throws IOException, IllegalArgumentException { InputStream is = null; try {