diff --git parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
index 1cba7cb..0296a3d 100644
--- parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
+++ parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
@@ -174,6 +174,7 @@
 KW_ENABLE: 'ENABLE' | 'ENABLED';
 KW_DISABLE: 'DISABLE' | 'DISABLED';
 KW_EXECUTED: 'EXECUTED';
+KW_EXECUTE: 'EXECUTE';
 KW_LOCATION: 'LOCATION';
 KW_TABLESAMPLE: 'TABLESAMPLE';
 KW_BUCKET: 'BUCKET';
diff --git parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index b539d76..949e57b 100644
--- parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -455,6 +455,7 @@
 TOK_WITHIN_GROUP;
 TOK_CRON;
 TOK_EXECUTED_AS;
+TOK_EXECUTE;
 TOK_SCHEDULE;
 TOK_EVERY;
 }
@@ -2128,6 +2129,7 @@
     | executedAsSpec
     | enableSpecification
     | definedAsSpec
+    | KW_EXECUTE -> ^(TOK_EXECUTE)
     ;
 
 scheduleSpec
diff --git parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index e154485..262afaa 100644
--- parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -865,7 +865,7 @@
     | KW_TIMESTAMPTZ
     | KW_DEFAULT
     | KW_REOPTIMIZATION
-    | KW_EXECUTED | KW_SCHEDULED | KW_CRON | KW_EVERY | KW_AT
+    | KW_EXECUTED | KW_SCHEDULED | KW_CRON | KW_EVERY | KW_AT | KW_EXECUTE
    | KW_RESOURCE | KW_PLAN | KW_PLANS | KW_QUERY_PARALLELISM | KW_ACTIVATE | KW_MOVE | KW_DO
    | KW_POOL | KW_ALLOC_FRACTION | KW_SCHEDULING_POLICY | KW_PATH | KW_MAPPING | KW_WORKLOAD | KW_MANAGEMENT | KW_ACTIVE | KW_UNMANAGED
    | KW_UNKNOWN
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java
index 3d46b18..fd0c173 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionService;
 import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryMaintenanceWork;
 import org.apache.thrift.TException;
 
@@ -45,6 +46,9 @@
     ScheduledQueryMaintenanceRequest request = buildScheduledQueryRequest();
     try {
       Hive.get().getMSC().scheduledQueryMaintenance(request);
+      if (work.getScheduledQuery().isSetNextExecution()) {
+        ScheduledQueryExecutionService.forceScheduleCheck();
+      }
     } catch (TException | HiveException e) {
       setException(e);
       LOG.error("Failed", e);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ScheduledQueryAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ScheduledQueryAnalyzer.java
index 7e78aca..66b4653 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ScheduledQueryAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ScheduledQueryAnalyzer.java
@@ -91,6 +91,8 @@
         // in case the user will change; we have to run an authorization check beforehand
         checkAuthorization(type, schqStored);
       }
+      // clear the next execution time
+      schqStored.setNextExecutionIsSet(false);
       return composeOverlayObject(schqChanges, schqStored);
     } catch (TException e) {
       throw new SemanticException("unable to get Scheduled query" + e);
get Scheduled query" + e); @@ -186,6 +188,10 @@ case HiveParser.TOK_QUERY: schq.setQuery(unparseTree(node.getChild(0))); return; + case HiveParser.TOK_EXECUTE: + int now = (int) (System.currentTimeMillis() / 1000); + schq.setNextExecution(now); + return; default: throw new SemanticException("Unexpected token: " + node.getType()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java index 717a452..06cfe3f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java +++ ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; @@ -44,17 +45,26 @@ private static final Logger LOG = LoggerFactory.getLogger(ScheduledQueryExecutionService.class); + private static ScheduledQueryExecutionService INSTANCE = null; + private ScheduledQueryExecutionContext context; private ScheduledQueryExecutor worker; + private AtomicInteger forcedScheduleCheckCounter = new AtomicInteger(); public static ScheduledQueryExecutionService startScheduledQueryExecutorService(HiveConf conf0) { - HiveConf conf = new HiveConf(conf0); - MetastoreBasedScheduledQueryService qService = new MetastoreBasedScheduledQueryService(conf); - ExecutorService executor = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build()); - ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService); - return new ScheduledQueryExecutionService(ctx); + synchronized (ScheduledQueryExecutionService.class) { + if (INSTANCE != null) { + throw new IllegalStateException( + "There is already a ScheduledQueryExecutionService in service; check it and close it explicitly if neccessary"); + } + HiveConf conf = new HiveConf(conf0); + MetastoreBasedScheduledQueryService qService = new MetastoreBasedScheduledQueryService(conf); + ExecutorService executor = Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build()); + ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService); + INSTANCE = new ScheduledQueryExecutionService(ctx); + return INSTANCE; + } } public ScheduledQueryExecutionService(ScheduledQueryExecutionContext ctx) { @@ -83,7 +93,7 @@ } } else { try { - Thread.sleep(context.getIdleSleepTime()); + sleep(context.getIdleSleepTime()); } catch (InterruptedException e) { LOG.warn("interrupt discarded"); } @@ -91,6 +101,17 @@ } } + private void sleep(long idleSleepTime) throws InterruptedException { + long checkIntrvalMs = 1000; + int origResets = forcedScheduleCheckCounter.get(); + for (long i = 0; i < idleSleepTime; i += checkIntrvalMs) { + Thread.sleep(checkIntrvalMs); + if (forcedScheduleCheckCounter.get() != origResets) { + return; + } + } + } + public synchronized void reportQueryProgress() { if (info != null) { LOG.info("Reporting query progress of {} as {} err:{}", info.getScheduledExecutionId(), info.getState(), @@ -173,15 +194,27 @@ @VisibleForTesting @Override public void close() throws IOException { - context.executor.shutdown(); - try { - 
-      context.executor.awaitTermination(1, TimeUnit.SECONDS);
-      context.executor.shutdownNow();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
+    synchronized (ScheduledQueryExecutionService.class) {
+      if (INSTANCE == null || INSTANCE != this) {
+        throw new IllegalStateException("The current ScheduledQueryExecutionService INSTANCE is invalid");
+      }
+      INSTANCE = null;
+      context.executor.shutdown();
+      try {
+        context.executor.awaitTermination(1, TimeUnit.SECONDS);
+        context.executor.shutdownNow();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
     }
-
-  }
+  }
+
+  public static void forceScheduleCheck() {
+    INSTANCE.forcedScheduleCheckCounter.incrementAndGet();
+  }
+
+  @VisibleForTesting
+  public static int getForcedScheduleCheckCount() {
+    return INSTANCE.forcedScheduleCheckCounter.get();
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryStatements.java ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryStatements.java
index 336debf..f2fc421 100644
--- ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryStatements.java
+++ ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryStatements.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.schq;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -32,6 +34,7 @@
 import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionService;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.testutils.HiveTestEnvSetup;
+import org.hamcrest.Matchers;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -187,7 +190,7 @@
     IDriver driver = createDriver();
 
     driver.run("set role admin");
-    driver.run("create scheduled query alter1 cron '* * * * * ? *' as select 1 from tu");
+    driver.run("create scheduled query alter1 cron '0 0 7 * * ? *' as select 1 from tu");
     driver.run("alter scheduled query alter1 executed as 'user3'");
     driver.run("alter scheduled query alter1 defined as select 22 from tu");
 
@@ -195,11 +198,30 @@
       Optional<MScheduledQuery> sq = os.getMScheduledQuery(new ScheduledQueryKey("alter1", "hive"));
       assertTrue(sq.isPresent());
       assertEquals("user3", sq.get().toThrift().getUser());
+      assertThat(sq.get().getNextExecution(), Matchers.greaterThan((int) (System.currentTimeMillis() / 1000)));
     }
   }
 
   @Test
+  public void testExecuteImmediate() throws ParseException, Exception {
+    IDriver driver = createDriver();
+
+    driver.run("set role admin");
+    driver.run("create scheduled query immed cron '0 0 7 * * ? *' as select 1");
*' as select 1"); + int cnt0 = ScheduledQueryExecutionService.getForcedScheduleCheckCount(); + driver.run("alter scheduled query immed execute"); + + try (CloseableObjectStore os = new CloseableObjectStore(env_setup.getTestCtx().hiveConf)) { + Optional sq = os.getMScheduledQuery(new ScheduledQueryKey("immed", "hive")); + assertTrue(sq.isPresent()); + assertThat(sq.get().getNextExecution(), Matchers.lessThanOrEqualTo((int) (System.currentTimeMillis() / 1000))); + int cnt1 = ScheduledQueryExecutionService.getForcedScheduleCheckCount(); + assertNotEquals(cnt1, cnt0); + } + } + + @Test public void testImpersonation() throws ParseException, Exception { HiveConf conf = env_setup.getTestCtx().hiveConf; IDriver driver = createDriver(); diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 317c9cb..3eff37f 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -12856,11 +12856,15 @@ openTransaction(); MScheduledQuery persisted = existing.get(); persisted.doUpdate(schq); - Integer nextExecutionTime = computeNextExecutionTime(schq.getSchedule()); - if (nextExecutionTime == null) { - throw new InvalidInputException("Invalid schedule: " + schq.getSchedule()); + if (!scheduledQuery.isSetNextExecution()) { + Integer nextExecutionTime = computeNextExecutionTime(schq.getSchedule()); + if (nextExecutionTime == null) { + throw new InvalidInputException("Invalid schedule: " + schq.getSchedule()); + } + persisted.setNextExecution(nextExecutionTime); + } else { + persisted.setNextExecution(schq.getNextExecution()); } - persisted.setNextExecution(nextExecutionTime); pm.makePersistent(persisted); commited = commitTransaction(); } finally {