diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java index 1d6b716b3c..d9c2364ee2 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java @@ -14,6 +14,7 @@ package org.apache.hadoop.hive.llap.registry.impl; +import java.util.Collections; import java.util.Map; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; @@ -62,7 +63,7 @@ public int getOutputFormatPort() { @Override public Map getProperties() { - throw new UnsupportedOperationException(); + return Collections.emptyMap(); } @Override diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index 344eba781e..7ec4854cac 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -29,6 +29,8 @@ import java.util.List; import java.util.Map; import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -122,7 +124,8 @@ public static String getWorkerIdentity(String host) { return "host-" + host; } - private final class FixedServiceInstance implements LlapServiceInstance { + @VisibleForTesting + public final class FixedServiceInstance implements LlapServiceInstance { private final String host; private final String serviceAddress; diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index 58a99f4cf7..0a12923865 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -13,6 +13,7 @@ */ package org.apache.hadoop.hive.llap.registry.impl; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.registry.client.binding.RegistryUtils; import com.google.common.collect.Sets; @@ -186,7 +187,8 @@ public void unregister() throws IOException { // Nothing for the zkCreate models } - private class DynamicServiceInstance + @VisibleForTesting + public class DynamicServiceInstance extends ServiceInstanceBase implements LlapServiceInstance { private final int mngPort; private final int shufflePort; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java index c5d96e54eb..7e2c5d2ad1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java @@ -46,7 +46,8 @@ HostAffinitySplitLocationProvider.class); private final boolean isDebugEnabled = LOG.isDebugEnabled(); - private final List locations; + @VisibleForTesting + final List locations; public HostAffinitySplitLocationProvider(List knownLocations) { Preconditions.checkState(knownLocations != null && !knownLocations.isEmpty(), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java index 1b7321bb63..faffcf43dd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; @@ -50,21 +51,7 @@ public static SplitLocationProvider getSplitLocationProvider(Configuration conf, LOG.info("SplitGenerator using llap affinitized locations: " + useCustomLocations); if (useCustomLocations) { LlapRegistryService serviceRegistry = LlapRegistryService.getClient(conf); - LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId()); - - Collection serviceInstances = - serviceRegistry.getInstances().getAllInstancesOrdered(true); - Preconditions.checkArgument(!serviceInstances.isEmpty(), - "No running LLAP daemons! Please check LLAP service status and zookeeper configuration"); - ArrayList locations = new ArrayList<>(serviceInstances.size()); - for (LlapServiceInstance serviceInstance : serviceInstances) { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname=" + - serviceInstance.getHost() + " to list for split locations"); - } - locations.add(serviceInstance.getHost()); - } - splitLocationProvider = new HostAffinitySplitLocationProvider(locations); + return getCustomSplitLocationProvider(serviceRegistry, LOG); } else { splitLocationProvider = new SplitLocationProvider() { @Override @@ -84,4 +71,31 @@ public static SplitLocationProvider getSplitLocationProvider(Configuration conf, } return splitLocationProvider; } + + @VisibleForTesting + static SplitLocationProvider getCustomSplitLocationProvider(LlapRegistryService serviceRegistry, Logger LOG) throws + IOException { + LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId()); + + Collection serviceInstances = + serviceRegistry.getInstances().getAllInstancesOrdered(true); + Preconditions.checkArgument(!serviceInstances.isEmpty(), + "No running LLAP daemons! Please check LLAP service status and zookeeper configuration"); + ArrayList locations = new ArrayList<>(serviceInstances.size()); + for (LlapServiceInstance serviceInstance : serviceInstances) { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname=" + + serviceInstance.getHost() + " to list for split locations"); + } + String executors = + serviceInstance.getProperties().get(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS); + if (executors != null && Integer.parseInt(executors) == 0) { + // If the executors set to 0 we should not consider this location for affinity + locations.add(null); + } else { + locations.add(serviceInstance.getHost()); + } + } + return new HostAffinitySplitLocationProvider(locations); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java index 13f4676bdd..61c98b7c7d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java @@ -306,8 +306,8 @@ private InputSplit createMockInputSplit(String[] locations) throws IOException { return inputSplit; } - private FileSplit createMockFileSplit(boolean createOrcSplit, String fakePathString, long start, - long length, String[] locations) throws IOException { + static FileSplit createMockFileSplit(boolean createOrcSplit, String fakePathString, long start, + long length, String[] locations) throws IOException { FileSplit fileSplit; if (createOrcSplit) { fileSplit = mock(OrcSplit.class); diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestUtils.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestUtils.java new file mode 100644 index 0000000000..50b7c6f2f1 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestUtils.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance; +import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.split.SplitLocationProvider; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.junit.Before; +import org.junit.Test; + +import org.mockito.Mock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public class TestUtils { + private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class); + + private static final String INACTIVE = "inactive"; + private static final String ACTIVE = "dynamic"; + private static final String DISABLED = "disabled"; + private static final String FIXED = "fix"; + + + @Mock + private LlapRegistryService mockRegistry; + + @Mock + private LlapServiceInstanceSet mockInstanceSet; + + @Before + public void setUp() { + initMocks(this); + } + + @Test + public void testGetSplitLocationProvider() throws IOException, URISyntaxException { + // Create test LlapServiceInstances to make sure that we can handle all of the instance types + List instances = new ArrayList<>(3); + + // Set 1 inactive instance to make sure that this does not cause problem for us + LlapServiceInstance inactive = new InactiveServiceInstance(INACTIVE); + instances.add(inactive); + + LlapZookeeperRegistryImpl dynRegistry = new LlapZookeeperRegistryImpl("dyn", new HiveConf()); + Endpoint rpcEndpoint = RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(ACTIVE, 4000)); + Endpoint shuffle = RegistryTypeUtils.ipcEndpoint("shuffle", new InetSocketAddress(ACTIVE, 4000)); + Endpoint mng = RegistryTypeUtils.ipcEndpoint("llapmng", new InetSocketAddress(ACTIVE, 4000)); + Endpoint outputFormat = RegistryTypeUtils.ipcEndpoint("llapoutputformat", new InetSocketAddress(ACTIVE, 4000)); + Endpoint services = RegistryTypeUtils.webEndpoint("services", new URI(ACTIVE + ":4000")); + + // Set 1 active instance + ServiceRecord enabledSrv = new ServiceRecord(); + enabledSrv.addInternalEndpoint(rpcEndpoint); + enabledSrv.addInternalEndpoint(shuffle); + enabledSrv.addInternalEndpoint(mng); + enabledSrv.addInternalEndpoint(outputFormat); + enabledSrv.addExternalEndpoint(services); + + enabledSrv.set(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS, 10); + enabledSrv.set(HiveConf.ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, 100); + LlapZookeeperRegistryImpl.DynamicServiceInstance dynamic = + dynRegistry.new DynamicServiceInstance(enabledSrv); + instances.add(dynamic); + + // Set 1 instance with 0 executors + ServiceRecord disabledSrv = new ServiceRecord(enabledSrv); + disabledSrv.set(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS, 0); + LlapZookeeperRegistryImpl.DynamicServiceInstance disabled = + dynRegistry.new DynamicServiceInstance(disabledSrv); + disabled.setHost(DISABLED); + instances.add(disabled); + + when(mockRegistry.getInstances()).thenReturn(mockInstanceSet); + when(mockInstanceSet.getAllInstancesOrdered(anyBoolean())).thenReturn(instances); + SplitLocationProvider provider = Utils.getCustomSplitLocationProvider(mockRegistry, LOG); + + assertLocations((HostAffinitySplitLocationProvider)provider, new String[] { ACTIVE }); + + // Check if fixed stuff is working as well + LlapFixedRegistryImpl fixRegistry = new LlapFixedRegistryImpl("llap", new HiveConf()); + + // Instance for testing fixed registry instances + LlapServiceInstance fixed = fixRegistry.new FixedServiceInstance(FIXED); + instances.remove(dynamic); + instances.add(fixed); + + provider = Utils.getCustomSplitLocationProvider(mockRegistry, LOG); + + assertLocations((HostAffinitySplitLocationProvider)provider, new String[] { FIXED }); + } + + private void assertLocations(HostAffinitySplitLocationProvider provider, String[] expectedLocations) + throws IOException { + InputSplit inputSplit1 = + TestHostAffinitySplitLocationProvider.createMockFileSplit( + true, "path2", 0, 1000, new String[] {"HOST-1", "HOST-2"}); + + // Check that the provider does not return disabled/inactive instances and returns onl 1 location + List result = new ArrayList<>(Arrays.asList(provider.getLocations(inputSplit1))); + assertEquals(1, result.size()); + assertFalse(result.contains(INACTIVE)); + assertFalse(result.contains(DISABLED)); + + // Since we can not check the results for every input, dig into the provider internal data to + // make sure that we have only the available host name in the location list + // Remove nulls + Set knownLocations = new HashSet<>(); + knownLocations.addAll(provider.locations); + knownLocations.remove(null); + assertArrayEquals(expectedLocations, knownLocations.toArray(new String[] {})); + } +}