diff --git parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
index 1cba7cb..0296a3d 100644
--- parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
+++ parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
@@ -174,6 +174,7 @@
 KW_ENABLE: 'ENABLE' | 'ENABLED';
 KW_DISABLE: 'DISABLE' | 'DISABLED';
 KW_EXECUTED: 'EXECUTED';
+KW_EXECUTE: 'EXECUTE';
 KW_LOCATION: 'LOCATION';
 KW_TABLESAMPLE: 'TABLESAMPLE';
 KW_BUCKET: 'BUCKET';
diff --git parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index b539d76..949e57b 100644
--- parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -455,6 +455,7 @@
 TOK_WITHIN_GROUP;
 TOK_CRON;
 TOK_EXECUTED_AS;
+TOK_EXECUTE;
 TOK_SCHEDULE;
 TOK_EVERY;
 }
@@ -2128,6 +2129,7 @@
     | executedAsSpec
     | enableSpecification
     | definedAsSpec
+    | KW_EXECUTE -> ^(TOK_EXECUTE)
     ;
 
 scheduleSpec
diff --git parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index e154485..262afaa 100644
--- parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -865,7 +865,7 @@
     | KW_TIMESTAMPTZ
     | KW_DEFAULT
     | KW_REOPTIMIZATION
-    | KW_EXECUTED | KW_SCHEDULED | KW_CRON | KW_EVERY | KW_AT
+    | KW_EXECUTED | KW_SCHEDULED | KW_CRON | KW_EVERY | KW_AT | KW_EXECUTE
     | KW_RESOURCE | KW_PLAN | KW_PLANS | KW_QUERY_PARALLELISM | KW_ACTIVATE | KW_MOVE | KW_DO
     | KW_POOL | KW_ALLOC_FRACTION | KW_SCHEDULING_POLICY | KW_PATH | KW_MAPPING | KW_WORKLOAD | KW_MANAGEMENT | KW_ACTIVE | KW_UNMANAGED | KW_UNKNOWN
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java
index 3d46b18..fd0c173 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionService;
 import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryMaintenanceWork;
 import org.apache.thrift.TException;
 
@@ -45,6 +46,9 @@
     ScheduledQueryMaintenanceRequest request = buildScheduledQueryRequest();
     try {
       Hive.get().getMSC().scheduledQueryMaintenance(request);
+      if (work.getScheduledQuery().isSetNextExecution()) {
+        ScheduledQueryExecutionService.forceScheduleCheck();
+      }
     } catch (TException | HiveException e) {
       setException(e);
       LOG.error("Failed", e);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ScheduledQueryAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ScheduledQueryAnalyzer.java
index 7e78aca..66b4653 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ScheduledQueryAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ScheduledQueryAnalyzer.java
@@ -91,6 +91,8 @@
         // in case the user will change; we have to run an authorization check beforehand
         checkAuthorization(type, schqStored);
       }
+      // clear the next execution time
+      schqStored.setNextExecutionIsSet(false);
       return composeOverlayObject(schqChanges, schqStored);
     } catch (TException e) {
       throw new SemanticException("unable to get Scheduled query" + e);
@@ -186,6 +188,10 @@
     case HiveParser.TOK_QUERY:
       schq.setQuery(unparseTree(node.getChild(0)));
       return;
+    case HiveParser.TOK_EXECUTE:
+      int now = (int) (System.currentTimeMillis() / 1000);
+      schq.setNextExecution(now);
+      return;
     default:
       throw new SemanticException("Unexpected token: " + node.getType());
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java
index 717a452..06cfe3f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java
+++ ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -44,17 +45,26 @@
   private static final Logger LOG = LoggerFactory.getLogger(ScheduledQueryExecutionService.class);
 
+  private static ScheduledQueryExecutionService INSTANCE = null;
+
   private ScheduledQueryExecutionContext context;
   private ScheduledQueryExecutor worker;
+  private AtomicInteger forcedScheduleCheckCounter = new AtomicInteger();
 
   public static ScheduledQueryExecutionService startScheduledQueryExecutorService(HiveConf conf0) {
-    HiveConf conf = new HiveConf(conf0);
-    MetastoreBasedScheduledQueryService qService = new MetastoreBasedScheduledQueryService(conf);
-    ExecutorService executor =
-        Executors.newCachedThreadPool(
-            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build());
-    ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService);
-    return new ScheduledQueryExecutionService(ctx);
+    synchronized (ScheduledQueryExecutionService.class) {
+      if (INSTANCE != null) {
+        throw new IllegalStateException(
+            "There is already a ScheduledQueryExecutionService in service; check it and close it explicitly if necessary");
+      }
+      HiveConf conf = new HiveConf(conf0);
+      MetastoreBasedScheduledQueryService qService = new MetastoreBasedScheduledQueryService(conf);
+      ExecutorService executor = Executors.newCachedThreadPool(
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build());
+      ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService);
+      INSTANCE = new ScheduledQueryExecutionService(ctx);
+      return INSTANCE;
+    }
   }
 
   public ScheduledQueryExecutionService(ScheduledQueryExecutionContext ctx) {
@@ -83,7 +93,7 @@
         }
       } else {
         try {
-          Thread.sleep(context.getIdleSleepTime());
+          sleep(context.getIdleSleepTime());
         } catch (InterruptedException e) {
           LOG.warn("interrupt discarded");
         }
@@ -91,6 +101,17 @@
     }
   }
 
+  private void sleep(long idleSleepTime) throws InterruptedException {
+    long checkIntervalMs = 1000;
+    int origResets = forcedScheduleCheckCounter.get();
+    for (long i = 0; i < idleSleepTime; i += checkIntervalMs) {
+      Thread.sleep(checkIntervalMs);
+      if (forcedScheduleCheckCounter.get() != origResets) {
+        return;
+      }
+    }
+  }
+
   public synchronized void reportQueryProgress() {
     if (info != null) {
       LOG.info("Reporting query progress of {} as {} err:{}", info.getScheduledExecutionId(), info.getState(),
@@ -173,15 +194,27 @@
   @VisibleForTesting
   @Override
   public void close() throws IOException {
-    context.executor.shutdown();
-    try {
-      context.executor.awaitTermination(1, TimeUnit.SECONDS);
-      context.executor.shutdownNow();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
+    synchronized (ScheduledQueryExecutionService.class) {
+      if (INSTANCE == null || INSTANCE != this) {
+        throw new IllegalStateException("The current ScheduledQueryExecutionService INSTANCE is invalid");
+      }
+      INSTANCE = null;
+      context.executor.shutdown();
+      try {
+        context.executor.awaitTermination(1, TimeUnit.SECONDS);
+        context.executor.shutdownNow();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
     }
-
-  }
+  }
+
+  public static void forceScheduleCheck() {
+    INSTANCE.forcedScheduleCheckCounter.incrementAndGet();
+  }
+
+  @VisibleForTesting
+  public static int getForcedScheduleCheckCount() {
+    return INSTANCE.forcedScheduleCheckCounter.get();
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryStatements.java ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryStatements.java
index 336debf..f2fc421 100644
--- ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryStatements.java
+++ ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryStatements.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.schq;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -32,6 +34,7 @@
 import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionService;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.testutils.HiveTestEnvSetup;
+import org.hamcrest.Matchers;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -187,7 +190,7 @@
     IDriver driver = createDriver();
 
     driver.run("set role admin");
-    driver.run("create scheduled query alter1 cron '* * * * * ? *' as select 1 from tu");
+    driver.run("create scheduled query alter1 cron '0 0 7 * * ? *' as select 1 from tu");
     driver.run("alter scheduled query alter1 executed as 'user3'");
     driver.run("alter scheduled query alter1 defined as select 22 from tu");
 
@@ -195,11 +198,30 @@
       Optional<MScheduledQuery> sq = os.getMScheduledQuery(new ScheduledQueryKey("alter1", "hive"));
       assertTrue(sq.isPresent());
       assertEquals("user3", sq.get().toThrift().getUser());
+      assertThat(sq.get().getNextExecution(), Matchers.greaterThan((int) (System.currentTimeMillis() / 1000)));
     }
   }
 
   @Test
+  public void testExecuteImmediate() throws ParseException, Exception {
+    IDriver driver = createDriver();
+
+    driver.run("set role admin");
+    driver.run("create scheduled query immed cron '0 0 7 * * ? *' as select 1");
*' as select 1"); + int cnt0 = ScheduledQueryExecutionService.getForcedScheduleCheckCount(); + driver.run("alter scheduled query immed execute"); + + try (CloseableObjectStore os = new CloseableObjectStore(env_setup.getTestCtx().hiveConf)) { + Optional sq = os.getMScheduledQuery(new ScheduledQueryKey("immed", "hive")); + assertTrue(sq.isPresent()); + assertThat(sq.get().getNextExecution(), Matchers.lessThanOrEqualTo((int) (System.currentTimeMillis() / 1000))); + int cnt1 = ScheduledQueryExecutionService.getForcedScheduleCheckCount(); + assertNotEquals(cnt1, cnt0); + } + } + + @Test public void testImpersonation() throws ParseException, Exception { HiveConf conf = env_setup.getTestCtx().hiveConf; IDriver driver = createDriver(); diff --git ql/src/test/queries/clientpositive/schq_analyze.q ql/src/test/queries/clientpositive/schq_analyze.q new file mode 100644 index 0000000..969b47b --- /dev/null +++ ql/src/test/queries/clientpositive/schq_analyze.q @@ -0,0 +1,31 @@ +--! qt:authorizer +--! qt:scheduledqueryservice +--! qt:sysdb + +set user.name=hive_admin_user; +set role admin; + +-- create external table +create external table t (a integer); + +-- disable autogather +set hive.stats.autogather=false; + +insert into t values (1),(2),(3); + +-- basic stats show that the table has "0" rows +desc formatted t; + +-- create a schedule to compute stats +create scheduled query t_analyze cron '0 */1 * * * ? *' as analyze table t compute statistics for columns; + +alter scheduled query t_analyze execute; + +!sleep 3; + +select * from information_schema.scheduled_executions s where schedule_name='ex_analyze' order by scheduled_execution_id desc limit 3; + +-- and the numrows have been updated +desc formatted t; + + diff --git ql/src/test/queries/clientpositive/schq_ingest.q ql/src/test/queries/clientpositive/schq_ingest.q new file mode 100644 index 0000000..2a3b2fc --- /dev/null +++ ql/src/test/queries/clientpositive/schq_ingest.q @@ -0,0 +1,44 @@ +--! qt:authorizer +--! qt:scheduledqueryservice +--! qt:sysdb + +set user.name=hive_admin_user; +set role admin; + +drop table if exists t; +drop table if exists s; + +-- suppose that this table is an external table or something +-- which supports the pushdown of filter condition on the id column +create table s(id integer, cnt integer); + +-- create an internal table and an offset table +create table t(id integer, cnt integer); +create table t_offset(offset integer); +insert into t_offset values(0); + +-- pretend that data is added to s +insert into s values(1,1); + +-- run an ingestion... +from (select id==offset as first,* from s +join t_offset on id>=offset) s1 +insert into t select id,cnt where first = false +insert overwrite table t_offset select max(s1.id); + +-- configure to run ingestion every 10 minutes +create scheduled query ingest every 10 minutes defined as +from (select id==offset as first,* from s +join t_offset on id>=offset) s1 +insert into t select id,cnt where first = false +insert overwrite table t_offset select max(s1.id); + +-- add some new values +insert into s values(2,2),(3,3); + +-- pretend that a timeout have happened +alter scheduled query ingest execute; + +!sleep 3; +select state,error_message from sys.scheduled_executions; + diff --git ql/src/test/queries/clientpositive/schq_materialized.q ql/src/test/queries/clientpositive/schq_materialized.q index fae5239..db51dad 100644 --- ql/src/test/queries/clientpositive/schq_materialized.q +++ ql/src/test/queries/clientpositive/schq_materialized.q @@ -1,5 +1,10 @@ +--! 
+--! qt:scheduledqueryservice
 --! qt:sysdb
 
+set user.name=hive_admin_user;
+set role admin;
+
 drop materialized view if exists mv1;
 drop table if exists emps;
 drop table if exists depts;
@@ -42,16 +47,31 @@
 JOIN depts ON (emps.deptno = depts.deptno)
 WHERE hire_date >= '2016-01-01 00:00:00';
 
+-- mv1 is used
+EXPLAIN
+SELECT empid, deptname FROM emps
+JOIN depts ON (emps.deptno = depts.deptno)
+WHERE hire_date >= '2018-01-01';
+
+-- insert a new record
+insert into emps values (1330, 10, 'Bill', 10000, '2020-01-02');
+
+-- mv1 is NOT used
 EXPLAIN
 SELECT empid, deptname FROM emps
 JOIN depts ON (emps.deptno = depts.deptno)
 WHERE hire_date >= '2018-01-01';
 
 -- create a schedule to rebuild mv
-create scheduled query d cron '0 */10 * * * ? *' defined as
+create scheduled query d cron '0 0 * * * ? *' defined as
 alter materialized view mv1 rebuild;
-
 set hive.support.quoted.identifiers=none;
--- expected result to have it created
 select `(NEXT_EXECUTION)?+.+` from sys.scheduled_queries;
+
+alter scheduled query d execute;
+
+!sleep 3;
+
+-- the scheduled execution will fail because of the missing TXN, but overall it works
+select state,error_message from sys.scheduled_executions;
diff --git ql/src/test/results/clientpositive/llap/schq_analyze.q.out ql/src/test/results/clientpositive/llap/schq_analyze.q.out
new file mode 100644
index 0000000..a083479
--- /dev/null
+++ ql/src/test/results/clientpositive/llap/schq_analyze.q.out
@@ -0,0 +1,110 @@
+PREHOOK: query: set role admin
+PREHOOK: type: SHOW_ROLES
+POSTHOOK: query: set role admin
+POSTHOOK: type: SHOW_ROLES
+PREHOOK: query: create external table t (a integer)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t
+POSTHOOK: query: create external table t (a integer)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t
+PREHOOK: query: insert into t values (1),(2),(3)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@t
+POSTHOOK: query: insert into t values (1),(2),(3)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@t
+POSTHOOK: Lineage: t.a SCRIPT []
+PREHOOK: query: desc formatted t
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@t
+POSTHOOK: query: desc formatted t
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@t
+# col_name	data_type	comment
+a	int
+
+# Detailed Table Information
+Database:	default
+#### A masked pattern was here ####
+Retention:	0
+#### A masked pattern was here ####
+Table Type:	EXTERNAL_TABLE
+Table Parameters:
+	EXTERNAL	TRUE
+	bucketing_version	2
+	numFiles	1
+	numRows	0
+	rawDataSize	0
+	totalSize	6
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library:	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat:	org.apache.hadoop.mapred.TextInputFormat
+OutputFormat:	org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed:	No
+Num Buckets:	-1
+Bucket Columns:	[]
+Sort Columns:	[]
+Storage Desc Params:
+	serialization.format	1
+PREHOOK: query: create scheduled query t_analyze cron '0 */1 * * * ? *' as analyze table t compute statistics for columns
+PREHOOK: type: CREATE SCHEDULED QUERY
+POSTHOOK: query: create scheduled query t_analyze cron '0 */1 * * * ? *' as analyze table t compute statistics for columns
+POSTHOOK: type: CREATE SCHEDULED QUERY
+PREHOOK: query: alter scheduled query t_analyze execute
+PREHOOK: type: ALTER SCHEDULED QUERY
+POSTHOOK: query: alter scheduled query t_analyze execute
+POSTHOOK: type: ALTER SCHEDULED QUERY
+PREHOOK: query: select * from information_schema.scheduled_executions s where schedule_name='ex_analyze' order by scheduled_execution_id desc limit 3
+PREHOOK: type: QUERY
+PREHOOK: Input: information_schema@scheduled_executions
+PREHOOK: Input: sys@scheduled_executions
+PREHOOK: Input: sys@scheduled_queries
+#### A masked pattern was here ####
+POSTHOOK: query: select * from information_schema.scheduled_executions s where schedule_name='ex_analyze' order by scheduled_execution_id desc limit 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: information_schema@scheduled_executions
+POSTHOOK: Input: sys@scheduled_executions
+POSTHOOK: Input: sys@scheduled_queries
+#### A masked pattern was here ####
+PREHOOK: query: desc formatted t
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@t
+POSTHOOK: query: desc formatted t
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@t
+# col_name	data_type	comment
+a	int
+
+# Detailed Table Information
+Database:	default
+#### A masked pattern was here ####
+Retention:	0
+#### A masked pattern was here ####
+Table Type:	EXTERNAL_TABLE
+Table Parameters:
+	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\"}}
+	EXTERNAL	TRUE
+	bucketing_version	2
+	numFiles	1
+	numRows	3
+	rawDataSize	3
+	totalSize	6
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library:	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat:	org.apache.hadoop.mapred.TextInputFormat
+OutputFormat:	org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed:	No
+Num Buckets:	-1
+Bucket Columns:	[]
+Sort Columns:	[]
+Storage Desc Params:
+	serialization.format	1
diff --git ql/src/test/results/clientpositive/llap/schq_ingest.q.out ql/src/test/results/clientpositive/llap/schq_ingest.q.out
new file mode 100644
index 0000000..554102b
--- /dev/null
+++ ql/src/test/results/clientpositive/llap/schq_ingest.q.out
@@ -0,0 +1,113 @@
+PREHOOK: query: set role admin
+PREHOOK: type: SHOW_ROLES
+POSTHOOK: query: set role admin
+POSTHOOK: type: SHOW_ROLES
+PREHOOK: query: drop table if exists t
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists t
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists s
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists s
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table s(id integer, cnt integer)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@s
+POSTHOOK: query: create table s(id integer, cnt integer)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@s
+PREHOOK: query: create table t(id integer, cnt integer)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t
+POSTHOOK: query: create table t(id integer, cnt integer)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t
+PREHOOK: query: create table t_offset(offset integer)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t_offset
+POSTHOOK: query: create table t_offset(offset integer)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t_offset
+PREHOOK: query: insert into t_offset values(0)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@t_offset
+POSTHOOK: query: insert into t_offset values(0)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@t_offset
+POSTHOOK: Lineage: t_offset.offset SCRIPT []
+PREHOOK: query: insert into s values(1,1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@s
+POSTHOOK: query: insert into s values(1,1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@s
+POSTHOOK: Lineage: s.cnt SCRIPT []
+POSTHOOK: Lineage: s.id SCRIPT []
+Warning: Shuffle Join MERGEJOIN[37][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
+PREHOOK: query: from (select id==offset as first,* from s
+join t_offset on id>=offset) s1
+insert into t select id,cnt where first = false
+insert overwrite table t_offset select max(s1.id)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@s
+PREHOOK: Input: default@t_offset
+PREHOOK: Output: default@t
+PREHOOK: Output: default@t_offset
+POSTHOOK: query: from (select id==offset as first,* from s
+join t_offset on id>=offset) s1
+insert into t select id,cnt where first = false
+insert overwrite table t_offset select max(s1.id)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@s
+POSTHOOK: Input: default@t_offset
+POSTHOOK: Output: default@t
+POSTHOOK: Output: default@t_offset
+POSTHOOK: Lineage: t.cnt SIMPLE [(s)s.FieldSchema(name:cnt, type:int, comment:null), ]
+POSTHOOK: Lineage: t.id SIMPLE [(s)s.FieldSchema(name:id, type:int, comment:null), ]
+POSTHOOK: Lineage: t_offset.offset EXPRESSION [(s)s.FieldSchema(name:id, type:int, comment:null), ]
+Warning: Shuffle Join MERGEJOIN[34][tables = [s, t_offset]] in Stage 'Reducer 2' is a cross product
+PREHOOK: query: create scheduled query ingest every 10 minutes defined as
+from (select id==offset as first,* from s
+join t_offset on id>=offset) s1
+insert into t select id,cnt where first = false
+insert overwrite table t_offset select max(s1.id)
+PREHOOK: type: CREATE SCHEDULED QUERY
+POSTHOOK: query: create scheduled query ingest every 10 minutes defined as
+from (select id==offset as first,* from s
+join t_offset on id>=offset) s1
+insert into t select id,cnt where first = false
+insert overwrite table t_offset select max(s1.id)
+POSTHOOK: type: CREATE SCHEDULED QUERY
+PREHOOK: query: insert into s values(2,2),(3,3)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@s
+POSTHOOK: query: insert into s values(2,2),(3,3)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@s
+POSTHOOK: Lineage: s.cnt SCRIPT []
+POSTHOOK: Lineage: s.id SCRIPT []
+PREHOOK: query: alter scheduled query ingest execute
+PREHOOK: type: ALTER SCHEDULED QUERY
+POSTHOOK: query: alter scheduled query ingest execute
+POSTHOOK: type: ALTER SCHEDULED QUERY
+PREHOOK: query: select state,error_message from sys.scheduled_executions
+PREHOOK: type: QUERY
+PREHOOK: Input: sys@scheduled_executions
+#### A masked pattern was here ####
+POSTHOOK: query: select state,error_message from sys.scheduled_executions
+POSTHOOK: type: QUERY
+POSTHOOK: Input: sys@scheduled_executions
+#### A masked pattern was here ####
+FINISHED	NULL
diff --git ql/src/test/results/clientpositive/llap/schq_materialized.q.out ql/src/test/results/clientpositive/llap/schq_materialized.q.out
index 7599be7..5d573c3 100644
--- ql/src/test/results/clientpositive/llap/schq_materialized.q.out
+++ ql/src/test/results/clientpositive/llap/schq_materialized.q.out
@@ -1,3 +1,7 @@
+PREHOOK: query: set role admin
+PREHOOK: type: SHOW_ROLES
+POSTHOOK: query: set role admin
+POSTHOOK: type: SHOW_ROLES
 PREHOOK: query: drop materialized view if exists mv1
 PREHOOK: type: DROP_MATERIALIZED_VIEW
 POSTHOOK: query: drop materialized view if exists mv1
@@ -145,10 +149,124 @@
         outputColumnNames: _col0, _col1
         ListSink
 
-PREHOOK: query: create scheduled query d cron '0 */10 * * * ? *' defined as
+PREHOOK: query: insert into emps values (1330, 10, 'Bill', 10000, '2020-01-02')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@emps
+POSTHOOK: query: insert into emps values (1330, 10, 'Bill', 10000, '2020-01-02')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@emps
+POSTHOOK: Lineage: emps.deptno SCRIPT []
+POSTHOOK: Lineage: emps.empid SCRIPT []
+POSTHOOK: Lineage: emps.hire_date SCRIPT []
+POSTHOOK: Lineage: emps.name SCRIPT []
+POSTHOOK: Lineage: emps.salary SCRIPT []
+PREHOOK: query: EXPLAIN
+SELECT empid, deptname FROM emps
+JOIN depts ON (emps.deptno = depts.deptno)
+WHERE hire_date >= '2018-01-01'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@depts
+PREHOOK: Input: default@emps
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN
+SELECT empid, deptname FROM emps
+JOIN depts ON (emps.deptno = depts.deptno)
+WHERE hire_date >= '2018-01-01'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@depts
+POSTHOOK: Input: default@emps
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: emps
+                  filterExpr: ((hire_date >= TIMESTAMP'2018-01-01 00:00:00') and deptno is not null) (type: boolean)
+                  Statistics: Num rows: 8 Data size: 224 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: ((hire_date >= TIMESTAMP'2018-01-01 00:00:00') and deptno is not null) (type: boolean)
+                    Statistics: Num rows: 8 Data size: 224 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: empid (type: int), deptno (type: int)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 8 Data size: 64 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col1 (type: int)
+                        null sort order: z
+                        sort order: +
+                        Map-reduce partition columns: _col1 (type: int)
+                        Statistics: Num rows: 8 Data size: 64 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col0 (type: int)
+            Execution mode: vectorized, llap
+            LLAP IO: may be used (ACID table)
+        Map 3
+            Map Operator Tree:
+                TableScan
+                  alias: depts
+                  filterExpr: deptno is not null (type: boolean)
+                  Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: deptno is not null (type: boolean)
+                    Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: deptno (type: int), deptname (type: varchar(256))
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        null sort order: z
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: varchar(256))
+            Execution mode: vectorized, llap
+            LLAP IO: may be used (ACID table)
+        Reducer 2
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col1 (type: int)
+                  1 _col0 (type: int)
+                outputColumnNames: _col0, _col3
+                Statistics: Num rows: 8 Data size: 744 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: _col0 (type: int), _col3 (type: varchar(256))
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 8 Data size: 744 Basic stats: COMPLETE Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 8 Data size: 744 Basic stats: COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: create scheduled query d cron '0 0 * * * ? *' defined as
 alter materialized view mv1 rebuild
 PREHOOK: type: CREATE SCHEDULED QUERY
-POSTHOOK: query: create scheduled query d cron '0 */10 * * * ? *' defined as
+POSTHOOK: query: create scheduled query d cron '0 0 * * * ? *' defined as
 alter materialized view mv1 rebuild
 POSTHOOK: type: CREATE SCHEDULED QUERY
 PREHOOK: query: select `(NEXT_EXECUTION)?+.+` from sys.scheduled_queries
@@ -159,4 +277,17 @@
 POSTHOOK: type: QUERY
 POSTHOOK: Input: sys@scheduled_queries
 #### A masked pattern was here ####
-1	d	true	hive	0 */10 * * * ? *	hive_test_user	alter materialized view `default`.`mv1` rebuild
+2	d	true	hive	0 0 * * * ? *	hive_admin_user	alter materialized view `default`.`mv1` rebuild
+PREHOOK: query: alter scheduled query d execute
+PREHOOK: type: ALTER SCHEDULED QUERY
+POSTHOOK: query: alter scheduled query d execute
+POSTHOOK: type: ALTER SCHEDULED QUERY
+PREHOOK: query: select state,error_message from sys.scheduled_executions
+PREHOOK: type: QUERY
+PREHOOK: Input: sys@scheduled_executions
+#### A masked pattern was here ####
+POSTHOOK: query: select state,error_message from sys.scheduled_executions
+POSTHOOK: type: QUERY
+POSTHOOK: Input: sys@scheduled_executions
+#### A masked pattern was here ####
+FAILED	FAILED: SemanticException [Error 10265]: This command is not allowed on an ACID table default.emps with a non-ACID transaction manager. Failed command: /* schedule: d */alter materialized view `default`.`mv1` rebuild
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 317c9cb..3eff37f 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -12856,11 +12856,15 @@
       openTransaction();
       MScheduledQuery persisted = existing.get();
       persisted.doUpdate(schq);
-      Integer nextExecutionTime = computeNextExecutionTime(schq.getSchedule());
-      if (nextExecutionTime == null) {
-        throw new InvalidInputException("Invalid schedule: " + schq.getSchedule());
+      if (!scheduledQuery.isSetNextExecution()) {
+        Integer nextExecutionTime = computeNextExecutionTime(schq.getSchedule());
+        if (nextExecutionTime == null) {
+          throw new InvalidInputException("Invalid schedule: " + schq.getSchedule());
+        }
+        persisted.setNextExecution(nextExecutionTime);
+      } else {
+        persisted.setNextExecution(schq.getNextExecution());
       }
-      persisted.setNextExecution(nextExecutionTime);
       pm.makePersistent(persisted);
       commited = commitTransaction();
     } finally {