diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index f3c0075671..586e726d0f 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -1,16 +1,21 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ + package org.apache.hadoop.hive.llap.registry.impl; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; @@ -133,13 +138,20 @@ private void unregisterWorker() throws IOException { } } - public ConfigChangeLockResult lockForConfigChange(long nextMinConfigChangeTime) { + /** + * Locks the Llap Cluster for configuration change for the given time window. + * @param windowStart The beginning of the time window when no other configuration change is allowed. + * @param windowEnd The end of the time window when no other configuration change is allowed. + * @return The result of the change (success if the lock is succeeded, and the next possible + * configuration change time + */ + public ConfigChangeLockResult lockForConfigChange(long windowStart, long windowEnd) { if (this.registry == null) { throw new IllegalStateException("Not allowed to call lockForConfigChange before serviceInit"); } if (isDynamic) { LlapZookeeperRegistryImpl zkRegisty = (LlapZookeeperRegistryImpl)registry; - return zkRegisty.lockForConfigChange(nextMinConfigChangeTime); + return zkRegisty.lockForConfigChange(windowStart, windowEnd); } else { throw new UnsupportedOperationException("Acquiring config lock is only allowed for dynamic registries"); } diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index bf3fcc3ee0..ccfc3385e3 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -1,16 +1,21 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ + package org.apache.hadoop.hive.llap.registry.impl; import com.google.common.annotations.VisibleForTesting; @@ -71,7 +76,7 @@ private static final String SLOT_PREFIX = "slot-"; private static final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient"; private static final String CONFIG_CHANGE_PATH = "config-change"; - private static final String CONFIG_CHANGE_NODE = "next-change"; + private static final String CONFIG_CHANGE_NODE = "window-end"; private SlotZnode slotZnode; @@ -79,7 +84,7 @@ // to be used by clients of ServiceRegistry TODO: this is unnecessary private DynamicServiceInstanceSet instances; - private DistributedAtomicLong nextChangeTime; + private DistributedAtomicLong lockWindowEnd; public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) { super(instanceName, conf, @@ -425,33 +430,38 @@ protected String getZkPathUser(Configuration conf) { } /** - * Locks the Llap Cluster for configuration change by setting the next possible configuration - * change time. Until this time is reached the configuration should not be changed. - * @param nextMinConfigChangeTime The next time when the cluster can be reconfigured + * Locks the Llap Cluster for configuration change for the given time window. + * @param windowStart The beginning of the time window when no other configuration change is allowed. + * @param windowEnd The end of the time window when no other configuration change is allowed. * @return The result of the change (success if the lock is succeeded, and the next possible * configuration change time */ - public ConfigChangeLockResult lockForConfigChange(long nextMinConfigChangeTime) { + public ConfigChangeLockResult lockForConfigChange(long windowStart, long windowEnd) { + if (windowEnd < windowStart) { + throw new IllegalArgumentException( + "WindowStart=" + windowStart + " can not be smaller than WindowEnd=" + windowEnd); + } try { - if (nextChangeTime == null) { + if (lockWindowEnd == null) { // Create the node with the /llap-sasl/hiveuser/hostname/config-change/next-change path without retry - nextChangeTime = new DistributedAtomicLong(zooKeeperClient, + lockWindowEnd = new DistributedAtomicLong(zooKeeperClient, String.join("/", workersPath.substring(0, workersPath.lastIndexOf('/')), CONFIG_CHANGE_PATH, CONFIG_CHANGE_NODE), (i, j, sleeper) -> false); - nextChangeTime.initialize(0L); + lockWindowEnd.initialize(0L); } - AtomicValue current = nextChangeTime.get(); + AtomicValue current = lockWindowEnd.get(); if (!current.succeeded()) { LOG.debug("Can not get the current configuration lock time"); return new ConfigChangeLockResult(false, -1L); } - if (current.postValue() >= nextMinConfigChangeTime) { - LOG.debug("Can not set {}. Current value is {}.", nextMinConfigChangeTime, current.postValue()); + if (current.postValue() > windowStart) { + LOG.debug("Can not lock window {}-{}. Current value is {}.", windowStart, windowEnd, current.postValue()); return new ConfigChangeLockResult(false, current.postValue()); } - current = nextChangeTime.compareAndSet(current.postValue(), nextMinConfigChangeTime); + current = lockWindowEnd.compareAndSet(current.postValue(), windowEnd); if (!current.succeeded()) { - LOG.debug("Can not set {}. Current value is changed to {}.", nextMinConfigChangeTime, current.postValue()); + LOG.debug("Can not lock window {}-{}. Current value is changed to {}.", windowStart, windowEnd, + current.postValue()); return new ConfigChangeLockResult(false, current.postValue()); } return new ConfigChangeLockResult(true, current.postValue()); @@ -462,8 +472,8 @@ public ConfigChangeLockResult lockForConfigChange(long nextMinConfigChangeTime) } public static class ConfigChangeLockResult { - boolean success; - long nextConfigChangeTime; + private final boolean success; + private final long nextConfigChangeTime; @VisibleForTesting public ConfigChangeLockResult(boolean success, long nextConfigChangeTime) { diff --git llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java index 31688dca86..ce5c92071e 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java @@ -50,29 +50,64 @@ public static void tearDown() throws IOException { } @Test - public void testLockForConfigChange() { - LlapRegistryService client1 = new LlapRegistryService(false); - client1.init(conf); - client1.start(); - - LlapRegistryService client2 = new LlapRegistryService(false); - client2.init(conf); - client2.start(); - ConfigChangeLockResult result = null; - - assertTrue(client1.lockForConfigChange(10000).success); - assertTrue(client2.lockForConfigChange(30000).success); - // Can not set to before - result = client1.lockForConfigChange(20000); - assertFalse(result.success); - assertEquals(result.nextConfigChangeTime, 30000); - // Can not set to the same timestamp - result = client1.lockForConfigChange(30000); - assertFalse(result.success); - assertEquals(result.nextConfigChangeTime, 30000); - // Check return value in case of success - result = client1.lockForConfigChange(40000); - assertTrue(result.success); - assertEquals(result.nextConfigChangeTime, 40000); + public void testLockForConfigChange() throws IOException { + LlapRegistryService client1 = null; + LlapRegistryService client2 = null; + ConfigChangeLockResult result; + + try { + client1 = new LlapRegistryService(false); + client1.init(conf); + client1.start(); + + client2 = new LlapRegistryService(false); + client2.init(conf); + client2.start(); + + assertTrue(client1.lockForConfigChange(10000, 20000).isSuccess()); + assertTrue(client2.lockForConfigChange(30000, 40000).isSuccess()); + + // Can not set to before + result = client1.lockForConfigChange(20000, 30000); + assertFalse(result.isSuccess()); + assertEquals(result.getNextConfigChangeTime(), 40000); + + result = client1.lockForConfigChange(30000, 40000); + assertFalse(result.isSuccess()); + assertEquals(result.getNextConfigChangeTime(), 40000); + + result = client1.lockForConfigChange(35000, 45000); + assertFalse(result.isSuccess()); + assertEquals(result.getNextConfigChangeTime(), 40000); + + // Can start from the previous end timestamp + result = client1.lockForConfigChange(40000, 50000); + assertTrue(result.isSuccess()); + assertEquals(result.getNextConfigChangeTime(), 50000); + } finally { + if (client1 != null) { + client1.close(); + } + if (client2 != null) { + client2.close(); + } + } + } + + @Test(expected = IllegalArgumentException.class) + public void testLockForConfigChangeInvalid() throws IOException{ + LlapRegistryService client = null; + + try { + client = new LlapRegistryService(false); + client.init(conf); + client.start(); + + client.lockForConfigChange(20000, 10000); + } finally { + if (client != null) { + client.close(); + } + } } } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/package-info.java llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/package-info.java new file mode 100644 index 0000000000..d2acc5d054 --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Test classes for registry implementations. + */ + +package org.apache.hadoop.hive.llap.registry.impl; diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java index a540212030..96f17e5c67 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java @@ -1,7 +1,11 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -72,8 +76,10 @@ void init(Configuration conf, LlapRegistryService registry, LlapManagementProtoc "The time threshold should be greater than 1"); Preconditions.checkArgument(maxBlacklistedNodes > 0, "The maximum number of blacklisted node should be greater than 1"); + Preconditions.checkNotNull(registry, "Registry should not be null"); + Preconditions.checkNotNull(clientFactory, "ClientFactory should not be null"); - LOG.debug("BlacklistingLlapMetricsListener initialized with " + + LOG.info("BlacklistingLlapMetricsListener initialized with " + "minServedTasksNumber={}, " + "minConfigChangeDelayMs={}, " + "timeThreshold={}, " + @@ -120,15 +126,18 @@ public void newClusterMetrics(Map newM if (maxExecutors == 0) { blacklistedNodes++; if (blacklistedNodes >= this.maxBlacklistedNodes) { - LOG.debug("Already too many blacklisted nodes. Skipping."); + LOG.info("Already enough blacklisted nodes {}. Skipping.", blacklistedNodes); return; + } else { + // We do not interested in the data for the blacklisted nodes + continue; } } if (requestHandled > this.minServedTasksNumber) { workerNum++; sumAverageTime += averageTime; - if ( averageTime > maxAverageTime) { + if (averageTime > maxAverageTime) { maxAverageTime = averageTime; maxAverageTimeEmptyExecutors = emptyExecutor; maxAverageTimeMaxExecutors = maxExecutors; @@ -155,7 +164,7 @@ public void newClusterMetrics(Map newM sumAverageTime, sumEmptyExecutors, maxAverageTime, maxAverageTimeEmptyExecutors, maxAverageTimeMaxExecutors, workerNum, maxAverageTimeIdentity, blacklistedNodes); // Check if the slowest node is at least timeThreshold times slower than the average - long averageTimeWithoutSlowest = (sumAverageTime - maxAverageTime) / (workerNum - 1); + double averageTimeWithoutSlowest = (double)(sumAverageTime - maxAverageTime) / (workerNum - 1); if (averageTimeWithoutSlowest * this.timeThreshold < maxAverageTime) { // We have a candidate, let's see if we have enough empty executors. long emptyExecutorsWithoutSlowest = sumEmptyExecutors - maxAverageTimeEmptyExecutors; @@ -176,7 +185,7 @@ protected void setCapacity(String workerIdentity, int newExecutorNum, int newWai long currentTime = System.currentTimeMillis(); if (currentTime > nextCheckTime) { LlapZookeeperRegistryImpl.ConfigChangeLockResult lockResult = - registry.lockForConfigChange(currentTime + this.minConfigChangeDelayMs); + registry.lockForConfigChange(currentTime, currentTime + this.minConfigChangeDelayMs); LOG.debug("Got result for lock check: {}", lockResult); if (lockResult.isSuccess()) { diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java index 586748f3dc..22f08246c9 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java @@ -1,7 +1,11 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -47,9 +51,8 @@ public static LlapManagementProtocolClientImplFactory basicInstance(Configuratio } public LlapManagementProtocolClientImpl create(LlapServiceInstance serviceInstance) { - LlapManagementProtocolClientImpl client = new LlapManagementProtocolClientImpl(conf, serviceInstance.getHost(), + return new LlapManagementProtocolClientImpl(conf, serviceInstance.getHost(), serviceInstance.getManagementPort(), retryPolicy, socketFactory); - return client; } -} \ No newline at end of file +} diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java index d36158eeef..076309d7ba 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java @@ -229,6 +229,11 @@ public long getTimestamp() { return timestamp; } + /** + * The metric values in the map. The keys are the enum names (See: LlapDaemonExecutorInfo), and + * the values are the actual values. + * @return + */ public Map getMetrics() { return metrics; } diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java index 535dfbcff8..0ea60c21e9 100644 --- llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java +++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java @@ -1,7 +1,11 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -11,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.hadoop.hive.llap.tezplugins.metrics; import com.google.protobuf.RpcController; @@ -44,6 +49,9 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; +/** + * Test class to test BlacklistingLlapMetricsListener object. + */ public class TestBlacklistingLlapMetricsListener { private static final SetCapacityResponseProto TEST_RESPONSE = SetCapacityResponseProto.getDefaultInstance(); @@ -70,7 +78,8 @@ public void setUp() throws Exception { conf = new HiveConf(); when(mockRegistry.getInstances()).thenReturn(mockInstanceSet); - when(mockRegistry.lockForConfigChange(anyLong())).thenReturn(new ConfigChangeLockResult(true, Long.MIN_VALUE)); + when(mockRegistry.lockForConfigChange(anyLong(), anyLong())).thenReturn( + new ConfigChangeLockResult(true, Long.MIN_VALUE)); when(mockClientFactory.create(any(LlapServiceInstance.class))).thenReturn(mockClient); when(mockClient.setCapacity( any(RpcController.class), @@ -126,7 +135,8 @@ public void testCheckTime() throws Exception { // Return that we can not yet blacklist a node long targetTime = System.currentTimeMillis() + 10000; - when(mockRegistry.lockForConfigChange(anyLong())).thenReturn(new ConfigChangeLockResult(false, targetTime)); + when(mockRegistry.lockForConfigChange(anyLong(), anyLong())).thenReturn( + new ConfigChangeLockResult(false, targetTime)); listener.newClusterMetrics(data); verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); @@ -138,12 +148,13 @@ public void testCheckTime() throws Exception { // We will not try to set the capacity, or even lock until the time is reached listener.newClusterMetrics(data); - verify(mockRegistry, never()).lockForConfigChange(anyLong()); + verify(mockRegistry, never()).lockForConfigChange(anyLong(), anyLong()); verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); // If the time is reached, then we lock and blacklist listener.nextCheckTime = System.currentTimeMillis() - 1; - when(mockRegistry.lockForConfigChange(anyLong())).thenReturn(new ConfigChangeLockResult(true, targetTime)); + when(mockRegistry.lockForConfigChange(anyLong(), anyLong())).thenReturn( + new ConfigChangeLockResult(true, targetTime)); listener.newClusterMetrics(data); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String.class);