diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 2a32d89021..1536db4544 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -617,6 +617,22 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Name of the source cluster for the replication."), REPL_TARGET_CLUSTER_NAME("hive.repl.target.cluster.name", null, "Name of the target cluster for the replication."), + REPL_RETRY_INTIAL_DELAY("hive.repl.retry.initial.delay", "60s", + new TimeValidator(TimeUnit.SECONDS), + "Initial Delay before retry starts."), + REPL_RETRY_BACKOFF_COEFFICIENT("hive.repl.retry.backoff.coefficient", 1.2f, + "The backoff coefficient for exponential retry delay between retries. " + + "Previous Delay * Backoff Coefficient will determine the next retry interval"), + REPL_RETRY_JITTER("hive.repl.retry.jitter", "30s", new TimeValidator(TimeUnit.SECONDS), + "A random jitter to be applied to avoid all retries happening at the same time."), + REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES("hive.repl.retry.max.delay.between.retries", "60m", + new TimeValidator(TimeUnit.MINUTES), + "Maximum allowed retry delay in seconds after including exponential backoff. " + + "If this limit is reached, retry will continue with this retry duration."), + REPL_RETRY_TOTAL_DURATION("hive.repl.retry.total.duration", "24h", + new TimeValidator(TimeUnit.HOURS), + "Total allowed retry duration in seconds inclusive of all retries. 
Once this is exhausted, " + + "the policy instance will be marked as failed and will need manual intervention to restart."), LOCALSCRATCHDIR("hive.exec.local.scratchdir", "${system:java.io.tmpdir}" + File.separator + "${system:user.name}", "Local scratch space for Hive jobs"), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java new file mode 100644 index 0000000000..e6c9673550 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.util; + +import org.apache.hadoop.hive.conf.HiveConf; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Class to implement any retry logic in case of exceptions. 
+ */ +public class Retryable { + private static long MINIMUM_DELAY_IN_SEC = 1; + + private long totalDurationInSeconds; + private List> retryOn; + private long initialDelayInSeconds; + private long maxRetryDelayInSeconds; + private double backOff; + private int maxJitterInSeconds; + + private Retryable() { + this.retryOn = new ArrayList<>(); + this.initialDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.defaultStrVal, + HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY), TimeUnit.SECONDS); + this.maxRetryDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES.defaultStrVal, + HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES), TimeUnit.SECONDS); + this.backOff = HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT.defaultFloatVal; + this.maxJitterInSeconds = (int) HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_JITTER.defaultStrVal, + HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_JITTER), TimeUnit.SECONDS); + this.totalDurationInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION.defaultStrVal, + HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION), TimeUnit.SECONDS);; + } + + public static Builder builder() { + return new Builder(); + } + + public T executeCallable(Callable callable) throws Throwable { + long startTime = System.currentTimeMillis(); + long delay = this.initialDelayInSeconds; + Exception currentCapturedException = null; + while(true) { + try { + return callable.call(); + } catch (Exception e) { + if (this.retryOn.stream().anyMatch(k -> e.getClass().isAssignableFrom(k))) { + if (elapsedTimeInSeconds(startTime) + delay > this.totalDurationInSeconds) { + // case where waiting would go beyond max duration. So throw exception and return + throw e; + } + sleep(delay); + //retry case. compute next sleep time + delay = getNextDelay(delay, currentCapturedException, e); + // reset current captured exception. 
+ currentCapturedException = e; + } else { + // Exception cannot be retried on. Throw exception and return + throw e; + } + } + } + } + + private void sleep(long seconds) { + try { + Thread.sleep(seconds * 1000); + } catch (InterruptedException e) { + // no-op.. just proceed + } + } + + private long getNextDelay(long currentDelay, final Exception previousException, final Exception currentException) { + if (previousException != null && !previousException.getClass().equals(currentException.getClass())) { + // New exception encountered. Returning initial delay for next retry. + return this.initialDelayInSeconds; + } + + if (currentDelay <= 0) { // in case initial delay was set to 0. + currentDelay = MINIMUM_DELAY_IN_SEC; + } + + currentDelay *= this.backOff; + if (this.maxJitterInSeconds > 0) { + currentDelay += new Random().nextInt(this.maxJitterInSeconds); + } + + if (currentDelay > this.maxRetryDelayInSeconds) { + currentDelay = this.maxRetryDelayInSeconds; + } + + return currentDelay; + } + + private long elapsedTimeInSeconds(long fromTimeMillis) { + return (System.currentTimeMillis() - fromTimeMillis)/ 1000; + } + + public static class Builder { + private final Retryable runnable = new Retryable(); + public Builder() { + } + + public Builder withHiveConf(HiveConf conf) { + runnable.totalDurationInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS); + runnable.initialDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS); + runnable.maxRetryDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars + .REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, TimeUnit.SECONDS); + runnable.backOff = conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT); + runnable.maxJitterInSeconds = (int) conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_JITTER, TimeUnit.SECONDS); + return this; + } + + public Retryable build() { + return runnable; + } + + public Builder withTotalDuration(long maxDuration) { + 
runnable.totalDurationInSeconds = maxDuration; + return this; + } + + // making this thread safe as it appends to list + public synchronized Builder withRetryOnException(final Class exceptionClass) { + if (exceptionClass != null && + runnable.retryOn.stream().noneMatch(k -> exceptionClass.equals(k))) { + runnable.retryOn.add(exceptionClass); + } + return this; + } + + public synchronized Builder withRetryOnExceptionList(final List> exceptionClassList) { + for (final Class exceptionClass : exceptionClassList) { + if (exceptionClass != null && + runnable.retryOn.stream().noneMatch(k -> exceptionClass.equals(k))) { + runnable.retryOn.add(exceptionClass); + } + } + return this; + } + + public Builder withInitialDelay(long initialDelayInSeconds) { + runnable.initialDelayInSeconds = initialDelayInSeconds; + return this; + } + + public Builder withMaxRetryDelay(long maxRetryDelayInSeconds) { + runnable.maxRetryDelayInSeconds = maxRetryDelayInSeconds; + return this; + } + + public Builder withBackoff(double backoff) { + runnable.backOff = backoff; + return this; + } + + public Builder withMaxJitterValue(int maxJitterInSeconds) { + runnable.maxJitterInSeconds = maxJitterInSeconds; + return this; + } + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/util/RetryableTest.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/util/RetryableTest.java new file mode 100644 index 0000000000..f27545c0c8 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/util/RetryableTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.ql.exec.util;

import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Callable;

/**
 * Tests for {@link Retryable}.
 *
 * <p>These tests use real waits (delays are in seconds), so several of them take a few seconds
 * and {@link #testRetryFailureWithHiveConf()} takes about a minute.
 */
public class RetryableTest {

  /** One retryable failure followed by success must not surface an exception. */
  @Test
  public void testRetrySuccessValidException() throws Throwable {
    Retryable retryable = Retryable.builder()
      .withTotalDuration(10)
      .withInitialDelay(1)
      .withBackoff(1.0)
      .withRetryOnException(NullPointerException.class).build();
    retryable.executeCallable(failingWith(new NullPointerException()));
  }

  /** Two different retryable failures followed by success must not surface an exception. */
  @Test
  public void testRetrySuccessValidExceptionList() throws Throwable {
    Retryable retryable = Retryable.builder()
      .withTotalDuration(10)
      .withInitialDelay(1)
      .withBackoff(1.0)
      // Jitter is pinned to zero: the default jitter could push the second delay past the
      // 10 second budget and make this test fail at random.
      .withMaxJitterValue(0)
      .withRetryOnExceptionList(Arrays.asList(NullPointerException.class, IOException.class)).build();
    retryable.executeCallable(failingWith(new NullPointerException(), new IOException()));
  }

  /** The backed-off second delay exceeds the total duration, so the second failure is thrown. */
  @Test
  public void testRetryFailureWithMaxDuration() throws Throwable {
    Retryable retryable = Retryable.builder()
      .withTotalDuration(10)
      .withBackoff(10.0)
      .withInitialDelay(1)
      .withRetryOnException(NullPointerException.class).build();
    assertFailsWith(NullPointerException.class, retryable,
      failingWith(new NullPointerException(), new NullPointerException()));
  }

  /** After a 10s initial wait the next delay no longer fits in the budget. */
  @Test
  public void testRetryFailureWithInitialDelay() throws Throwable {
    Retryable retryable = Retryable.builder()
      .withTotalDuration(20)
      .withBackoff(10.0)
      .withInitialDelay(10)
      .withMaxJitterValue(1)
      .withRetryOnExceptionList(Arrays.asList(NullPointerException.class, IOException.class)).build();
    assertFailsWith(IOException.class, retryable,
      failingWith(new NullPointerException(), new IOException()));
  }

  /** The max retry delay caps the back-off, so both retries fit and the call succeeds. */
  @Test
  public void testRetryFailureWithMaxRetryDelay() throws Throwable {
    Retryable retryable = Retryable.builder()
      .withTotalDuration(20)
      .withBackoff(10.0)
      .withInitialDelay(1)
      .withMaxJitterValue(1)
      .withMaxRetryDelay(1)
      .withRetryOnExceptionList(Arrays.asList(NullPointerException.class, IOException.class)).build();
    retryable.executeCallable(failingWith(new NullPointerException(), new IOException()));
  }

  /** A large back-off coefficient pushes the second delay beyond the total duration. */
  @Test
  public void testRetryFailureWithBackoff() throws Throwable {
    Retryable retryable = Retryable.builder()
      .withTotalDuration(20)
      .withBackoff(100.0)
      .withInitialDelay(1)
      .withMaxJitterValue(1)
      .withRetryOnException(NullPointerException.class).build();
    assertFailsWith(NullPointerException.class, retryable,
      failingWith(new NullPointerException(), new NullPointerException()));
  }

  /** A change of exception type resets the delay, so the call succeeds within the budget. */
  @Test
  public void testRetrySuccessWithMaxDurationDifferentException() throws Throwable {
    Retryable retryable = Retryable.builder()
      .withTotalDuration(30)
      .withBackoff(10.0)
      .withInitialDelay(1)
      .withMaxJitterValue(1)
      .withRetryOnExceptionList(Arrays.asList(NullPointerException.class, IOException.class)).build();
    retryable.executeCallable(failingWith(new NullPointerException(), new IOException()));
  }

  /** An exception type that was not registered is rethrown without retrying. */
  @Test
  public void testRetryFailureInValidException() throws Throwable {
    Retryable retryable = Retryable.builder()
      .withTotalDuration(10)
      .withInitialDelay(1)
      .withBackoff(1.0)
      .withRetryOnException(NullPointerException.class).build();
    assertFailsWith(IOException.class, retryable, failingWith(new IOException()));
  }

  /**
   * Configures the retry policy through HiveConf and runs a callable that keeps failing until
   * 60 seconds have passed since the test started. The call may complete normally once that
   * time has passed; if it fails instead, the only acceptable failure is the callable's own
   * NullPointerException.
   */
  @Test
  public void testRetryFailureWithHiveConf() throws Throwable {
    HiveConf conf = new HiveConf(RetryableTest.class);
    conf.set(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.varname, "1s");
    conf.setFloat(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT.varname, 1.0f);
    conf.set(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION.varname, "60s");
    conf.set(HiveConf.ConfVars.REPL_RETRY_JITTER.varname, "1s");
    conf.set(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES.varname, "30s");
    final long startTime = System.currentTimeMillis();
    final long totalTime = 60;
    Retryable retryable = Retryable.builder().withHiveConf(conf)
      .withRetryOnException(NullPointerException.class).build();
    try {
      retryable.executeCallable(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          executeWithDelay(startTime, totalTime);
          return null;
        }
      });
    } catch (Exception e) {
      Assert.assertEquals(NullPointerException.class, e.getClass());
    }
  }

  /** Throws a NullPointerException until {@code totalTime} seconds have passed since {@code startTime}. */
  private void executeWithDelay(long startTime, long totalTime) {
    long currentTime = System.currentTimeMillis();
    if (currentTime - startTime < totalTime * 1000) {
      throw new NullPointerException();
    }
  }

  /**
   * Builds a callable that throws the given exceptions, one per invocation and in order, and
   * returns null on every invocation after that.
   */
  private static Callable<Void> failingWith(final Exception... failures) {
    return new Callable<Void>() {
      private int attempt = 0;

      @Override
      public Void call() throws Exception {
        if (attempt < failures.length) {
          throw failures[attempt++];
        }
        return null;
      }
    };
  }

  /**
   * Runs {@code task} through {@code retryable} and requires it to fail with an exception of
   * exactly the {@code expected} class. A normal return fails the test instead of passing
   * silently.
   */
  private static void assertFailsWith(Class<? extends Exception> expected, Retryable retryable,
      Callable<Void> task) throws Throwable {
    try {
      retryable.executeCallable(task);
    } catch (Exception e) {
      Assert.assertEquals(expected, e.getClass());
      return;
    }
    Assert.fail("Expected " + expected.getName() + " but the callable completed successfully");
  }
}