diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e50ff997a9..178f26431c 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4440,6 +4440,17 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "preferring one of the locations provided by the split itself. If there is no llap daemon " +
         "running on any of those locations (or on the cloud), fall back to a cache affinity to" +
         " an LLAP node. This is effective only if hive.execution.mode is llap."),
+    LLAP_CLIENT_CONSISTENT_SPLITS_PREFER_LOCALITY("hive.llap.client.consistent.splits.prefer.locality", true,
+        "If this option is set to true then the HDFS locality will be considered when generating\n" +
+        "split targets. This could mean faster execution on a cache miss; on the other hand, it can\n" +
+        "cause imbalance in LlapDaemon usage if the data locality is skewed.\n" +
+        "This is effective only if hive.execution.mode is llap and hive.llap.client.consistent.splits is true."),
+    LLAP_CLIENT_CONSISTENT_SPLITS_NUMBER("hive.llap.client.consistent.splits.number", 1,
+        "The number of locations to generate if hive.llap.client.consistent.splits is set.\n" +
+        "If multiple locations are generated and the first node is unavailable then the\n" +
+        "scheduler will not wait hive.llap.task.scheduler.locality.delay before assigning the\n" +
+        "task to the next preferred LLAP node.\n" +
+        "This is effective only if hive.execution.mode is llap."),
     LLAP_VALIDATE_ACLS("hive.llap.validate.acls", true,
         "Whether LLAP should reject permissive ACLs in some cases (e.g. its own management\n" +
         "protocol or ZK paths), similar to how ssh refuses a key with bad access permissions."),
diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index 46007559cd..0ebb67e424 100644
--- llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -946,6 +946,157 @@ public void testForcedLocalityUnknownHost() throws IOException, InterruptedExcep
     }
   }
 
+  @Test(timeout = 10000)
+  public void testMultipleLocalityRequest() throws Exception {
+    Priority priority1 = Priority.newInstance(1);
+
+    String[] hostsKnown = new String[]{HOST1, HOST2, HOST3};
+    String[] hostsTarget = new String[]{HOST1, HOST2};
+
+    TestTaskSchedulerServiceWrapper tsWrapper =
+        new TestTaskSchedulerServiceWrapper(200000L, hostsKnown, 1, 0, 10000L, true);
+    LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+        delayedTaskSchedulerCallableControlled =
+        (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled)
+            tsWrapper.ts.delayedTaskSchedulerCallable;
+    ControlledClock clock = tsWrapper.getClock();
+    clock.setTime(clock.getTime());
+
+    try {
+      TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
+      Object clientCookie1 = "cookie1";
+      TezTaskAttemptID task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
+      Object clientCookie2 = "cookie2";
+      TezTaskAttemptID task3 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
+      Object clientCookie3 = "cookie3";
+      TezTaskAttemptID task4 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
+      Object clientCookie4 = "cookie4";
+      TezTaskAttemptID task5 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
+      Object clientCookie5 = "cookie5";
+
+      tsWrapper.controlScheduler(true);
+
+      // Get requested host
+      tsWrapper.allocateTask(task1, hostsTarget, priority1, clientCookie1);
+      // Get second requested host immediately because host1 is full
+      tsWrapper.allocateTask(task2, hostsTarget, priority1, clientCookie2);
+
+      tsWrapper.awaitTotalTaskAllocations(2);
+
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      ArgumentCaptor<Container> argumentCaptor2 = ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(2))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture());
+      assertEquals(2, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+      // 1st task requested host1, got host1
+      assertEquals(HOST1, argumentCaptor2.getAllValues().get(0).getNodeId().getHost());
+      // 2nd task requested host1 and host2, got host2 because host1 was full
+      assertEquals(HOST2, argumentCaptor2.getAllValues().get(1).getNodeId().getHost());
+      reset(tsWrapper.mockAppCallback);
+
+      // Get random (only one remaining) host after locality delay because requested hosts are full
+      tsWrapper.allocateTask(task3, hostsTarget, priority1, clientCookie3);
+
+      // Still only 2 allocations, but wait at least 1 scheduling run
+      tsWrapper.awaitTotalTaskAllocations(2);
+
+      clock.setTime(clock.getTime() + 2000L); // Before the timeout.
+
+      delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+      delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+
+      // Verify that an attempt was made to schedule the task, but the decision was to skip scheduling
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED,
+          delayedTaskSchedulerCallableControlled.lastState);
+      assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered);
+
+      // Move the clock forward 10000ms, and check the delayed queue
+      clock.setTime(clock.getTime() + 10000L); // Past the timeout.
+
+      delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+      delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+
+      // 3rd task allocation should happen
+      tsWrapper.awaitTotalTaskAllocations(3);
+
+      // 3rd task requested host1 or host2, but got host3 after locality timeout
+      argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      argumentCaptor2 = ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(1))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture());
+      assertEquals(1, argumentCaptor.getAllValues().size());
+      assertEquals(task3, argumentCaptor.getAllValues().get(0));
+      // 3rd task got host3, the only remaining node
+      assertEquals(HOST3, argumentCaptor2.getAllValues().get(0).getNodeId().getHost());
+      reset(tsWrapper.mockAppCallback);
+
+      // Finish 2nd and 3rd tasks
+      tsWrapper.deallocateTask(task2, true, null);
+      tsWrapper.deallocateTask(task3, true, null);
+
+      // Reject from host1 and get second requested host immediately because host1 refuses
+      tsWrapper.rejectExecution(task1);
+      tsWrapper.allocateTask(task4, hostsTarget, priority1, clientCookie4);
+
+      tsWrapper.awaitTotalTaskAllocations(4);
+
+      argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      argumentCaptor2 = ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(1))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture());
+      assertEquals(1, argumentCaptor.getAllValues().size());
+      assertEquals(task4, argumentCaptor.getAllValues().get(0));
+      // 4th task requested host1, got host2 because host1 rejected it
+      assertEquals(HOST2, argumentCaptor2.getAllValues().get(0).getNodeId().getHost());
+      reset(tsWrapper.mockAppCallback);
+
+      // Reset stuff related to locality tests
+      clock.setTime(Time.monotonicNow());
+      delayedTaskSchedulerCallableControlled.resetShouldScheduleInformation();
+
+      // Reject from host2 too and get random host after locality delay because both requested hosts refuse
+      tsWrapper.rejectExecution(task4);
+      tsWrapper.allocateTask(task5, hostsTarget, priority1, clientCookie5);
+
+      // Still only 4 allocations, but wait at least 1 scheduling run
+      tsWrapper.awaitTotalTaskAllocations(4);
+
+      clock.setTime(clock.getTime() + 2000L); // Before the timeout.
+
+      delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+      delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+
+      // Verify that an attempt was made to schedule the task, but the decision was to skip scheduling
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED,
+          delayedTaskSchedulerCallableControlled.lastState);
+      assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered);
+
+      // Move the clock forward 10000ms, and check the delayed queue
+      clock.setTime(clock.getTime() + 10000L); // Past the timeout.
+
+      delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+      delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+
+      tsWrapper.awaitTotalTaskAllocations(5);
+
+      // 5th task requested host1 or host2, but got host3 after locality timeout
+      argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      argumentCaptor2 = ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(1))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture());
+      assertEquals(1, argumentCaptor.getAllValues().size());
+      assertEquals(task5, argumentCaptor.getAllValues().get(0));
+      // 5th task got host3
+      assertEquals(HOST3, argumentCaptor2.getAllValues().get(0).getNodeId().getHost());
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
   @Test(timeout = 10000)
   public void testHostPreferenceUnknownAndNotSpecified() throws IOException, InterruptedException {
     Priority priority1 = Priority.newInstance(1);
@@ -2255,7 +2406,7 @@ protected TezTaskAttemptID getTaskAttemptId(Object task) {
     @Override
     protected void schedulePendingTasks() throws InterruptedException {
-      LOG.info("Attempted schedulPendingTasks");
+      LOG.info("Attempted schedulePendingTasks");
       testLock.lock();
       try {
         if (controlScheduling.get()) {
@@ -2286,6 +2437,7 @@ void forTestSignalSchedulingRun() throws InterruptedException {
       testLock.lock();
       try {
         schedulingTriggered = true;
+        schedulingComplete = false;
         triggerSchedulingCondition.signal();
       } finally {
         testLock.unlock();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
index 5224429e9e..271e5f2e67 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
@@ -52,13 +52,22 @@
   private final List<String> locations;
   private final Set<String> locationSet;
+  private final boolean useLocality;
+  private final int numberOfLocations;
 
-  public HostAffinitySplitLocationProvider(List<String> knownLocations) {
+  public HostAffinitySplitLocationProvider(List<String> knownLocations, boolean useLocality,
+      int numberOfLocations) {
     Preconditions.checkState(knownLocations != null && !knownLocations.isEmpty(),
         HostAffinitySplitLocationProvider.class.getName() +
             " needs at least 1 location to function");
+    Preconditions.checkArgument(numberOfLocations > 0,
+        HostAffinitySplitLocationProvider.class.getName() +
+            " needs numberOfLocations greater than 0. It is set to [" +
+            numberOfLocations + "] now.");
     this.locations = knownLocations;
     this.locationSet = new HashSet<>(knownLocations);
+    this.useLocality = useLocality;
+    this.numberOfLocations = numberOfLocations;
   }
 
   @Override
@@ -72,11 +81,9 @@ public HostAffinitySplitLocationProvider(List<String> knownLocations) {
     FileSplit fsplit = (FileSplit) split;
     String splitDesc = "Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart()
         + ", length=" + fsplit.getLength();
-    List<String> preferredLocations = preferLocations(fsplit);
-    String location =
-        preferredLocations.get(determineLocation(preferredLocations, fsplit.getPath().toString(),
-            fsplit.getStart(), splitDesc));
-    return (location != null) ? new String[] { location } : null;
+    List<String> locationsToCheck = (useLocality ?
+        preferLocations(fsplit) : locations);
+    return determineLocation(locationsToCheck, numberOfLocations, fsplit.getPath().toString(),
+        fsplit.getStart(), splitDesc);
   }
 
   private List<String> preferLocations(FileSplit fsplit) throws IOException {
@@ -99,35 +106,42 @@ public HostAffinitySplitLocationProvider(List<String> knownLocations) {
   }
 
   @VisibleForTesting
-  public static int determineLocation(
-      List<String> locations, String path, long start, String desc) {
-    if (locations.size() == 1) {
+  public static String[] determineLocation(List<String> locations, int numberOfLocations,
+      String path, long start, String desc) {
+    if (locations.size() <= numberOfLocations) {
       // skip everything, this is simple
-      return 0;
+      return locations.toArray(new String[0]);
     }
+    List<String> foundLocations = new ArrayList<>(numberOfLocations);
     byte[] bytes = getHashInputForSplit(path, start);
     long hash1 = hash1(bytes);
    int index = Hashing.consistentHash(hash1, locations.size());
     String location = locations.get(index);
+    if (location != null) {
+      foundLocations.add(location);
+    }
     if (LOG.isDebugEnabled()) {
       LOG.debug(desc + " mapped to index=" + index + ", location=" + location);
     }
     int iter = 1;
     long hash2 = 0;
     // Since our probing method is totally bogus, give up after some time.
-    while (location == null && iter < locations.size() * 2) {
+    while (foundLocations.size() < numberOfLocations && iter < locations.size() * 2) {
       if (iter == 1) {
         hash2 = hash2(bytes);
       }
       // Note that this is not real double hashing since we have consistent hash on top.
       index = Hashing.consistentHash(hash1 + iter * hash2, locations.size());
       location = locations.get(index);
+      if (location != null && !foundLocations.contains(location)) {
+        foundLocations.add(location);
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug(desc + " remapped to index=" + index + ", location=" + location);
       }
       ++iter;
     }
-    return index;
+    return foundLocations.toArray(new String[0]);
   }
 
   private static byte[] getHashInputForSplit(String path, long start) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
index 1b7321bb63..af2ed05f85 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
@@ -49,6 +49,10 @@ public static SplitLocationProvider getSplitLocationProvider(Configuration conf,
     SplitLocationProvider splitLocationProvider;
     LOG.info("SplitGenerator using llap affinitized locations: " + useCustomLocations);
     if (useCustomLocations) {
+      int numberOfLocations =
+          HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS_NUMBER);
+      boolean useLocality =
+          HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS_PREFER_LOCALITY);
       LlapRegistryService serviceRegistry = LlapRegistryService.getClient(conf);
       LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId());
 
@@ -64,7 +68,7 @@ public static SplitLocationProvider getSplitLocationProvider(Configuration conf,
         }
         locations.add(serviceInstance.getHost());
       }
-      splitLocationProvider = new HostAffinitySplitLocationProvider(locations);
+      splitLocationProvider = new HostAffinitySplitLocationProvider(locations, useLocality, numberOfLocations);
     } else {
       splitLocationProvider = new SplitLocationProvider() {
         @Override
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
index f37a10cf2d..b49fd8bae8 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -63,7 +63,7 @@
   @Test (timeout = 5000)
   public void testNonFileSplits() throws IOException {
-    HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
+    HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations, true, 1);
 
     InputSplit inputSplit1 = createMockInputSplit(new String[] {locations.get(0), locations.get(1)});
     InputSplit inputSplit2 = createMockInputSplit(new String[] {locations.get(2), locations.get(3)});
@@ -74,7 +74,7 @@ public void testNonFileSplits() throws IOException {
 
   @Test (timeout = 5000)
   public void testOrcSplitsBasic() throws IOException {
-    HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
+    HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations, true, 1);
 
     InputSplit os1 = createMockFileSplit(true, "path1", 0, 1000, new String[] {locations.get(0), locations.get(1)});
     InputSplit os2 = createMockFileSplit(true, "path2", 0, 2000, new String[] {locations.get(2), locations.get(3)});
@@ -110,7 +110,7 @@ public void testConsistentHashing() throws IOException {
         movedRatioWorst = 0, newRatioWorst = Double.MAX_VALUE;
     for (int locs = MIN_LOC_COUNT; locs <= locations.size(); ++locs) {
       List<String> partLoc = locations.subList(0, locs);
-      HostAffinitySplitLocationProvider lp = new HostAffinitySplitLocationProvider(partLoc);
+      HostAffinitySplitLocationProvider lp = new HostAffinitySplitLocationProvider(partLoc, true, 1);
       int moved = 0, newLoc = 0;
       String newNode = partLoc.get(locs - 1);
       for (int splitIx = 0; splitIx < splits.length; ++splitIx) {
@@ -209,14 +209,14 @@ private double testHashDistribution(int locs, final int missCount, FileSplit[] s
     Mockito.when(partLocs.get(Mockito.anyInt())).thenAnswer(new Answer<String>() {
       @Override
       public String answer(InvocationOnMock invocation) throws Throwable {
-        return (state.getAndIncrement() == missCount) ? "not-null" : null;
+        return (state.getAndIncrement() == missCount) ?
+            invocation.getArguments()[0].toString() : null;
       }
     });
     int[] hitCounts = new int[locs];
     for (int splitIx = 0; splitIx < splits.length; ++splitIx) {
       state.set(0);
-      int index = HostAffinitySplitLocationProvider.determineLocation(partLocs,
-          splits[splitIx].getPath().toString(), splits[splitIx].getStart(), null);
+      int index = Integer.valueOf(HostAffinitySplitLocationProvider.determineLocation(partLocs, 1,
+          splits[splitIx].getPath().toString(), splits[splitIx].getStart(), null)[0]);
       ++hitCounts[index];
     }
     SummaryStatistics ss = new SummaryStatistics();
@@ -257,7 +257,7 @@ private void logBadRatios(StringBuilder failBuilder, int moved, int newLoc, Stri
 
   @Test (timeout = 5000)
   public void testOrcSplitsLocationAffinity() throws IOException {
-    HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
+    HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations, true, 1);
 
     // Same file, offset, different lengths
     InputSplit os11 = createMockFileSplit(true, "path1", 0, 15000, new String[] {locations.get(0), locations.get(1)});
@@ -302,7 +302,7 @@ public void testOrcSplitsLocationAffinity() throws IOException {
   @Test (timeout = 90000000)
   public void testDFSLocalityAwareAffinity() throws IOException {
     List<String> someLocations = locations.subList(0, 2); // 0,1 locations
-    HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(someLocations);
+    HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(someLocations, true, 1);
 
     // Different base localities
     InputSplit os1 = createMockFileSplit(true, "path1", 0, 15000, new String[] {locations.get(0), locations.get(1)}); // 0 or 1
@@ -340,9 +340,109 @@ public void testDFSLocalityAwareAffinity() throws IOException {
     assertArrayEquals(retLoc4, againLoc4);
   }
 
+  @Test (timeout = 10000)
+  public void testMultipleHostAffinity() throws IOException {
+    List<String> someLocations = locations.subList(0, 3); // 0,1,2 locations
+    HostAffinitySplitLocationProvider locationProvider =
+        new HostAffinitySplitLocationProvider(someLocations, true, 2);
+
+    // Different base localities
+    InputSplit os1 = createMockFileSplit(true, "path1", 10, 20,
+        new String[] {locations.get(0), locations.get(1), locations.get(2)});
+    InputSplit os2 = createMockFileSplit(true, "path1", 10, 20,
+        new String[] {locations.get(0), locations.get(1), locations.get(3)});
+    InputSplit os3 = createMockFileSplit(true, "path1", 20, 30,
+        new String[] {locations.get(0), locations.get(1)});
+    InputSplit os4 = createMockFileSplit(true, "path1", 30, 40,
+        new String[] {locations.get(0), locations.get(2)});
+    InputSplit os5 = createMockFileSplit(true, "path1", 40, 50,
+        new String[] {locations.get(1), locations.get(3)});
+    InputSplit os6 = createMockFileSplit(true, "path1", 50, 60,
+        new String[] {locations.get(3)});
+
+    String[] retLoc1 = locationProvider.getLocations(os1);
+    String[] retLoc2 = locationProvider.getLocations(os2);
+    String[] retLoc3 = locationProvider.getLocations(os3);
+    String[] retLoc4 = locationProvider.getLocations(os4);
+    String[] retLoc5 = locationProvider.getLocations(os5);
+    String[] retLoc6 = locationProvider.getLocations(os6);
+
+    assertEquals(2, retLoc1.length);
+    assertTrue(someLocations.contains(retLoc1[0]));
+    assertTrue(someLocations.contains(retLoc1[1]));
+
+    assertEquals(2, retLoc2.length);
+    assertTrue(someLocations.contains(retLoc2[0]));
+    assertTrue(someLocations.contains(retLoc2[1]));
+
+    assertEquals(2, retLoc3.length);
+    assertTrue(someLocations.contains(retLoc3[0]));
+    assertTrue(someLocations.contains(retLoc3[1]));
+
+    assertEquals(2, retLoc4.length);
+    assertTrue(someLocations.contains(retLoc4[0]));
+    assertTrue(someLocations.contains(retLoc4[1]));
+
+    assertEquals(1, retLoc5.length);
+    assertTrue(someLocations.contains(retLoc5[0]));
+    assertEquals(someLocations.get(1), retLoc5[0]); // location 1 is the only requested host that is a known LLAP node
+
+    // When there is no matching location then we choose 2 locations from all of the known nodes
+    assertEquals(2, retLoc6.length);
+
+    String[] againLoc1 = locationProvider.getLocations(os1);
+    String[] againLoc2 = locationProvider.getLocations(os2);
+    String[] againLoc3 = locationProvider.getLocations(os3);
+    String[] againLoc4 = locationProvider.getLocations(os4);
+    String[] againLoc5 = locationProvider.getLocations(os5);
+    String[] againLoc6 = locationProvider.getLocations(os6);
+
+    assertArrayEquals(retLoc1, againLoc1);
+    assertArrayEquals(retLoc2, againLoc2);
+    assertArrayEquals(retLoc3, againLoc3);
+    assertArrayEquals(retLoc4, againLoc4);
+    assertArrayEquals(retLoc5, againLoc5);
+    assertArrayEquals(retLoc6, againLoc6);
+  }
+
+  @Test (timeout = 10000)
+  public void testLocalityFalse() throws IOException {
+    List<String> someLocations = locations.subList(0, 2); // 0,1 locations
+    HostAffinitySplitLocationProvider locationProvider =
+        new HostAffinitySplitLocationProvider(someLocations, false, 1);
+
+    // Different base localities
+    InputSplit os1 = createMockFileSplit(true, "path1", 10, 20, new String[]{locations.get(0)});
+    InputSplit os2 = createMockFileSplit(true, "path1", 10, 20, new String[]{locations.get(1)});
+    InputSplit os3 = createMockFileSplit(true, "path1", 20, 30, new String[]{locations.get(2)});
+
+    String[] retLoc1 = locationProvider.getLocations(os1);
+    String[] retLoc2 = locationProvider.getLocations(os2);
+    String[] retLoc3 = locationProvider.getLocations(os3);
+
+    assertEquals(1, retLoc1.length);
+    assertTrue(someLocations.contains(retLoc1[0]));
+
+    assertEquals(1, retLoc2.length);
+    assertTrue(someLocations.contains(retLoc2[0]));
+
+    assertEquals(1, retLoc3.length);
+    assertTrue(someLocations.contains(retLoc3[0]));
+
+    // All of the locations should be the same
+    assertTrue(retLoc1[0].equals(retLoc2[0]));
+    assertTrue(retLoc1[0].equals(retLoc3[0]));
+
+    String[] againLoc1 = locationProvider.getLocations(os1);
+    String[] againLoc2 = locationProvider.getLocations(os2);
+    String[] againLoc3 = locationProvider.getLocations(os3);
+    assertArrayEquals(retLoc1, againLoc1);
+    assertArrayEquals(retLoc2, againLoc2);
+    assertArrayEquals(retLoc3, againLoc3);
+  }
 
-  private InputSplit createMockInputSplit(String[] locations) throws IOException {
+  private InputSplit createMockInputSplit(String[] locations) throws IOException {
     InputSplit inputSplit = mock(InputSplit.class);
     doReturn(locations).when(inputSplit).getLocations();
     return inputSplit;
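
Note for reviewers: the following is a minimal, illustrative sketch (not part of the patch) of how the new constructor is meant to be used. The HostAffinitySplitLocationProvider API and the meaning of the two new properties are taken from the diff above; the class name MultiLocationAffinityDemo, the host names, the path, and the split size are made up for the example. In a real query, Utils.getSplitLocationProvider() wires these values up from the LLAP service registry and HiveConf instead of hard-coding them.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.tez.HostAffinitySplitLocationProvider;
import org.apache.hadoop.mapred.FileSplit;

public class MultiLocationAffinityDemo {
  public static void main(String[] args) throws Exception {
    // Hosts as they would be reported by the LLAP service registry (sample names only).
    List<String> llapHosts = Arrays.asList("llap-node-1", "llap-node-2", "llap-node-3");

    // useLocality=true mirrors hive.llap.client.consistent.splits.prefer.locality,
    // numberOfLocations=2 mirrors hive.llap.client.consistent.splits.number.
    HostAffinitySplitLocationProvider provider =
        new HostAffinitySplitLocationProvider(llapHosts, true, 2);

    // A FileSplit whose HDFS replicas live on two of the LLAP nodes (illustrative values).
    FileSplit split = new FileSplit(new Path("hdfs:///warehouse/t/part-00000"),
        0L, 256L * 1024 * 1024, new String[] {"llap-node-1", "llap-node-3"});

    // With numberOfLocations=2 the provider returns up to two candidate hosts instead of a
    // single one, so the scheduler has a fallback target and does not have to wait for
    // hive.llap.task.scheduler.locality.delay when the first host is unavailable.
    String[] targets = provider.getLocations(split);
    System.out.println(Arrays.toString(targets));
  }
}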