diff --git parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g index c9da7a2..1cba7cb 100644 --- parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g +++ parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g @@ -376,6 +376,8 @@ KW_ALLOC_FRACTION: 'ALLOC_FRACTION'; KW_SCHEDULING_POLICY: 'SCHEDULING_POLICY'; KW_SCHEDULED: 'SCHEDULED'; +KW_EVERY: 'EVERY'; +KW_AT: 'AT'; KW_CRON: 'CRON'; KW_PATH: 'PATH'; KW_MAPPING: 'MAPPING'; diff --git parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index dbafa30..b539d76 100644 --- parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -455,6 +455,8 @@ TOK_WITHIN_GROUP; TOK_CRON; TOK_EXECUTED_AS; +TOK_SCHEDULE; +TOK_EVERY; } @@ -2132,6 +2134,8 @@ @init { pushMsg("schedule specification", state); } @after { popMsg(state); } : KW_CRON cronString=StringLiteral -> ^(TOK_CRON $cronString) + | KW_EVERY value=Number? qualifier=intervalQualifiers + ((KW_AT|KW_OFFSET KW_BY) offsetTs=StringLiteral)? -> ^(TOK_SCHEDULE ^(TOK_EVERY $value?) $qualifier $offsetTs?) 
; executedAsSpec diff --git parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g index e6ea41a..e154485 100644 --- parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g +++ parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g @@ -865,7 +865,7 @@ | KW_TIMESTAMPTZ | KW_DEFAULT | KW_REOPTIMIZATION - | KW_EXECUTED | KW_SCHEDULED | KW_CRON + | KW_EXECUTED | KW_SCHEDULED | KW_CRON | KW_EVERY | KW_AT | KW_RESOURCE | KW_PLAN | KW_PLANS | KW_QUERY_PARALLELISM | KW_ACTIVATE | KW_MOVE | KW_DO | KW_POOL | KW_ALLOC_FRACTION | KW_SCHEDULING_POLICY | KW_PATH | KW_MAPPING | KW_WORKLOAD | KW_MANAGEMENT | KW_ACTIVE | KW_UNMANAGED | KW_UNKNOWN diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java index eeb1ae8..7fffc50 100644 --- ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java @@ -32,12 +32,10 @@ import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.TaskRunner; -import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationId; /** * Hook Context keeps all the necessary information for all the hooks. 
* New implemented hook can get the query plan, job conf and the list of all completed tasks from this hook context diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ScheduledQueryAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ScheduledQueryAnalyzer.java index 66394a3..ca90e72 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ScheduledQueryAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ScheduledQueryAnalyzer.java @@ -18,9 +18,15 @@ package org.apache.hadoop.hive.ql.parse; +import com.cronutils.builder.CronBuilder; +import com.cronutils.model.CronType; +import com.cronutils.model.definition.CronDefinition; +import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.model.field.expression.FieldExpression; import com.google.common.base.Objects; import org.antlr.runtime.tree.Tree; +import org.apache.hadoop.hive.common.type.Timestamp; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.ScheduledQuery; @@ -35,10 +41,16 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.common.util.TimestampParser; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.cronutils.model.field.expression.FieldExpressionFactory.always; +import static com.cronutils.model.field.expression.FieldExpressionFactory.on; +import static com.cronutils.model.field.expression.FieldExpressionFactory.every; +import static com.cronutils.model.field.expression.FieldExpressionFactory.questionMark; + import java.util.ArrayList; import java.util.List; @@ -163,6 +175,9 @@ case HiveParser.TOK_CRON: schq.setSchedule(unescapeSQLString(node.getChild(0).getText())); return; + case HiveParser.TOK_SCHEDULE: + 
schq.setSchedule(interpretEveryNode(parseInteger(node.getChild(0).getChild(0), 1), node.getChild(1).getType(), parseTimeStamp(node.getChild(2)))); + return; case HiveParser.TOK_EXECUTED_AS: schq.setUser(unescapeSQLString(node.getChild(0).getText())); return; @@ -174,6 +189,82 @@ } } + private String interpretEveryNode(int every, int intervalToken, Timestamp ts) throws SemanticException { + CronBuilder b = getDefaultCronBuilder(); + switch (intervalToken) { + case HiveParser.TOK_INTERVAL_DAY_LITERAL: + if (every != 1) { + throw new SemanticException("EVERY " + every + " DAY is not supported; only EVERY DAY is supported"); + } + b.withSecond(on(ts.getSeconds())); + b.withMinute(on(ts.getMinutes())); + b.withHour(on(ts.getHours())); + break; + case HiveParser.TOK_INTERVAL_HOUR_LITERAL: + b.withSecond(on(ts.getSeconds())); + b.withMinute(on(ts.getMinutes())); + b.withHour(every(on0(ts.getHours()), every)); + break; + case HiveParser.TOK_INTERVAL_MINUTE_LITERAL: + b.withSecond(on(ts.getSeconds())); + b.withMinute(every(on0(ts.getMinutes()), every)); + break; + case HiveParser.TOK_INTERVAL_SECOND_LITERAL: + b.withSecond(every(on0(ts.getSeconds()), every)); + break; + default: + throw new SemanticException("Unsupported schedule interval (only DAY/HOUR/MINUTE/SECOND is supported)"); + } + + return b.instance().asString(); + } + + private FieldExpression on0(int n) { + if (n == 0) { + return always(); + } else { + return on(n); + } + } + + private int parseInteger(Tree node, int def) { + if (node == null) { + return def; + } else { + return Integer.parseInt(node.getText()); + } + } + + private Timestamp parseTimeStamp(Tree offsetNode) { + if (offsetNode == null) { + return new Timestamp(); + } + List<String> s = new ArrayList<>(); + s.add(TimestampParser.ISO_8601_FORMAT_STR); + s.add(TimestampParser.RFC_1123_FORMAT_STR); + s.add("HH:mm:ss"); + s.add("H:mm:ss"); + s.add("HH:mm"); + + TimestampParser p = new TimestampParser(s); + return 
p.parseTimestamp(unescapeSQLString(offsetNode.getText())); + } + + + private CronBuilder getDefaultCronBuilder() { + CronDefinition definition = CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ); + CronBuilder b = CronBuilder.cron(definition) + .withYear(always()) + .withDoM(always()) + .withMonth(always()) + .withDoW(questionMark()) + .withHour(always()) + .withMinute(always()) + .withSecond(always()); + return b; + } + private void checkAuthorization(ScheduledQueryMaintenanceRequestType type, ScheduledQuery schq) throws SemanticException { boolean schqAuthorization = (SessionState.get().getAuthorizerV2() != null) diff --git ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryStatements.java ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryStatements.java index ac4e164..336debf 100644 --- ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryStatements.java +++ ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryStatements.java @@ -84,11 +84,62 @@ } } - @Test - public void testSimpleCreate() throws ParseException, Exception { + + private void checkScheduleCreation(String schqName, String schedule, String expectedSchedule) + throws CommandProcessorException, Exception { IDriver driver = createDriver(); driver.run("set role admin"); - driver.run("create scheduled query simplecreate cron '* * * * * ? *' as select 1 from tu"); + driver.run("create scheduled query " + schqName + " " + schedule + " as select 1 from tu"); + try (CloseableObjectStore os = new CloseableObjectStore(env_setup.getTestCtx().hiveConf)) { + Optional sq = os.getMScheduledQuery(new ScheduledQueryKey(schqName, "hive")); + assertTrue(sq.isPresent()); + assertEquals(expectedSchedule, sq.get().getSchedule()); + } + } + + @Test + public void testSimpleCreate() throws ParseException, Exception { + checkScheduleCreation(getMethodName(), "cron '* * * * * ? *'", "* * * * * ? 
*"); + } + + private String getMethodName() { + StackTraceElement[] stackTrace = new Throwable().getStackTrace(); + return stackTrace[1].getMethodName(); + } + + @Test + public void testMinutes() throws ParseException, Exception { + checkScheduleCreation(getMethodName(), "every minute", "0 * * * * ? *"); + } + + @Test + public void test10Minutes() throws ParseException, Exception { + checkScheduleCreation(getMethodName(), "every 10 minutes", "0 */10 * * * ? *"); + } + + @Test + public void test10Seconds() throws ParseException, Exception { + checkScheduleCreation(getMethodName(), "every 10 seconds", "*/10 * * * * ? *"); + } + + @Test + public void test4Hours() throws ParseException, Exception { + checkScheduleCreation(getMethodName(), "every 4 hours", "0 0 */4 * * ? *"); + } + + @Test + public void test4Hours2() throws ParseException, Exception { + checkScheduleCreation(getMethodName(), "every 4 hours offset by '2:03:04'", "4 3 2/4 * * ? *"); + } + + @Test + public void testDay() throws ParseException, Exception { + checkScheduleCreation(getMethodName(), "every day offset by '2:03:04'", "4 3 2 * * ? *"); + } + + @Test + public void testDay2() throws ParseException, Exception { + checkScheduleCreation(getMethodName(), "every day at '2:03:04'", "4 3 2 * * ? *"); } @Test(expected = CommandProcessorException.class)