diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e50ff997a9..afc19d26d6 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4440,6 +4440,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "preferring one of the locations provided by the split itself. If there is no llap daemon " + "running on any of those locations (or on the cloud), fall back to a cache affinity to" + " an LLAP node. This is effective only if hive.execution.mode is llap."), + LLAP_CLIENT_CONSISTENT_SPLITS_NUMBER("hive.llap.client.consistent.splits.number", 1, + "The number of the preferred locations to generate if hive.llap.client.consistent.splits\n" + + "is set. If multiple locations are generated and the first node is unavailable then the\n" + + "scheduler will not wait hive.llap.task.scheduler.locality.delay before assigning the\n" + + "task to the next preferred LLAP node.\n" + + "This is effective only if hive.execution.mode is llap."), LLAP_VALIDATE_ACLS("hive.llap.validate.acls", true, "Whether LLAP should reject permissive ACLs in some cases (e.g. 
its own management\n" + "protocol or ZK paths), similar to how ssh refuses a key with bad access permissions."), diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java index 46007559cd..8988dcf52c 100644 --- llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java +++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ -946,6 +946,161 @@ public void testForcedLocalityUnknownHost() throws IOException, InterruptedExcep } } + @Test(timeout = 10000) + public void testMultipleLocalityRequest() throws Exception { + Priority priority1 = Priority.newInstance(1); + + String[] hostsKnown = new String[]{HOST1, HOST2, HOST3}; + String[] hostsTarget = new String[]{HOST1, HOST2}; + + TestTaskSchedulerServiceWrapper tsWrapper = + new TestTaskSchedulerServiceWrapper(200000L, hostsKnown, 1, 0, 10000L, true); + LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled + delayedTaskSchedulerCallableControlled = + (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable; + ControlledClock clock = tsWrapper.getClock(); + clock.setTime(clock.getTime()); + + try { + TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(); + Object clientCookie1 = "cookie1"; + + TezTaskAttemptID task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(); + Object clientCookie2 = "cookie2"; + + TezTaskAttemptID task3 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(); + Object clientCookie3 = "cookie3"; + + TezTaskAttemptID task4 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(); + Object clientCookie4 = "cookie4"; + + TezTaskAttemptID task5 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(); + Object clientCookie5 = "cookie5"; + 
tsWrapper.controlScheduler(true); + + // Get requested host + tsWrapper.allocateTask(task1, hostsTarget, priority1, clientCookie1); + // Get second requested host immediately because host1 is full + tsWrapper.allocateTask(task2, hostsTarget, priority1, clientCookie2); + + tsWrapper.awaitTotalTaskAllocations(2); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(2)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture()); + assertEquals(2, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + // 1st task requested host1 or host2, got host1 + assertEquals(HOST1, argumentCaptor2.getAllValues().get(0).getNodeId().getHost()); + // 2nd task requested host1 or host2, got host2 + assertEquals(HOST2, argumentCaptor2.getAllValues().get(1).getNodeId().getHost()); + reset(tsWrapper.mockAppCallback); + + // Get random (only one remaining) host after locality delay because requested hosts are full + tsWrapper.allocateTask(task3, hostsTarget, priority1, clientCookie3); + + // Still only 2 allocations, but wait at least 1 scheduling run + tsWrapper.awaitTotalTaskAllocations(2); + + clock.setTime(clock.getTime() + 2000L); // Before the timeout. 
+ + delayedTaskSchedulerCallableControlled.triggerGetNextTask(); + delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing(); + + // Verify that an attempt was made to schedule the task, but the decision was to skip scheduling + assertEquals( + LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED, + delayedTaskSchedulerCallableControlled.lastState); + assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered); + + // Move the clock forward 10000ms, and check the delayed queue + clock.setTime(clock.getTime() + 10000L); // Past the timeout. + + delayedTaskSchedulerCallableControlled.triggerGetNextTask(); + delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing(); + + // 3rd task allocation should happen + tsWrapper.awaitTotalTaskAllocations(3); + + // 3rd task requested host1 or host2, but got host3 after locality timeout + argumentCaptor = ArgumentCaptor.forClass(Object.class); + argumentCaptor2 = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(1)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture()); + assertEquals(1, argumentCaptor.getAllValues().size()); + assertEquals(task3, argumentCaptor.getAllValues().get(0)); + // 3rd task requested host1 or host2, got host3 + assertEquals(HOST3, argumentCaptor2.getAllValues().get(0).getNodeId().getHost()); + reset(tsWrapper.mockAppCallback); + + // Finish 2nd and 3rd tasks + tsWrapper.deallocateTask(task2, true, null); + tsWrapper.deallocateTask(task3, true, null); + + // Reject from host1 and get second requested host immediately because host1 refuses + tsWrapper.rejectExecution(task1); + tsWrapper.allocateTask(task4, hostsTarget, priority1, clientCookie4); + + tsWrapper.awaitTotalTaskAllocations(4); + + argumentCaptor = ArgumentCaptor.forClass(Object.class); + argumentCaptor2 = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(1)) + 
.taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture()); + assertEquals(1, argumentCaptor.getAllValues().size()); + assertEquals(task4, argumentCaptor.getAllValues().get(0)); + // 4th task requested host1 or host2, got host2 + assertEquals(HOST2, argumentCaptor2.getAllValues().get(0).getNodeId().getHost()); + reset(tsWrapper.mockAppCallback); + + // Reset stuff related to locality tests + clock.setTime(Time.monotonicNow()); + delayedTaskSchedulerCallableControlled.resetShouldScheduleInformation(); + + // Reject from host2 too and get random host after locality delay because both requested hosts refuse + tsWrapper.rejectExecution(task4); + tsWrapper.allocateTask(task5, hostsTarget, priority1, clientCookie5); + + // Still only 4 allocations, but wait at least 1 scheduling run + tsWrapper.awaitTotalTaskAllocations(4); + + clock.setTime(clock.getTime() + 2000L); // Before the timeout. + + delayedTaskSchedulerCallableControlled.triggerGetNextTask(); + delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing(); + + // Verify that an attempt was made to schedule the task, but the decision was to skip scheduling + assertEquals( + LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED, + delayedTaskSchedulerCallableControlled.lastState); + assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered); + + // Move the clock forward 10000ms, and check the delayed queue + clock.setTime(clock.getTime() + 10000L); // Past the timeout. 
+ + delayedTaskSchedulerCallableControlled.triggerGetNextTask(); + delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing(); + + tsWrapper.awaitTotalTaskAllocations(5); + + // 5th task requested host1 or host2, but got host3 after locality timeout + argumentCaptor = ArgumentCaptor.forClass(Object.class); + argumentCaptor2 = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(1)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture()); + assertEquals(1, argumentCaptor.getAllValues().size()); + assertEquals(task5, argumentCaptor.getAllValues().get(0)); + // 5th task requested host1 or host2, got host3 + assertEquals(HOST3, argumentCaptor2.getAllValues().get(0).getNodeId().getHost()); + + } finally { + tsWrapper.shutdown(); + } + } + @Test(timeout = 10000) public void testHostPreferenceUnknownAndNotSpecified() throws IOException, InterruptedException { Priority priority1 = Priority.newInstance(1); @@ -2255,7 +2410,7 @@ protected TezTaskAttemptID getTaskAttemptId(Object task) { @Override protected void schedulePendingTasks() throws InterruptedException { - LOG.info("Attempted schedulPendingTasks"); + LOG.info("Attempted schedulePendingTasks"); testLock.lock(); try { if (controlScheduling.get()) { @@ -2286,6 +2441,7 @@ void forTestSignalSchedulingRun() throws InterruptedException { testLock.lock(); try { schedulingTriggered = true; + schedulingComplete = false; triggerSchedulingCondition.signal(); } finally { testLock.unlock(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java index 5224429e9e..d7bc2b2c8e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java @@ -52,13 +52,19 @@ private final List locations; private final Set locationSet; + 
private final int numberOfLocations; - public HostAffinitySplitLocationProvider(List knownLocations) { + public HostAffinitySplitLocationProvider(List knownLocations, int numberOfLocations) { Preconditions.checkState(knownLocations != null && !knownLocations.isEmpty(), HostAffinitySplitLocationProvider.class.getName() + " needs at least 1 location to function"); + Preconditions.checkArgument(numberOfLocations >= 0, + HostAffinitySplitLocationProvider.class.getName() + + " needs numberOfLocations at least set to 0. It is set to [" + + numberOfLocations + "] now."); this.locations = knownLocations; this.locationSet = new HashSet(knownLocations); + this.numberOfLocations = numberOfLocations; } @Override @@ -72,11 +78,17 @@ public HostAffinitySplitLocationProvider(List knownLocations) { FileSplit fsplit = (FileSplit) split; String splitDesc = "Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart() + ", length=" + fsplit.getLength(); - List preferredLocations = preferLocations(fsplit); - String location = - preferredLocations.get(determineLocation(preferredLocations, fsplit.getPath().toString(), - fsplit.getStart(), splitDesc)); - return (location != null) ? 
new String[] { location } : null; + List preferredLocations = new ArrayList<>(preferLocations(fsplit)); + List finalLocations = new ArrayList<>(numberOfLocations); + // Keep picking preferred locations while more are needed and candidates remain; each pick is + // removed from the candidates. NOTE(review): the removed code returned null for a null location - confirm nulls cannot occur here + while (finalLocations.size() < numberOfLocations && preferredLocations.size() > 0) { + String nextLocation = preferredLocations.get(determineLocation(preferredLocations, + fsplit.getPath().toString(), fsplit.getStart(), splitDesc)); + finalLocations.add(nextLocation); + preferredLocations.remove(nextLocation); + } + return finalLocations.toArray(new String[0]); } private List preferLocations(FileSplit fsplit) throws IOException { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java index 1b7321bb63..b58968747a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java @@ -49,6 +49,8 @@ public static SplitLocationProvider getSplitLocationProvider(Configuration conf, SplitLocationProvider splitLocationProvider; LOG.info("SplitGenerator using llap affinitized locations: " + useCustomLocations); if (useCustomLocations) { + int numberOfLocations = + HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS_NUMBER); LlapRegistryService serviceRegistry = LlapRegistryService.getClient(conf); LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId()); @@ -64,7 +66,7 @@ public static SplitLocationProvider getSplitLocationProvider(Configuration conf, } locations.add(serviceInstance.getHost()); } - splitLocationProvider = new HostAffinitySplitLocationProvider(locations); + splitLocationProvider = new HostAffinitySplitLocationProvider(locations, numberOfLocations); } else { splitLocationProvider = new SplitLocationProvider() { 
@Override diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java 
ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java index f37a10cf2d..aed5826bdc 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java @@ -63,7 +63,7 @@ @Test (timeout = 5000) public void testNonFileSplits() throws IOException { - HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations); + HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations, 1); InputSplit inputSplit1 = createMockInputSplit(new String[] {locations.get(0), locations.get(1)}); InputSplit inputSplit2 = createMockInputSplit(new String[] {locations.get(2), locations.get(3)}); @@ -74,7 +74,7 @@ public void testNonFileSplits() throws IOException { @Test (timeout = 5000) public void testOrcSplitsBasic() throws IOException { - HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations); + HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations, 1); InputSplit os1 = createMockFileSplit(true, "path1", 0, 1000, new String[] {locations.get(0), locations.get(1)}); InputSplit os2 = createMockFileSplit(true, "path2", 0, 2000, new String[] {locations.get(2), locations.get(3)}); @@ -110,7 +110,7 @@ public void testConsistentHashing() throws IOException { movedRatioWorst = 0, newRatioWorst = Double.MAX_VALUE; for (int locs = MIN_LOC_COUNT; locs <= locations.size(); ++locs) { List partLoc = locations.subList(0, locs); - HostAffinitySplitLocationProvider lp = new HostAffinitySplitLocationProvider(partLoc); + HostAffinitySplitLocationProvider lp = new HostAffinitySplitLocationProvider(partLoc, 1); int moved = 0, newLoc = 0; String newNode = partLoc.get(locs - 1); for (int splitIx = 0; splitIx < splits.length; ++splitIx) { @@ -257,7 
+257,7 @@ private void logBadRatios(StringBuilder failBuilder, int moved, int newLoc, Stri @Test (timeout = 5000) public void testOrcSplitsLocationAffinity() throws IOException { - HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations); + HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations, 1); // Same file, offset, different lengths InputSplit os11 = createMockFileSplit(true, "path1", 0, 15000, new String[] {locations.get(0), locations.get(1)}); @@ -302,7 +302,7 @@ public void testOrcSplitsLocationAffinity() throws IOException { @Test (timeout = 90000000) public void testDFSLocalityAwareAffinity() throws IOException { List someLocations = locations.subList(0, 2); // 0,1 locations - HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(someLocations); + HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(someLocations, 1); // Different base localities InputSplit os1 = createMockFileSplit(true, "path1", 0, 15000, new String[] {locations.get(0), locations.get(1)}); // 0 or 1 @@ -340,7 +340,63 @@ public void testDFSLocalityAwareAffinity() throws IOException { assertArrayEquals(retLoc4, againLoc4); } + @Test (timeout = 100000000) + public void testMultipleHostAffinity() throws IOException { + List someLocations = locations.subList(0, 3); // 0,1,2 locations + HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(someLocations, 2); + // Different base localities + InputSplit os1 = createMockFileSplit(true, "path1", 10, 20, new String[] {locations.get(0), locations.get(1), locations.get(2)}); + InputSplit os2 = createMockFileSplit(true, "path1", 10, 20, new String[] {locations.get(0), locations.get(1), locations.get(3)}); + InputSplit os3 = createMockFileSplit(true, "path1", 20, 30, new String[] {locations.get(0), locations.get(1)}); + InputSplit os4 
= createMockFileSplit(true, "path1", 30, 40, new String[] {locations.get(0), locations.get(2)}); + InputSplit os5 = createMockFileSplit(true, "path1", 40, 50, new String[] {locations.get(1), locations.get(3)}); + InputSplit os6 = createMockFileSplit(true, "path1", 50, 60, new String[] {locations.get(3)}); + + String[] retLoc1 = locationProvider.getLocations(os1); + String[] retLoc2 = locationProvider.getLocations(os2); + String[] retLoc3 = locationProvider.getLocations(os3); + String[] retLoc4 = locationProvider.getLocations(os4); + String[] retLoc5 = locationProvider.getLocations(os5); + String[] retLoc6 = locationProvider.getLocations(os6); + + assertEquals(2, retLoc1.length); + assertTrue(someLocations.contains(retLoc1[0])); + assertTrue(someLocations.contains(retLoc1[1])); + + assertEquals(2, retLoc2.length); + assertTrue(someLocations.contains(retLoc2[0])); + assertTrue(someLocations.contains(retLoc2[1])); + + assertEquals(2, retLoc3.length); + assertTrue(someLocations.contains(retLoc3[0])); + assertTrue(someLocations.contains(retLoc3[1])); + + assertEquals(2, retLoc4.length); + assertTrue(someLocations.contains(retLoc4[0])); + assertTrue(someLocations.contains(retLoc4[1])); + + assertEquals(1, retLoc5.length); + assertTrue(someLocations.contains(retLoc5[0])); + assertEquals(someLocations.get(1), retLoc5[0]); // always location 1 (location 3 is not among the known locations) + + // When there is no matching location then we choose 2 from all of the nodes + assertEquals(2, retLoc6.length); + + String[] againLoc1 = locationProvider.getLocations(os1); + String[] againLoc2 = locationProvider.getLocations(os2); + String[] againLoc3 = locationProvider.getLocations(os3); + String[] againLoc4 = locationProvider.getLocations(os4); + String[] againLoc5 = locationProvider.getLocations(os5); + String[] againLoc6 = locationProvider.getLocations(os6); + + assertArrayEquals(retLoc1, againLoc1); + assertArrayEquals(retLoc2, againLoc2); + assertArrayEquals(retLoc3, againLoc3); + 
assertArrayEquals(retLoc4, againLoc4); + 
assertArrayEquals(retLoc5, againLoc5); + assertArrayEquals(retLoc6, againLoc6); + } private InputSplit createMockInputSplit(String[] locations) throws IOException { InputSplit inputSplit = mock(InputSplit.class);