diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index 00fed15d2b..41476e7b2c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -139,7 +139,18 @@ public QueryFragmentInfo registerFragment(String vertexName, int fragmentNumber, int attemptNumber, SignableVertexSpec vertexSpec, String fragmentIdString) { QueryFragmentInfo fragmentInfo = new QueryFragmentInfo( this, vertexName, fragmentNumber, attemptNumber, vertexSpec, fragmentIdString); - knownFragments.add(fragmentInfo); + boolean wasUniqueFragment = knownFragments.add(fragmentInfo); + if (!wasUniqueFragment) { + // The same query fragment (including attempt number) has already been registered. + // This could potentially occur for external clients that are trying to submit the + // exact same fragment more than once (for example speculative execution of a query fragment). + // This should not occur for a non-external query fragment. + // Either way, registering the same fragment twice should be disallowed, as the LLAP structures + // will only ever have a single submission of a fragment. 
+ String message = "Fragment " + fragmentIdString + "(isExternal=" + isExternalQuery() + + ") has already been registered."; + throw new IllegalArgumentException(message); + } return fragmentInfo; } diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java index 21f732bf05..83ee1e9a09 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java @@ -28,12 +28,23 @@ private LlapDaemonTestUtils() {} public static SubmitWorkRequestProto buildSubmitProtoRequest(int fragmentNumber, + String appId, int dagId, int vId, String dagName, + int dagStartTime, int attemptStartTime, int numSelfAndUpstreamTasks, int numSelfAndUpstreamComplete, + int withinDagPriority, Credentials credentials) throws IOException { + return buildSubmitProtoRequest(fragmentNumber, 0, + appId, dagId, vId, dagName, dagStartTime, attemptStartTime, + numSelfAndUpstreamTasks, numSelfAndUpstreamComplete, + withinDagPriority, credentials); + } + + public static SubmitWorkRequestProto buildSubmitProtoRequest(int fragmentNumber, + int attemptNumber, String appId, int dagId, int vId, String dagName, int dagStartTime, int attemptStartTime, int numSelfAndUpstreamTasks, int numSelfAndUpstreamComplete, int withinDagPriority, Credentials credentials) throws IOException { return SubmitWorkRequestProto .newBuilder() - .setAttemptNumber(0) + .setAttemptNumber(attemptNumber) .setFragmentNumber(fragmentNumber) .setWorkSpec( LlapDaemonProtocolProtos.VertexOrBinary.newBuilder().setVertex( diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java index 8ae00b9c87..a5ce17c196 100644 --- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java @@ -45,6 +45,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import static org.mockito.Mockito.mock; @@ -132,6 +133,11 @@ public void setup() throws Exception { @After public void cleanup() throws Exception { + for (Object key : ShuffleHandler.get().getRegisteredApps().keySet()) { + String appId = (String) key; + ShuffleHandler.get().unregisterDag(null, appId, dagId); + } + containerRunner.serviceStop(); queryTracker.serviceStop(); executorService.serviceStop(); @@ -155,6 +161,7 @@ public void testRegisterDag() throws Exception { .setDagIndex(dagId) .build()) .build(); + containerRunner.registerDag(request); Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1); Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), dagId); @@ -177,4 +184,74 @@ public void testRegisterDag() throws Exception { Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 1); Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), dagId); } + + @Test(timeout = 10000) + public void testSubmitSameFragment() throws Exception { + Credentials credentials = new Credentials(); + Token sessionToken = new Token<>( + "identifier".getBytes(), "testPassword".getBytes(), new Text("kind"), new Text("service")); + TokenCache.setSessionToken(sessionToken, credentials); + + RegisterDagRequestProto request = RegisterDagRequestProto.newBuilder() + .setUser(testUser) + .setCredentialsBinary(ByteString.copyFrom(LlapTezUtils.serializeCredentials(credentials))) + .setQueryIdentifier( + QueryIdentifierProto.newBuilder() + .setApplicationIdString(appId) + .setDagIndex(dagId) + .build()) + .build(); + 
containerRunner.registerDag(request); + Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1); + Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), dagId); + Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 0); + + int fragNum = 1; + int attemptNum = 0; + SubmitWorkRequestProto sRequest1 = + LlapDaemonTestUtils.buildSubmitProtoRequest(fragNum, attemptNum, appId, + dagId, vId, "dagName", 0, 0, + 0, 0, 1, + credentials); + + containerRunner.submitWork(sRequest1); + Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1); + Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), dagId); + Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 1); + Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), dagId); + + // submitWork() was successful, should show up as an active fragment. + Assert.assertEquals(1, containerRunner.getExecutorStatus().size()); + boolean caughtException = false; + + // Try exact same fragment ID + attempt number - should fail. + try { + SubmitWorkRequestProto sRequest2 = + LlapDaemonTestUtils.buildSubmitProtoRequest(fragNum, attemptNum, appId, + dagId, vId, "dagName", 0, 0, + 0, 0, 1, + credentials); + + containerRunner.submitWork(sRequest2); + } catch (IllegalArgumentException err) { + err.printStackTrace(); + caughtException = true; + } + Assert.assertTrue(caughtException); + // request failed so should still only have the 1 fragment + Assert.assertEquals(1, containerRunner.getExecutorStatus().size()); + + // Try same fragment ID with different attempt number - should work. + attemptNum = 1; + SubmitWorkRequestProto sRequest3 = + LlapDaemonTestUtils.buildSubmitProtoRequest(fragNum, attemptNum, appId, + dagId, vId, "dagName", 0, 0, + 0, 0, 1, + credentials); + + containerRunner.submitWork(sRequest3); + + // Should now have 2 fragments registered. 
+ Assert.assertEquals(2, containerRunner.getExecutorStatus().size()); + } }