diff --git ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java index 9decb8c..6c252a6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.scheduled; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -33,6 +35,7 @@ public final ExecutorService executor; public final IScheduledQueryMaintenanceService schedulerService; public final HiveConf conf; + public final String executorHostName; public ScheduledQueryExecutionContext( ExecutorService executor, @@ -41,6 +44,14 @@ this.executor = executor; this.conf = conf; this.schedulerService = service; + try { + this.executorHostName = InetAddress.getLocalHost().getHostName(); + if (executorHostName == null) { + throw new RuntimeException("Hostname is null; Can't function without a valid hostname!"); + } + } catch (UnknownHostException e) { + throw new RuntimeException("Can't function without a valid hostname!", e); + } } /** @@ -55,4 +66,8 @@ return conf.getTimeVar(ConfVars.HIVE_SCHEDULED_QUERIES_EXECUTOR_PROGRESS_REPORT_INTERVAL, TimeUnit.MILLISECONDS); } + public String getExecutorHostName() { + return executorHostName; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java index 717a452..542f2f9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java +++ ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java @@ -117,7 +117,7 @@ reportQueryProgress(); try ( IDriver driver = DriverFactory.newDriver(DriverFactory.getNewQueryState(conf), null)) { - info.setExecutorQueryId(driver.getQueryState().getQueryId()); + info.setExecutorQueryId(buildExecutorQueryId(driver)); reportQueryProgress(); driver.run(q.getQuery()); info.setState(QueryState.FINISHED); @@ -132,11 +132,14 @@ } catch (Throwable e) { } } - reportQueryProgress(); } } + private String buildExecutorQueryId(IDriver driver) { + return String.format("%s/%s", context.getExecutorHostName(), driver.getQueryState().getQueryId()); + } + private String lockNameFor(ScheduledQueryKey scheduleKey) { return String.format("scheduled_query_%s_%s", scheduleKey.getClusterNamespace(), scheduleKey.getScheduleName()); } diff --git ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java index 9a7b423..ed71508 100644 --- ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java +++ ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java @@ -17,15 +17,13 @@ */ package org.apache.hadoop.hive.ql.schq; -import static org.junit.Assert.assertEquals; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.QueryState; import org.apache.hadoop.hive.metastore.api.ScheduledQueryKey; @@ -100,17 +98,19 @@ return res.size(); } - // Use notify/wait on this object to indicate when the scheduled query has finished executing. - static Object notifier = new Object(); public static class MockScheduledQueryService implements IScheduledQueryMaintenanceService { + // Use notify/wait on this object to indicate when the scheduled query has finished executing. + Object notifier = new Object(); + int id = 0; private String stmt; + ScheduledQueryProgressInfo lastProgressInfo; public MockScheduledQueryService(String string) { stmt = string; } - + @Override public ScheduledQueryPollResponse scheduledQueryPoll() { @@ -129,6 +129,7 @@ public void scheduledQueryProgress(ScheduledQueryProgressInfo info) { System.out.printf("%d, state: %s, error: %s", info.getScheduledExecutionId(), info.getState(), info.getErrorMessage()); + lastProgressInfo = info; if (info.getState() == QueryState.FINISHED || info.getState() == QueryState.FAILED) { // Query is done, notify any waiters synchronized (notifier) { @@ -152,17 +153,21 @@ HiveConf conf = env_setup.getTestCtx().hiveConf; MockScheduledQueryService qService = new MockScheduledQueryService("insert into tu values(1),(2),(3),(4),(5)"); ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService); - ScheduledQueryExecutionService sQ = new ScheduledQueryExecutionService(ctx); + try (ScheduledQueryExecutionService sQ = new ScheduledQueryExecutionService(ctx)) { - executor.shutdown(); - // Wait for the scheduled query to finish. Hopefully 30 seconds should be more than enough. - SessionState.getConsole().logInfo("Waiting for query execution to finish ..."); - synchronized (notifier) { - notifier.wait(30000); + executor.shutdown(); + // Wait for the scheduled query to finish. Hopefully 30 seconds should be more than enough. + SessionState.getConsole().logInfo("Waiting for query execution to finish ..."); + synchronized (qService.notifier) { + qService.notifier.wait(30000); + } + SessionState.getConsole().logInfo("Done waiting for query execution!"); } - SessionState.getConsole().logInfo("Done waiting for query execution!"); executor.shutdownNow(); + assertThat(qService.lastProgressInfo.isSetExecutorQueryId(), is(true)); + assertThat(qService.lastProgressInfo.getExecutorQueryId(), + Matchers.containsString(ctx.getExecutorHostName() + "/")); int nr = getNumRowsReturned(driver, "select 1 from tu"); assertThat(nr, Matchers.equalTo(5));