diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
index 6178b4bbc1..4f9e0da530 100644
--- llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
@@ -14,6 +14,7 @@
 package org.apache.hadoop.hive.llap.registry;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.hadoop.hive.registry.ServiceInstance;
 import org.apache.hadoop.hive.registry.ServiceInstanceSet;
@@ -46,6 +47,11 @@
    */
  void unregister() throws IOException;
 
+  /**
+   * Update the current registration with the given attributes.
+   */
+  void updateRegistration(Iterable<Map.Entry<String, String>> attributes) throws IOException;
+
  /**
   * Client API to get the list of instances registered via the current registry key.
   * @param component
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index f99d86c2e8..deedc9b290 100644
--- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -112,6 +112,11 @@ public void unregister() throws IOException {
     // nothing to unregister
   }
 
+  @Override
+  public void updateRegistration(Iterable<Map.Entry<String, String>> attributes) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
   public static String getWorkerIdentity(String host) {
     // trigger clean errors for anyone who mixes up identity with hosts
     return "host-" + host;
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 3bda40b7e9..964faf006b 100644
--- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -24,7 +24,6 @@
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
-import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.service.AbstractService;
@@ -34,6 +33,11 @@
 
 public class LlapRegistryService extends AbstractService {
 
+  public static final String LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE =
+      "hive.llap.daemon.task.scheduler.enabled.wait.queue.size";
+  public static final String LLAP_DAEMON_NUM_ENABLED_EXECUTORS =
+      "hive.llap.daemon.num.enabled.executors";
+
   private static final Logger LOG = LoggerFactory.getLogger(LlapRegistryService.class);
 
   private ServiceRegistry registry = null;
@@ -132,6 +136,12 @@ private void unregisterWorker() throws IOException {
     }
   }
 
+  public void updateRegistration(Iterable<Map.Entry<String, String>> attributes) throws IOException {
+    if (isDaemon && this.registry != null) {
+      this.registry.updateRegistration(attributes);
+    }
+  }
+
   public LlapServiceInstanceSet getInstances() throws IOException {
     return getInstances(0);
   }
@@ -158,4 +168,5 @@ public String getWorkerIdentity() {
   public ApplicationId getApplicationId() throws IOException {
     return registry.getApplicationId();
   }
+
 }
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index f5d6202e6f..2472a375cb 100644
--- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -70,6 +70,7 @@
 
   private SlotZnode slotZnode;
+  private ServiceRecord srv;
 
   // to be used by clients of ServiceRegistry TODO: this is unnecessary
   private DynamicServiceInstanceSet instances;
 
@@ -124,7 +125,7 @@ public Endpoint getOutputFormatEndpoint() {
 
   @Override
   public String register() throws IOException {
-    ServiceRecord srv = new ServiceRecord();
+    srv = new ServiceRecord();
     Endpoint rpcEndpoint = getRpcEndpoint();
     srv.addInternalEndpoint(rpcEndpoint);
     srv.addInternalEndpoint(getMngEndpoint());
@@ -132,13 +133,13 @@ public String register() throws IOException {
     srv.addExternalEndpoint(getServicesEndpoint());
     srv.addInternalEndpoint(getOutputFormatEndpoint());
 
-    for (Map.Entry<String, String> kv : this.conf) {
-      if (kv.getKey().startsWith(HiveConf.PREFIX_LLAP)
-          || kv.getKey().startsWith(HiveConf.PREFIX_HIVE_LLAP)) {
-        // TODO: read this somewhere useful, like the task scheduler
-        srv.set(kv.getKey(), kv.getValue());
-      }
-    }
+    populateConfigValues(this.conf);
+    Map<String, String> capacityValues = new HashMap<>(2);
+    capacityValues.put(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS,
+        HiveConf.getVarWithoutType(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS));
+    capacityValues.put(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE,
+        HiveConf.getVarWithoutType(conf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE));
+    populateConfigValues(capacityValues.entrySet());
 
     String uniqueId = registerServiceRecord(srv);
 
     long znodeCreationTimeout = 120;
@@ -164,6 +165,22 @@ public String register() throws IOException {
     return uniqueId;
   }
 
+  private void populateConfigValues(Iterable<Map.Entry<String, String>> attributes) {
+    for (Map.Entry<String, String> kv : attributes) {
+      if (kv.getKey().startsWith(HiveConf.PREFIX_LLAP)
+          || kv.getKey().startsWith(HiveConf.PREFIX_HIVE_LLAP)) {
+        // TODO: read this somewhere useful, like the task scheduler
+        srv.set(kv.getKey(), kv.getValue());
+      }
+    }
+  }
+
+  @Override
+  public void updateRegistration(Iterable<Map.Entry<String, String>> attributes) throws IOException {
+    populateConfigValues(attributes);
+    updateServiceRecord(this.srv, doCheckAcls, true);
+  }
+
   @Override
   public void unregister() throws IOException {
     // Nothing for the zkCreate models
@@ -197,7 +214,7 @@ public DynamicServiceInstance(ServiceRecord srv) throws IOException {
       this.serviceAddress = RegistryTypeUtils.getAddressField(services.addresses.get(0),
           AddressTypes.ADDRESS_URI);
       String memStr = srv.get(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, "");
-      String coreStr = srv.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, "");
+      String coreStr = srv.get(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS, "");
       try {
         this.resource = Resource.newInstance(Integer.parseInt(memStr), Integer.parseInt(coreStr));
       } catch (NumberFormatException ex) {
diff --git llap-client/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapZookeeperRegistryImpl.java llap-client/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapZookeeperRegistryImpl.java
new file mode 100644
index 0000000000..9a014a13ac
--- /dev/null
+++ llap-client/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapZookeeperRegistryImpl.java
@@ -0,0 +1,124 @@
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+import org.mockito.internal.util.reflection.Fields;
+import org.mockito.internal.util.reflection.InstanceField;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.Integer.parseInt;
+import static org.junit.Assert.assertEquals;
+
+public class TestLlapZookeeperRegistryImpl {
+
+  @Spy
+  private HiveConf mockConf = new HiveConf();
+
+  private LlapZookeeperRegistryImpl registry;
+
+  private CuratorFramework curatorFramework;
+  private TestingServer server;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    registry = new LlapZookeeperRegistryImpl("TestLlapZookeeperRegistryImpl", mockConf);
+
+    server = new TestingServer();
+    server.start();
+
+    curatorFramework = CuratorFrameworkFactory.
+        builder().
+        connectString(server.getConnectString()).
+        sessionTimeoutMs(1000).
+        retryPolicy(new RetryOneTime(1000)).
+        build();
+    curatorFramework.start();
+
+    trySetMock(registry, CuratorFramework.class, curatorFramework);
+
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    curatorFramework.close();
+    server.stop();
+  }
+
+  @Test
+  public void testRegister() throws Exception {
+    // Given
+    int expectedExecutorCount = HiveConf.getIntVar(mockConf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+    int expectedQueueSize = HiveConf.getIntVar(mockConf, HiveConf.ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
+
+    // When
+    registry.register();
+    ServiceInstanceSet<LlapServiceInstance> serviceInstanceSet =
+        registry.getInstances("LLAP", 1000);
+
+    // Then
+    Collection<LlapServiceInstance> llaps = serviceInstanceSet.getAll();
+    assertEquals(1, llaps.size());
+    LlapServiceInstance serviceInstance = llaps.iterator().next();
+    Map<String, String> attributes = serviceInstance.getProperties();
+
+    assertEquals(expectedQueueSize,
+        parseInt(attributes.get(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE)));
+    assertEquals(expectedExecutorCount,
+        parseInt(attributes.get(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS)));
+  }
+
+  @Test
+  public void testUpdate() throws Exception {
+    // Given
+    String expectedExecutorCount = "2";
+    String expectedQueueSize = "20";
+    Map<String, String> capacityValues = new HashMap<>(2);
+    capacityValues.put(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS, expectedExecutorCount);
+    capacityValues.put(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE, expectedQueueSize);
+
+    // When
+    registry.register();
+    registry.updateRegistration(capacityValues.entrySet());
+    ServiceInstanceSet<LlapServiceInstance> serviceInstanceSet =
+        registry.getInstances("LLAP", 1000);
+
+    // Then
+    Collection<LlapServiceInstance> llaps = serviceInstanceSet.getAll();
+    assertEquals(1, llaps.size());
+    LlapServiceInstance serviceInstance = llaps.iterator().next();
+    Map<String, String> attributes = serviceInstance.getProperties();
+
+    assertEquals(expectedQueueSize,
+        attributes.get(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE));
+    assertEquals(expectedExecutorCount,
+        attributes.get(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS));
+  }
+
+  static <T> void trySetMock(Object o, Class<T> clazz, T mock) {
+    List<InstanceField> instanceFields = Fields
+        .allDeclaredFieldsOf(o)
+        .filter(instanceField -> !clazz.isAssignableFrom(instanceField.jdkField().getType()))
+        .instanceFields();
+    if (instanceFields.size() != 1) {
+      throw new RuntimeException("Mocking is only supported, if only one field is assignable from the given class.");
+    }
+    InstanceField instanceField = instanceFields.get(0);
+    instanceField.set(mock);
+  }
+}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
index 46827bcde1..bb79720681 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
@@ -51,5 +51,5 @@ UpdateFragmentResponseProto updateFragment(
       UpdateFragmentRequestProto request) throws IOException;
 
   SetCapacityResponseProto setCapacity(
-      SetCapacityRequestProto request);
+      SetCapacityRequestProto request) throws IOException;
 }
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index cbc5336e5d..1b9836b00e 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -24,7 +24,9 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -641,7 +643,14 @@ public UpdateFragmentResponseProto updateFragment(
 
   @Override
   public SetCapacityResponseProto setCapacity(
-      SetCapacityRequestProto request) {
+      SetCapacityRequestProto request) throws IOException {
+
+    Map<String, String> capacityValues = new HashMap<>(2);
+    capacityValues.put(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS,
+        Integer.toString(request.getExecutorNum()));
+    capacityValues.put(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE,
+        Integer.toString(request.getQueueSize()));
+    registry.updateRegistration(capacityValues.entrySet());
     return containerRunner.setCapacity(request);
   }
 
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index bb03727e1d..e7e90d39db 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -331,10 +331,11 @@ public GetTokenResponseProto getDelegationToken(RpcController controller,
   @Override
   public LlapDaemonProtocolProtos.SetCapacityResponseProto setCapacity(final RpcController controller,
       final LlapDaemonProtocolProtos.SetCapacityRequestProto request) throws ServiceException {
-    LlapDaemonProtocolProtos.SetCapacityResponseProto.Builder responseProtoBuilder =
-        LlapDaemonProtocolProtos.SetCapacityResponseProto.newBuilder();
-    containerRunner.setCapacity(request);
-    return responseProtoBuilder.build();
+    try {
+      return containerRunner.setCapacity(request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
   }
 
   private boolean determineIfSigningIsRequired(UserGroupInformation callingUser) {
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemon.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemon.java
new file mode 100644
index 0000000000..cc8d0b8f30
--- /dev/null
+++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemon.java
@@ -0,0 +1,111 @@
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
+import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.internal.util.reflection.Fields;
+import org.mockito.internal.util.reflection.InstanceField;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class TestLlapDaemon {
+
+  private static final String[] METRICS_SOURCES = new String[]{
+      "JvmMetrics",
+      "LlapDaemonExecutorMetrics-" + MetricsUtils.getHostName(),
+      "LlapDaemonJvmMetrics-" + MetricsUtils.getHostName(),
+      MetricsUtils.METRICS_PROCESS_NAME
+  };
+
+  @Spy
+  private Configuration mockConf = new HiveConf();
+
+  @Mock
+  private LlapRegistryService mockRegistry;
+
+  @Captor
+  private ArgumentCaptor<Iterable<Map.Entry<String, String>>> captor;
+
+  private LlapDaemon daemon;
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+    when(mockConf.getTrimmed(HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, null)).thenReturn("@llap");
+    when(mockConf.getTrimmed(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, "")).thenReturn("localhost");
+
+    String[] localDirs = new String[1];
+    LlapDaemonInfo.initialize("testDaemon", mockConf);
+    daemon = new LlapDaemon(mockConf, 1, LlapDaemon.getTotalHeapSize(), false, false,
+        -1, localDirs, 0, 0, 0, -1, "TestLlapDaemon");
+  }
+
+  @After
+  public void tearDown() {
+    MetricsSystem ms = LlapMetricsSystem.instance();
+    for (String mSource : METRICS_SOURCES) {
+      ms.unregisterSource(mSource);
+    }
+    daemon.shutdown();
+  }
+
+  @Test
+  public void testUpdateRegistration() throws IOException {
+    // Given
+    int enabledExecutors = 0;
+    int enabledQueue = 2;
+    trySetMock(daemon, LlapRegistryService.class, mockRegistry);
+
+    // When
+    daemon.setCapacity(LlapDaemonProtocolProtos.SetCapacityRequestProto.newBuilder()
+        .setQueueSize(enabledQueue)
+        .setExecutorNum(enabledExecutors)
+        .build());
+    verify(mockRegistry).updateRegistration(captor.capture());
+
+    // Then
+    Map<String, String> attributes = StreamSupport.stream(captor.getValue().spliterator(), false)
+        .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+
+    Assert.assertTrue(attributes.containsKey(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS));
+    Assert.assertTrue(attributes.containsKey(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE));
+    Assert.assertEquals(enabledQueue,
+        Integer.parseInt(attributes.get(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE)));
+    Assert.assertEquals(enabledExecutors,
+        Integer.parseInt(attributes.get(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS)));
+
+  }
+
+  static <T> void trySetMock(Object o, Class<T> clazz, T mock) {
+    List<InstanceField> instanceFields = Fields
+        .allDeclaredFieldsOf(o)
+        .filter(instanceField -> !clazz.isAssignableFrom(instanceField.jdkField().getType()))
+        .instanceFields();
+    if (instanceFields.size() != 1) {
+      throw new RuntimeException("Mocking is only supported, if only one field is assignable from the given class.");
+    }
+    InstanceField instanceField = instanceFields.get(0);
+    instanceField.set(mock);
+  }
+}
diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 37e2fcd8da..fd1b8b74ce 100644
--- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -822,6 +822,8 @@ public void run() {
        amRegistry.register(amPort, pluginPort,
            HiveConf.getVar(conf, ConfVars.HIVESESSIONID), serializedToken, jobIdForToken, 0);
       }
+
+
     } finally {
       writeLock.unlock();
     }
@@ -846,8 +848,10 @@ public void onCreate(LlapServiceInstance serviceInstance, int ephSeqVersion) {
 
     @Override
     public void onUpdate(LlapServiceInstance serviceInstance, int ephSeqVersion) {
-      // Registry uses ephemeral sequential znodes that are never updated as of now.
-      LOG.warn("Unexpected update for instance={}. Ignoring", serviceInstance);
+      NodeInfo nodeInfo = instanceToNodeMap.get(serviceInstance.getWorkerIdentity());
+      nodeInfo.updateLlapServiceInstance(serviceInstance, numSchedulableTasksPerNode);
+      LOG.info("Updated node with identity: {} as a result of registry callback",
+          serviceInstance.getWorkerIdentity());
     }
 
     @Override
@@ -2482,7 +2486,7 @@ public void shutdown() {
   @VisibleForTesting
   static class NodeInfo implements Delayed {
     private final NodeBlacklistConf blacklistConf;
-    final LlapServiceInstance serviceInstance;
+    LlapServiceInstance serviceInstance;
     private final Clock clock;
 
     long expireTimeMillis = -1;
@@ -2500,11 +2504,11 @@ public void shutdown() {
     private int numPreemptedTasks = 0;
    private int numScheduledTasks = 0;
-    private final int numSchedulableTasks;
+    private int numSchedulableTasks;
     private final LlapTaskSchedulerMetrics metrics;
 
-    private final Resource resourcePerExecutor;
+    private Resource resourcePerExecutor;
 
-    private final String shortStringBase;
+    private String shortStringBase;
 
     /**
      * Create a NodeInfo bound to a service instance
@@ -2518,36 +2522,11 @@ public void shutdown() {
     NodeInfo(LlapServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock,
         int numSchedulableTasksConf, final LlapTaskSchedulerMetrics metrics) {
       Preconditions.checkArgument(numSchedulableTasksConf >= -1, "NumSchedulableTasks must be >=-1");
-      this.serviceInstance = serviceInstance;
       this.blacklistConf = blacklistConf;
       this.clock = clock;
       this.metrics = metrics;
 
-      int numVcores = serviceInstance.getResource().getVirtualCores();
-      int memoryPerInstance = serviceInstance.getResource().getMemory();
-      int memoryPerExecutor = (int)(memoryPerInstance / (double) numVcores);
-      resourcePerExecutor = Resource.newInstance(memoryPerExecutor, 1);
-
-      if (numSchedulableTasksConf == 0) {
-        int pendingQueueuCapacity = 0;
-        String pendingQueueCapacityString = serviceInstance.getProperties()
-            .get(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname);
-        LOG.info("Setting up node: {} with available capacity={}, pendingQueueSize={}, memory={}",
-            serviceInstance, serviceInstance.getResource().getVirtualCores(),
-            pendingQueueCapacityString, serviceInstance.getResource().getMemory());
-        if (pendingQueueCapacityString != null) {
-          pendingQueueuCapacity = Integer.parseInt(pendingQueueCapacityString);
-        }
-        this.numSchedulableTasks = numVcores + pendingQueueuCapacity;
-      } else {
-        this.numSchedulableTasks = numSchedulableTasksConf;
-        LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks);
-      }
-      if (metrics != null) {
-        metrics.incrSchedulableTasksCount(numSchedulableTasks);
-      }
-      shortStringBase = setupShortStringBase();
-
+      updateLlapServiceInstance(serviceInstance, numSchedulableTasksConf);
     }
 
     String getNodeIdentity() {
@@ -2570,6 +2549,40 @@ public Resource getResourcePerExecutor() {
       return resourcePerExecutor;
     }
 
+    void updateLlapServiceInstance(LlapServiceInstance serviceInstance, int numSchedulableTasksConf) {
+      this.serviceInstance = serviceInstance;
+
+      int numVcores = serviceInstance.getResource().getVirtualCores();
+      int memoryPerInstance = serviceInstance.getResource().getMemory();
+      int memoryPerExecutor = (int)(memoryPerInstance / (double) numVcores);
+      resourcePerExecutor = Resource.newInstance(memoryPerExecutor, 1);
+
+      int oldNumSchedulableTasks = numSchedulableTasks;
+      if (numSchedulableTasksConf == 0) {
+        int pendingQueueuCapacity = 0;
+        String pendingQueueCapacityString = serviceInstance.getProperties()
+            .get(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE);
+        if (pendingQueueCapacityString == null) {
+          pendingQueueCapacityString = serviceInstance.getProperties()
+              .get(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname);
+        }
+        LOG.info("Setting up node: {} with available capacity={}, pendingQueueSize={}, memory={}",
+            serviceInstance, serviceInstance.getResource().getVirtualCores(),
+            pendingQueueCapacityString, serviceInstance.getResource().getMemory());
+        if (pendingQueueCapacityString != null) {
+          pendingQueueuCapacity = Integer.parseInt(pendingQueueCapacityString);
+        }
+        this.numSchedulableTasks = numVcores + pendingQueueuCapacity;
+      } else {
+        this.numSchedulableTasks = numSchedulableTasksConf;
+        LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks);
+      }
+      if (metrics != null) {
+        metrics.incrSchedulableTasksCount(numSchedulableTasks - oldNumSchedulableTasks);
+      }
+      shortStringBase = setupShortStringBase();
+    }
+
     void resetExpireInformation() {
       expireTimeMillis = -1;
       hadCommFailure = false;
@@ -2660,7 +2673,6 @@ boolean _canAccepInternal() {
           &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
     }
 
-    int canAcceptCounter = 0;
    /* Returning true does not guarantee that the task will run, considering other queries
      may be running in the system. Also depends upon the capacity usage configuration
     */
@@ -2669,11 +2681,6 @@ boolean canAcceptTask() {
       if (LOG.isTraceEnabled()) {
         LOG.trace(constructCanAcceptLogResult(result));
       }
-      if (canAcceptCounter == 10000) {
-        canAcceptCounter++;
-        LOG.info(constructCanAcceptLogResult(result));
-        canAcceptCounter = 0;
-      }
       return result;
     }
 
diff --git service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
index f4b436217f..ea5965d9ea 100644
--- service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
+++ service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
@@ -140,6 +140,11 @@ public void unregister() {
     unregisterInternal();
   }
 
+  @Override
+  public void updateRegistration(Iterable<Map.Entry<String, String>> attributes) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
   private void populateCache() throws IOException {
     PathChildrenCache pcc = ensureInstancesCache(0);
     populateCache(pcc, false);
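
Reviewer note (illustration only, not part of the patch): a minimal sketch of the capacity-update flow this patch wires together, using only APIs that appear above. How a caller obtains the daemon's LlapRegistryService instance is assumed and not shown, and the class name CapacityUpdateSketch is hypothetical.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityRequestProto;
    import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;

    public class CapacityUpdateSketch {
      // Publishes the same two attributes that LlapDaemon.setCapacity() writes to the registry,
      // so the task scheduler's onUpdate() callback can recompute numSchedulableTasks per node.
      static void publishCapacity(LlapRegistryService registryService, SetCapacityRequestProto request)
          throws IOException {
        Map<String, String> capacityValues = new HashMap<>(2);
        capacityValues.put(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS,
            Integer.toString(request.getExecutorNum()));
        capacityValues.put(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE,
            Integer.toString(request.getQueueSize()));
        registryService.updateRegistration(capacityValues.entrySet());
      }
    }

Note that the new "enabled" keys are published alongside the static configuration keys: NodeInfo.updateLlapServiceInstance() above falls back to LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE when the enabled key is absent, so daemons that never call setCapacity keep their current scheduling behaviour.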