diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3d4e9e0..7ea2de9 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4873,6 +4873,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_SECURITY_AUTHORIZATION_SCHEDULED_QUERIES_SUPPORTED("hive.security.authorization.scheduled.queries.supported", false, "Enable this if the configured authorizer is able to handle scheduled query related calls."), + HIVE_SCHEDULED_QUERIES_MAX_EXECUTORS("hive.scheduled.queries.max.executors", 4, new RangeValidator(1, null), + "Maximal number of scheduled query executors to allow."), HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true, "If the query results cache is enabled. This will keep results of previously executed queries " + diff --git kafka-handler/README.md kafka-handler/README.md index 753e3e3..e7761e3 100644 --- kafka-handler/README.md +++ kafka-handler/README.md @@ -50,6 +50,9 @@ ALTER TABLE SET TBLPROPERTIES ( "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe"); ``` + +If you use Confluent's Avro serializer or deserializer with the Confluent Schema Registry, you will need to remove five bytes from the beginning of each message. These five bytes represent [a magic byte and a four-byte schema ID from the registry](https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format). +This can be done by setting `"avro.serde.type"="skip"` and `"avro.serde.skip.bytes"="5"`. In this case it is also recommended to set the Avro schema either via `"avro.schema.url"="http://hostname/SimpleDocument.avsc"` or `"avro.schema.literal"="{"type" : "record","name" : "SimpleRecord","..."}"`. If both properties are set, `avro.schema.literal` takes precedence.
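Putting the above together, a minimal sketch of the relevant table properties might look like this (the table name `kafka_avro_table` and the schema URL are placeholders, not part of this patch; the property names are the ones described above):

```
ALTER TABLE kafka_avro_table SET TBLPROPERTIES (
  -- delegate Avro (de)serialization to the Hive Avro SerDe
  "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe",
  -- skip the five-byte Confluent wire-format prefix of every message
  "avro.serde.type" = "skip",
  "avro.serde.skip.bytes" = "5",
  -- supply the reader schema explicitly
  "avro.schema.url" = "http://hostname/SimpleRecord.avsc");
```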
List of supported serializers and deserializers: diff --git kafka-handler/pom.xml kafka-handler/pom.xml index 6ad41de..8de338f 100644 --- kafka-handler/pom.xml +++ kafka-handler/pom.xml @@ -80,6 +80,10 @@ kafka-clients ${kafka.version} + + org.apache.avro + avro + junit @@ -118,8 +122,27 @@ 1.7.30 test + + io.confluent + kafka-avro-serializer + 5.4.0 + test + + + org.apache.avro + avro + + + + + + confluent + http://packages.confluent.io/maven/ + + + dev-fast-build @@ -190,5 +213,27 @@ + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-test-sources + + schema + + + + + ${project.basedir}/src/resources/ + true + String + + + + diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java index ffe7788..b138bf0 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java @@ -25,6 +25,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumReader; @@ -133,12 +134,44 @@ Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further"); Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty); LOG.debug("Building Avro Reader with schema {}", schemaFromProperty); - bytesConverter = new AvroBytesConverter(schema); + bytesConverter = getByteConverterForAvroDelegate(schema, tbl); } else { bytesConverter = new BytesWritableConverter(); } } + enum BytesConverterType { + SKIP, + NONE; + + static BytesConverterType fromString(String value) { + try { + return BytesConverterType.valueOf(value.trim().toUpperCase()); + } catch (Exception e){ + return NONE; + } + } + } + + BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) throws SerDeException { + String avroBytesConverterPropertyName = AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName(); + String avroBytesConverterProperty = tbl.getProperty(avroBytesConverterPropertyName, + BytesConverterType.NONE.toString()); + BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty); + String avroSkipBytesPropertyName = AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName(); + Integer avroSkipBytes = 0; + try { + avroSkipBytes = Integer.parseInt(tbl.getProperty(avroSkipBytesPropertyName)); + } catch (NumberFormatException e) { + throw new SerDeException("Value of " + avroSkipBytesPropertyName + " could not be parsed into an integer properly.", e); + } + switch (avroByteConverterType) { + case SKIP: return new AvroSkipBytesConverter(schema, avroSkipBytes); + case NONE: return new AvroBytesConverter(schema); + default: throw new SerDeException("Value of " + avroBytesConverterPropertyName + " was invalid."); + } + } + @Override public Class getSerializedClass() { return delegateSerDe.getSerializedClass(); } @@ -327,7 +360,7 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) { K getWritable(byte[] value); } - private static class AvroBytesConverter implements BytesConverter { + static class AvroBytesConverter implements BytesConverter { private final Schema schema; private final DatumReader dataReader; private final GenericDatumWriter gdw = new GenericDatumWriter<>(); @@ 
-354,12 +387,18 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) { return valueBytes; } + Decoder getDecoder(byte[] value) throws SerDeException { + return DecoderFactory.get().binaryDecoder(value, null); + } + @Override public AvroGenericRecordWritable getWritable(byte[] value) { GenericRecord avroRecord = null; try { - avroRecord = dataReader.read(null, DecoderFactory.get().binaryDecoder(value, null)); + avroRecord = dataReader.read(null, getDecoder(value)); } catch (IOException e) { Throwables.propagate(new SerDeException(e)); + } catch (SerDeException e) { + Throwables.propagate(e); } avroGenericRecordWritable.setRecord(avroRecord); @@ -369,6 +408,30 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) { } } + /** + * Avro converter which skips the first @skipBytes of each message. + * + * This may be needed for various serializers, such as the Confluent Avro serializer, which uses the first five + * bytes to indicate a magic byte, as well as a four byte schema ID. + */ + static class AvroSkipBytesConverter extends AvroBytesConverter { + private final int skipBytes; + + AvroSkipBytesConverter(Schema schema, int skipBytes) { + super(schema); + this.skipBytes = skipBytes; + } + + @Override + Decoder getDecoder(byte[] value) throws SerDeException { + try { + return DecoderFactory.get().binaryDecoder(value, this.skipBytes, value.length - this.skipBytes, null); + } catch (ArrayIndexOutOfBoundsException e) { + throw new SerDeException("Skip bytes value is larger than the message length.", e); + } + } + } + private static class BytesWritableConverter implements BytesConverter { @Override public byte[] getBytes(BytesWritable writable) { return writable.getBytes(); diff --git kafka-handler/src/resources/SimpleRecord.avsc kafka-handler/src/resources/SimpleRecord.avsc new file mode 100644 index 0000000..47b6156 --- /dev/null +++ kafka-handler/src/resources/SimpleRecord.avsc @@ -0,0 +1,13 @@ +{ + "type" : "record", + "name" : "SimpleRecord", + "namespace" : "org.apache.hadoop.hive.kafka", + "fields" : [ { + "name" : "id", + "type" : "string" + }, { + "name" : "name", + "type" : "string" + } + ] +} diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java new file mode 100644 index 0000000..084e69d --- /dev/null +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.kafka; + +import com.google.common.collect.Maps; + +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; + +import org.apache.avro.Schema; + +import org.apache.hadoop.hive.kafka.KafkaSerDe; +import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; +import org.apache.hadoop.hive.serde2.SerDeException; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * Test class for Hive Kafka Avro SerDe with variable bytes skipped. + */ +public class AvroBytesConverterTest { + private static SimpleRecord simpleRecord = SimpleRecord.newBuilder().setId("123").setName("test").build(); + private static byte[] simpleRecordConfluentBytes; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + /** + * Use the KafkaAvroSerializer from Confluent to serialize the simpleRecord. + */ + @BeforeClass + public static void setUp() { + Map config = Maps.newHashMap(); + config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); + KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(new MockSchemaRegistryClient()); + avroSerializer.configure(config, false); + simpleRecordConfluentBytes = avroSerializer.serialize("temp", simpleRecord); + } + + private void runConversionTest(Properties tbl, byte[] serializedSimpleRecord) throws SerDeException { + KafkaSerDe serde = new KafkaSerDe(); + Schema schema = SimpleRecord.getClassSchema(); + KafkaSerDe.AvroBytesConverter conv = (KafkaSerDe.AvroBytesConverter)serde.getByteConverterForAvroDelegate(schema, tbl); + AvroGenericRecordWritable simpleRecordWritable = conv.getWritable(serializedSimpleRecord); + + Assert.assertNotNull(simpleRecordWritable); + Assert.assertEquals(SimpleRecord.class, simpleRecordWritable.getRecord().getClass()); + + SimpleRecord simpleRecordDeserialized = (SimpleRecord) simpleRecordWritable.getRecord(); + + Assert.assertNotNull(simpleRecordDeserialized); + Assert.assertEquals(simpleRecord, simpleRecordDeserialized); + } + + /** + * Tests the default case of no skipped bytes per record works properly. + */ + @Test + public void convertWithAvroBytesConverter() throws SerDeException { + // Since the serialized version was created by Confluent, + // let's remove the first five bytes to get the actual message. + int recordLength = simpleRecordConfluentBytes.length; + byte[] simpleRecordWithNoOffset = Arrays.copyOfRange(simpleRecordConfluentBytes, 5, recordLength); + + Properties tbl = new Properties(); + tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName(), "NONE"); + tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName(), "5"); + + runConversionTest(tbl, simpleRecordWithNoOffset); + } + + /** + * Tests that the skip converter skips 5 bytes properly, which matches what Confluent needs. 
+ */ + @Test + public void convertWithConfluentAvroBytesConverter() throws SerDeException { + Integer offset = 5; + + Properties tbl = new Properties(); + tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName(), "SKIP"); + tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName(), offset.toString()); + + runConversionTest(tbl, simpleRecordConfluentBytes); + } + + /** + * Tests that the skip converter skips a custom number of bytes properly. + */ + @Test + public void convertWithCustomAvroSkipBytesConverter() throws SerDeException { + Integer offset = 2; + // Remove all but two bytes of the five byte offset which Confluent adds, + // to simulate a message with only 2 bytes in front of each message. + int recordLength = simpleRecordConfluentBytes.length; + byte[] simpleRecordAsOffsetBytes = Arrays.copyOfRange(simpleRecordConfluentBytes, 5 - offset, recordLength); + + Properties tbl = new Properties(); + tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName(), "SKIP"); tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName(), offset.toString()); + + runConversionTest(tbl, simpleRecordAsOffsetBytes); + } + + /** + * Test that when we skip more bytes than are in the message, we throw an exception properly. + */ + @Test + public void skipBytesLargerThanMessageSizeConverter() throws SerDeException { + // The simple record we are serializing is two strings, that combine to be 7 characters or 14 bytes. + // Adding in the 5 byte offset, we get 19 bytes. To make sure we go bigger than that, we are setting + // the offset to ten times that value. + Integer offset = 190; + + Properties tbl = new Properties(); + tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName(), "SKIP"); + tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName(), offset.toString()); + + exception.expect(RuntimeException.class); + exception.expectMessage("org.apache.hadoop.hive.serde2.SerDeException: " + + "Skip bytes value is larger than the message length."); + runConversionTest(tbl, simpleRecordConfluentBytes); + } + + /** + * Test that we properly parse the converter type, no matter the casing. 
+ */ + @Test + public void bytesConverterTypeParseTest() { + Map testCases = new HashMap() {{ + put("skip", KafkaSerDe.BytesConverterType.SKIP); + put("sKIp", KafkaSerDe.BytesConverterType.SKIP); + put("SKIP", KafkaSerDe.BytesConverterType.SKIP); + put(" skip ", KafkaSerDe.BytesConverterType.SKIP); + put("SKIP1", KafkaSerDe.BytesConverterType.NONE); + put("skipper", KafkaSerDe.BytesConverterType.NONE); + put("", KafkaSerDe.BytesConverterType.NONE); + put(null, KafkaSerDe.BytesConverterType.NONE); + put("none", KafkaSerDe.BytesConverterType.NONE); + put("NONE", KafkaSerDe.BytesConverterType.NONE); + put(" none ", KafkaSerDe.BytesConverterType.NONE); + }}; + + for(Map.Entry entry: testCases.entrySet()) { + Assert.assertEquals(entry.getValue(), KafkaSerDe.BytesConverterType.fromString(entry.getKey())); + } + } +} diff --git metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql index fde6f02..03540bb 100644 --- metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql +++ metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql @@ -1211,6 +1211,7 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `SCHEDULED_QUERIES` ( `USER` string, `QUERY` string, `NEXT_EXECUTION` bigint, + `ACTIVE_EXECUTION_ID` bigint, CONSTRAINT `SYS_PK_SCHEDULED_QUERIES` PRIMARY KEY (`SCHEDULED_QUERY_ID`) DISABLE ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' @@ -1225,7 +1226,8 @@ TBLPROPERTIES ( \"SCHEDULE\", \"USER\", \"QUERY\", - \"NEXT_EXECUTION\" + \"NEXT_EXECUTION\", + \"ACTIVE_EXECUTION_ID\" FROM \"SCHEDULED_QUERIES\"" ); @@ -1795,7 +1797,8 @@ select `SCHEDULE`, `USER`, `QUERY`, - FROM_UNIXTIME(NEXT_EXECUTION) as NEXT_EXECUTION + FROM_UNIXTIME(NEXT_EXECUTION) as NEXT_EXECUTION, + `ACTIVE_EXECUTION_ID` FROM SYS.SCHEDULED_QUERIES ; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java index fd0c173..5abfa4d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.schq; import org.apache.hadoop.hive.metastore.api.ScheduledQueryMaintenanceRequest; +import org.apache.hadoop.hive.metastore.api.ScheduledQueryMaintenanceRequestType; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -46,7 +47,11 @@ public int execute() { ScheduledQueryMaintenanceRequest request = buildScheduledQueryRequest(); try { Hive.get().getMSC().scheduledQueryMaintenance(request); - if (work.getScheduledQuery().isSetNextExecution()) { + if (work.getScheduledQuery().isSetNextExecution() + || request.getType() == ScheduledQueryMaintenanceRequestType.CREATE) { + // we might have a scheduled query available for execution; immediately: + // * in case a schedule is altered to be executed at a specific time + // * in case we created a new scheduled query - for say run every second ScheduledQueryExecutionService.forceScheduleCheck(); } } catch (TException | HiveException e) { @@ -68,5 +73,4 @@ private ScheduledQueryMaintenanceRequest buildScheduledQueryRequest() { public StageType getType() { return StageType.SCHEDULED_QUERY_MAINT; } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java 
ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java index 1bb24ee..32cb316 100644 --- ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java @@ -66,4 +66,8 @@ public long getProgressReporterSleepTime() { return conf.getTimeVar(ConfVars.HIVE_SCHEDULED_QUERIES_EXECUTOR_PROGRESS_REPORT_INTERVAL, TimeUnit.MILLISECONDS); } + public int getNumberOfExecutors() { + return conf.getIntVar(ConfVars.HIVE_SCHEDULED_QUERIES_MAX_EXECUTORS); + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java index 9a6237c..8443b3f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java +++ ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java @@ -19,13 +19,18 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.QueryState; import org.apache.hadoop.hive.metastore.api.ScheduledQueryKey; import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollResponse; @@ -48,18 +53,29 @@ private static ScheduledQueryExecutionService INSTANCE = null; private ScheduledQueryExecutionContext context; - private ScheduledQueryExecutor worker; private AtomicInteger forcedScheduleCheckCounter = new AtomicInteger(); + private AtomicInteger usedExecutors = new AtomicInteger(0); + private Queue runningExecutors = new ConcurrentLinkedQueue<>(); public static ScheduledQueryExecutionService startScheduledQueryExecutorService(HiveConf inputConf) { HiveConf conf = new HiveConf(inputConf); MetastoreBasedScheduledQueryService qService = new MetastoreBasedScheduledQueryService(conf); - ExecutorService executor = Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build()); + ExecutorService executor = buildExecutor(conf); ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService); return startScheduledQueryExecutorService(ctx); } + private static ExecutorService buildExecutor(HiveConf conf) { + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build(); + int systemThreads = 2; // poller,reporter + int minServiceThreads = 1; // always keep 1 thread to be used for executing scheduled queries + int maxServiceThreads = conf.getIntVar(ConfVars.HIVE_SCHEDULED_QUERIES_MAX_EXECUTORS); + return new ThreadPoolExecutor(systemThreads + minServiceThreads, systemThreads + maxServiceThreads, + 60L, TimeUnit.SECONDS, + new SynchronousQueue(), + threadFactory); + } + public static ScheduledQueryExecutionService startScheduledQueryExecutorService(ScheduledQueryExecutionContext ctx) { synchronized (ScheduledQueryExecutionService.class) { if (INSTANCE != null) { @@ -73,7 +89,7 @@ 
public static ScheduledQueryExecutionService startScheduledQueryExecutorService( private ScheduledQueryExecutionService(ScheduledQueryExecutionContext ctx) { context = ctx; - ctx.executor.submit(worker = new ScheduledQueryExecutor()); + ctx.executor.submit(new ScheduledQueryPoller()); ctx.executor.submit(new ProgressReporter()); } @@ -81,23 +97,52 @@ static boolean isTerminalState(QueryState state) { return state == QueryState.FINISHED || state == QueryState.FAILED; } - class ScheduledQueryExecutor implements Runnable { + /** + * Renames the {@link Thread} to make it clearer what it is working on. + */ + static class NamedThread implements Closeable { + private final String oldName; - private ScheduledQueryProgressInfo info; + public NamedThread(String newName) { + LOG.info("Starting {} thread - renaming accordingly.", newName); + oldName = Thread.currentThread().getName(); + Thread.currentThread().setName(newName); + } + + @Override + public void close() { + LOG.info("Thread finished; renaming back to: {}", oldName); + Thread.currentThread().setName(oldName); + } + } + + /** + * The poller is responsible for checking for available scheduled queries. + * + * It also handles forced wakeup calls, which reduce the latency otherwise imposed by the default check period of possibly several minutes. + * Only one poller thread may be running at a time in a HiveServer2 instance. + */ + class ScheduledQueryPoller implements Runnable { @Override public void run() { - while (true) { - ScheduledQueryPollResponse q = context.schedulerService.scheduledQueryPoll(); - if (q.isSetExecutionId()) { - try{ - processQuery(q); - } catch (Throwable t) { - LOG.error("Unexpected exception during scheduled query processing", t); + try (NamedThread namedThread = new NamedThread("Scheduled Query Poller")) { + while (!context.executor.isShutdown()) { + int origResets = forcedScheduleCheckCounter.get(); + if (usedExecutors.get() < context.getNumberOfExecutors()) { + try { + ScheduledQueryPollResponse q = context.schedulerService.scheduledQueryPoll(); + if (q.isSetExecutionId()) { + context.executor.submit(new ScheduledQueryExecutor(q)); + // skip sleep and poll again if there are available executors + continue; + } + } catch (Throwable t) { + LOG.error("Unexpected exception during scheduled query submission", t); + } } - } else { try { - sleep(context.getIdleSleepTime()); + sleep(context.getIdleSleepTime(), origResets); } catch (InterruptedException e) { LOG.warn("interrupt discarded"); } @@ -105,9 +150,8 @@ public void run() { } } - private void sleep(long idleSleepTime) throws InterruptedException { + private void sleep(long idleSleepTime, int origResets) throws InterruptedException { long checkIntrvalMs = 1000; - int origResets = forcedScheduleCheckCounter.get(); for (long i = 0; i < idleSleepTime; i += checkIntrvalMs) { Thread.sleep(checkIntrvalMs); if (forcedScheduleCheckCounter.get() != origResets) { @@ -116,6 +160,47 @@ private void sleep(long idleSleepTime) throws InterruptedException { } } + } + + private void executorStarted(ScheduledQueryExecutor executor) { + runningExecutors.add(executor); + usedExecutors.incrementAndGet(); + } + + private void executorStopped(ScheduledQueryExecutor executor) { + usedExecutors.decrementAndGet(); + runningExecutors.remove(executor); + forceScheduleCheck(); + } + + /** + * Responsible for a single execution of a scheduled query. + * + * The execution happens in a separate thread. 
+ */ + class ScheduledQueryExecutor implements Runnable { + + private ScheduledQueryProgressInfo info; + private final ScheduledQueryPollResponse pollResponse; + + public ScheduledQueryExecutor(ScheduledQueryPollResponse pollResponse) { + this.pollResponse = pollResponse; + executorStarted(this); + } + + public void run() { + try (NamedThread namedThread = new NamedThread(getThreadName())) { + processQuery(pollResponse); + } finally { + executorStopped(this); + } + } + + private String getThreadName() { + return String.format("Scheduled Query Executor(schedule:%s, execution_id:%d)", + pollResponse.getScheduleKey().getScheduleName(), pollResponse.getExecutionId()); + } + public synchronized void reportQueryProgress() { if (info != null) { LOG.info("Reporting query progress of {} as {} err:{}", info.getScheduledExecutionId(), info.getState(), @@ -128,10 +213,12 @@ public synchronized void reportQueryProgress() { } private void processQuery(ScheduledQueryPollResponse q) { - SessionState state = null; + LOG.info("Executing schq:{}, executionId: {}", q.getScheduleKey().getScheduleName(), q.getExecutionId()); info = new ScheduledQueryProgressInfo(); - info.setScheduledExecutionId(q.getExecutionId()); + info.setScheduledExecutionId(pollResponse.getExecutionId()); info.setState(QueryState.EXECUTING); + info.setExecutorQueryId(buildExecutorQueryId("")); + SessionState state = null; try { HiveConf conf = new HiveConf(context.conf); conf.set(Constants.HIVE_QUERY_EXCLUSIVE_LOCK, lockNameFor(q.getScheduleKey())); @@ -162,7 +249,11 @@ private void processQuery(ScheduledQueryPollResponse q) { } private String buildExecutorQueryId(IDriver driver) { - return String.format("%s/%s", context.executorHostName, driver.getQueryState().getQueryId()); + return buildExecutorQueryId(driver.getQueryState().getQueryId()); + } + + private String buildExecutorQueryId(String queryId) { + return String.format("%s/%s", context.executorHostName, queryId); } private String lockNameFor(ScheduledQueryKey scheduleKey) { @@ -179,43 +270,56 @@ private String getErrorStringForException(Throwable t) { } } + /** + * Reports progress periodically. + * + * To retain the running state of all the in-flight scheduled query executions; + * this class initiates a reporting round periodically. 
+ */ class ProgressReporter implements Runnable { @Override public void run() { - while (true) { - try { - Thread.sleep(context.getProgressReporterSleepTime()); - } catch (InterruptedException e) { - LOG.warn("interrupt discarded"); - } - try { - worker.reportQueryProgress(); - } catch (Exception e) { - LOG.error("ProgressReporter encountered exception ", e); + try (NamedThread namedThread = new NamedThread("Scheduled Query Progress Reporter")) { + while (!context.executor.isShutdown()) { + try { + Thread.sleep(context.getProgressReporterSleepTime()); + } catch (InterruptedException e) { + LOG.warn("interrupt discarded"); + } + try { + for (ScheduledQueryExecutor worker : runningExecutors) { + worker.reportQueryProgress(); + } + } catch (Exception e) { + LOG.error("ProgressReporter encountered exception ", e); + } } } } } - @VisibleForTesting @Override public void close() throws IOException { synchronized (ScheduledQueryExecutionService.class) { if (INSTANCE == null || INSTANCE != this) { throw new IllegalStateException("The current ScheduledQueryExecutionService INSTANCE is invalid"); } - INSTANCE = null; context.executor.shutdown(); + forceScheduleCheck(); try { context.executor.awaitTermination(1, TimeUnit.SECONDS); context.executor.shutdownNow(); + INSTANCE = null; } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } + /** + * Forces the poller thread to re-check schedules before the normal timeout happens. + */ public static void forceScheduleCheck() { INSTANCE.forcedScheduleCheckCounter.incrementAndGet(); } diff --git ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java index a8fe0c3..dd8da34 100644 --- ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java +++ ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java @@ -113,7 +113,6 @@ public MockScheduledQueryService(String string) { @Override public ScheduledQueryPollResponse scheduledQueryPoll() { - ScheduledQueryPollResponse r = new ScheduledQueryPollResponse(); r.setExecutionId(id++); r.setQuery(stmt); @@ -154,8 +153,6 @@ public void testScheduledQueryExecution() throws ParseException, Exception { MockScheduledQueryService qService = new MockScheduledQueryService("insert into tu values(1),(2),(3),(4),(5)"); ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService); try (ScheduledQueryExecutionService sQ = ScheduledQueryExecutionService.startScheduledQueryExecutorService(ctx)) { - - executor.shutdown(); // Wait for the scheduled query to finish. Hopefully 30 seconds should be more than enough. SessionState.getConsole().logInfo("Waiting for query execution to finish ..."); synchronized (qService.notifier) { @@ -163,7 +160,6 @@ public void testScheduledQueryExecution() throws ParseException, Exception { } SessionState.getConsole().logInfo("Done waiting for query execution!"); } - executor.shutdownNow(); assertThat(qService.lastProgressInfo.isSetExecutorQueryId(), is(true)); assertThat(qService.lastProgressInfo.getExecutorQueryId(), diff --git ql/src/test/queries/clientpositive/schq_analyze.q ql/src/test/queries/clientpositive/schq_analyze.q index 969b47b..3c03360 100644 --- ql/src/test/queries/clientpositive/schq_analyze.q +++ ql/src/test/queries/clientpositive/schq_analyze.q @@ -21,7 +21,7 @@ create scheduled query t_analyze cron '0 */1 * * * ? 
*' as analyze table t compu alter scheduled query t_analyze execute; -!sleep 3; +!sleep 10; select * from information_schema.scheduled_executions s where schedule_name='ex_analyze' order by scheduled_execution_id desc limit 3; diff --git ql/src/test/queries/clientpositive/schq_materialized.q ql/src/test/queries/clientpositive/schq_materialized.q index 6baed49..9848f9f 100644 --- ql/src/test/queries/clientpositive/schq_materialized.q +++ ql/src/test/queries/clientpositive/schq_materialized.q @@ -71,7 +71,7 @@ select `(NEXT_EXECUTION|SCHEDULED_QUERY_ID)?+.+` from sys.scheduled_queries; alter scheduled query d execute; -!sleep 3; +!sleep 10; -- the scheduled execution will fail - because of missing TXN; but overall it works.. select state,error_message from sys.scheduled_executions; diff --git ql/src/test/results/clientpositive/llap/schq_materialized.q.out ql/src/test/results/clientpositive/llap/schq_materialized.q.out index e904d46..194a1b3 100644 --- ql/src/test/results/clientpositive/llap/schq_materialized.q.out +++ ql/src/test/results/clientpositive/llap/schq_materialized.q.out @@ -277,7 +277,7 @@ POSTHOOK: query: select `(NEXT_EXECUTION|SCHEDULED_QUERY_ID)?+.+` from sys.sched POSTHOOK: type: QUERY POSTHOOK: Input: sys@scheduled_queries #### A masked pattern was here #### -d true hive 0 0 * * * ? * hive_admin_user alter materialized view `default`.`mv1` rebuild +d true hive 0 0 * * * ? * hive_admin_user alter materialized view `default`.`mv1` rebuild NULL PREHOOK: query: alter scheduled query d execute PREHOOK: type: ALTER SCHEDULED QUERY POSTHOOK: query: alter scheduled query d execute diff --git ql/src/test/results/clientpositive/llap/sysdb.q.out ql/src/test/results/clientpositive/llap/sysdb.q.out index 38cadf3..8b0be82 100644 --- ql/src/test/results/clientpositive/llap/sysdb.q.out +++ ql/src/test/results/clientpositive/llap/sysdb.q.out @@ -813,6 +813,8 @@ scheduled_executions start_time scheduled_executions start_time scheduled_executions state scheduled_executions state +scheduled_queries active_execution_id +scheduled_queries active_execution_id scheduled_queries cluster_namespace scheduled_queries cluster_namespace scheduled_queries enabled @@ -1069,8 +1071,8 @@ POSTHOOK: Input: sys@columns_v2 a decimal(10,2) 0 acquired_at string 9 action_expression string 4 -add_time int 1 -agent_info string 6 +active_execution_id bigint 8 +active_execution_id bigint 8 PREHOOK: query: select param_key, param_value from database_params order by param_key, param_value limit 5 PREHOOK: type: QUERY PREHOOK: Input: sys@database_params @@ -1419,9 +1421,9 @@ POSTHOOK: Input: sys@table_params #### A masked pattern was here #### COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"a":"true","b":"true","c":"true","d":"true","e":"true","f":"true","g":"true"}} COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"action_expression":"true","name":"true","ns":"true","rp_name":"true","trigger_expression":"true"}} +COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"active_execution_id":"true","cluster_namespace":"true","enabled":"true","next_execution":"true","query":"true","schedule":"true","schedule_name":"true","scheduled_query_id":"true","user":"true"}} COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"add_time":"true","grant_option":"true","grantor":"true","grantor_type":"true","principal_name":"true","principal_type":"true","role_grant_id":"true","role_id":"true"}} COLUMN_STATS_ACCURATE 
{"BASIC_STATS":"true","COLUMN_STATS":{"alloc_fraction":"true","ns":"true","path":"true","query_parallelism":"true","rp_name":"true","scheduling_policy":"true"}} -COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"authorizer":"true","column_name":"true","create_time":"true","grant_option":"true","grantor":"true","grantor_type":"true","part_col_priv":"true","part_column_grant_id":"true","part_id":"true","principal_name":"true","principal_type":"true"}} PREHOOK: query: select tbl_name from tbls order by tbl_name limit 5 PREHOOK: type: QUERY PREHOOK: Input: sys@tbls @@ -1536,9 +1538,9 @@ POSTHOOK: Input: sys@table_stats_view #### A masked pattern was here #### {"BASIC_STATS":"true","COLUMN_STATS":{"a":"true","b":"true","c":"true","d":"true","e":"true","f":"true","g":"true"}} 0 0 0 0 {"BASIC_STATS":"true","COLUMN_STATS":{"action_expression":"true","name":"true","ns":"true","rp_name":"true","trigger_expression":"true"}} 0 0 0 0 +{"BASIC_STATS":"true","COLUMN_STATS":{"active_execution_id":"true","cluster_namespace":"true","enabled":"true","next_execution":"true","query":"true","schedule":"true","schedule_name":"true","scheduled_query_id":"true","user":"true"}} 0 0 0 0 {"BASIC_STATS":"true","COLUMN_STATS":{"add_time":"true","grant_option":"true","grantor":"true","grantor_type":"true","principal_name":"true","principal_type":"true","role_grant_id":"true","role_id":"true"}} 0 0 0 0 {"BASIC_STATS":"true","COLUMN_STATS":{"alloc_fraction":"true","ns":"true","path":"true","query_parallelism":"true","rp_name":"true","scheduling_policy":"true"}} 0 0 0 0 -{"BASIC_STATS":"true","COLUMN_STATS":{"authorizer":"true","column_name":"true","create_time":"true","grant_option":"true","grantor":"true","grantor_type":"true","part_col_priv":"true","part_column_grant_id":"true","part_id":"true","principal_name":"true","principal_type":"true"}} 0 0 0 0 PREHOOK: query: select COLUMN_STATS_ACCURATE, NUM_FILES, NUM_ROWS, RAW_DATA_SIZE, TOTAL_SIZE FROM PARTITION_STATS_VIEW where COLUMN_STATS_ACCURATE is not null order by NUM_FILES, NUM_ROWS, RAW_DATA_SIZE limit 5 PREHOOK: type: QUERY PREHOOK: Input: sys@partition_params diff --git ql/src/test/results/clientpositive/llap/sysdb_schq.q.out ql/src/test/results/clientpositive/llap/sysdb_schq.q.out index 8745e3b..3239c36 100644 --- ql/src/test/results/clientpositive/llap/sysdb_schq.q.out +++ ql/src/test/results/clientpositive/llap/sysdb_schq.q.out @@ -27,6 +27,7 @@ schedule string from deserializer user string from deserializer query string from deserializer next_execution bigint from deserializer +active_execution_id bigint from deserializer # Detailed Table Information Database: sys @@ -35,7 +36,7 @@ Retention: 0 #### A masked pattern was here #### Table Type: EXTERNAL_TABLE Table Parameters: - COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"cluster_namespace\":\"true\",\"enabled\":\"true\",\"next_execution\":\"true\",\"query\":\"true\",\"schedule\":\"true\",\"schedule_name\":\"true\",\"scheduled_query_id\":\"true\",\"user\":\"true\"}} + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"active_execution_id\":\"true\",\"cluster_namespace\":\"true\",\"enabled\":\"true\",\"next_execution\":\"true\",\"query\":\"true\",\"schedule\":\"true\",\"schedule_name\":\"true\",\"scheduled_query_id\":\"true\",\"user\":\"true\"}} EXTERNAL TRUE bucketing_version 2 hive.sql.database.type METASTORE @@ -47,7 +48,8 @@ Table Parameters: \"SCHEDULE\", \"USER\", \"QUERY\", - \"NEXT_EXECUTION\" + \"NEXT_EXECUTION\", + \"ACTIVE_EXECUTION_ID\" FROM 
\"SCHEDULED_QUERIES\" numFiles 0 diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java index d16abdb..b540073 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java @@ -68,6 +68,8 @@ SCHEMA_NAME("avro.schema.name"), SCHEMA_DOC("avro.schema.doc"), AVRO_SERDE_SCHEMA("avro.serde.schema"), + AVRO_SERDE_TYPE("avro.serde.type"), + AVRO_SERDE_SKIP_BYTES("avro.serde.skip.bytes"), SCHEMA_RETRIEVER("avro.schema.retriever"); private final String propName; diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java index e5d21b0..d56bc2a 100644 --- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java +++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java @@ -19,7 +19,6 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; - import java.util.concurrent.TimeUnit; /** @@ -35,4 +34,13 @@ * @return frequency */ long runFrequency(TimeUnit unit); + + /** + * Gets the initial delay before the first execution. + * + * Defaults to {@link #runFrequency(TimeUnit)} + */ + default long initialDelay(TimeUnit unit) { + return runFrequency(unit); + } } diff --git standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java deleted file mode 100644 index 2eb51c8..0000000 --- standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Package consisting the utility methods for metastore. 
- */ -package org.apache.hadoop.hive.metastore.utils; diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 1a5944d..8a826d2 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -12667,7 +12667,7 @@ public ScheduledQueryPollResponse scheduledQueryPoll(ScheduledQueryPollRequest r try { openTransaction(); Query q = pm.newQuery(MScheduledQuery.class, - "nextExecution <= now && enabled && clusterNamespace == ns"); + "nextExecution <= now && enabled && clusterNamespace == ns && activeExecution == null"); q.setSerializeRead(true); q.declareParameters("java.lang.Integer now, java.lang.String ns"); q.setOrdering("nextExecution"); @@ -12685,6 +12685,7 @@ public ScheduledQueryPollResponse scheduledQueryPoll(ScheduledQueryPollRequest r execution.setState(QueryState.INITED); execution.setStartTime(now); execution.setLastUpdateTime(now); + schq.setActiveExecution(execution); pm.makePersistent(execution); pm.makePersistent(schq); ObjectStoreTestHook.onScheduledQueryPoll(); @@ -12735,6 +12736,7 @@ public void scheduledQueryProgress(ScheduledQueryProgressInfo info) throws Inval case TIMED_OUT: execution.setEndTime((int) (System.currentTimeMillis() / 1000)); execution.setLastUpdateTime(null); + execution.getScheduledQuery().setActiveExecution(null); break; default: throw new InvalidOperationException("invalid state: " + info.getState()); @@ -12967,6 +12969,8 @@ public int markScheduledExecutionsTimedOut(int timeoutSecs) throws InvalidOperat // info.set scheduledQueryProgress(info); } + + recoverInvalidScheduledQueryState(timeoutSecs); committed = commitTransaction(); return results.size(); } finally { @@ -12975,4 +12979,21 @@ public int markScheduledExecutionsTimedOut(int timeoutSecs) throws InvalidOperat } } } + + private void recoverInvalidScheduledQueryState(int timeoutSecs) { + int maxLastUpdateTime = (int) (System.currentTimeMillis() / 1000) - timeoutSecs; + Query q = pm.newQuery(MScheduledQuery.class); + q.setFilter("activeExecution != null"); + + List results = (List) q.execute(); + for (MScheduledQuery e : results) { + Integer lastUpdateTime = e.getActiveExecution().getLastUpdateTime(); + if (lastUpdateTime == null || lastUpdateTime < maxLastUpdateTime) { + LOG.error("Scheduled query: {} stuck with an activeExecution - clearing", + scheduledQueryKeyRef(e.getScheduleKey())); + e.setActiveExecution(null); + pm.makePersistent(e); + } + } + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ScheduledQueryExecutionsMaintTask.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ScheduledQueryExecutionsMaintTask.java index d678d01..4c1b34d 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ScheduledQueryExecutionsMaintTask.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ScheduledQueryExecutionsMaintTask.java @@ -35,6 +35,13 @@ private Configuration conf; @Override + public long initialDelay(TimeUnit unit) { + // no delay before the first execution; + // after an ungraceful shutdown it might take time to notice that in-flight scheduled queries are not running anymore + return 0; + } + + @Override 
public long runFrequency(TimeUnit unit) { return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.SCHEDULED_QUERIES_EXECUTION_MAINT_TASK_FREQUENCY, unit); diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MScheduledQuery.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MScheduledQuery.java index d055e7d..c80241b 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MScheduledQuery.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MScheduledQuery.java @@ -35,6 +35,7 @@ private String user; private String query; private Integer nextExecution; + private MScheduledExecution activeExecution; private Set executions; public MScheduledQuery(ScheduledQuery s) { @@ -112,4 +113,12 @@ public String getUser() { return user; } + public void setActiveExecution(MScheduledExecution execution) { + activeExecution = execution; + } + + public MScheduledExecution getActiveExecution() { + return activeExecution; + } + } diff --git standalone-metastore/metastore-server/src/main/resources/package.jdo standalone-metastore/metastore-server/src/main/resources/package.jdo index ce919e4..88eabfa 100644 --- standalone-metastore/metastore-server/src/main/resources/package.jdo +++ standalone-metastore/metastore-server/src/main/resources/package.jdo @@ -1471,6 +1471,10 @@ + + + + diff --git standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql index 29b20e4..48ad676 100644 --- standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql +++ standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql @@ -749,7 +749,8 @@ CREATE TABLE "APP"."SCHEDULED_QUERIES" ( "USER" varchar(128) not null, "SCHEDULE" varchar(256) not null, "QUERY" varchar(4000) not null, - "NEXT_EXECUTION" integer not null + "NEXT_EXECUTION" integer not null, + "ACTIVE_EXECUTION_ID" bigint ); CREATE INDEX NEXTEXECUTIONINDEX ON APP.SCHEDULED_QUERIES (ENABLED,CLUSTER_NAMESPACE,NEXT_EXECUTION); diff --git standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql index e8fe6a2..7a230bd 100644 --- standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql +++ standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql @@ -58,6 +58,8 @@ ALTER TABLE "APP"."KEY_CONSTRAINTS" ADD CONSTRAINT "CONSTRAINTS_PK" PRIMARY KEY -- HIVE-21487 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION); +-- HIVE-22872 +ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint; -- This needs to be the last thing done. Insert any changes above this line. 
UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; \ No newline at end of file diff --git standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql index 955a94b..a2cf981 100644 --- standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql +++ standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql @@ -1293,6 +1293,35 @@ ALTER TABLE TXN_WRITE_NOTIFICATION_LOG ADD CONSTRAINT TXN_WRITE_NOTIFICATION_LOG INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); +CREATE TABLE "SCHEDULED_QUERIES" ( + "SCHEDULED_QUERY_ID" bigint NOT NULL, + "CLUSTER_NAMESPACE" VARCHAR(256), + "ENABLED" bit NOT NULL DEFAULT 0, + "NEXT_EXECUTION" INTEGER, + "QUERY" VARCHAR(4000), + "SCHEDULE" VARCHAR(256), + "SCHEDULE_NAME" VARCHAR(256), + "USER" VARCHAR(256), + "ACTIVE_EXECUTION_ID" bigint, + CONSTRAINT SCHEDULED_QUERIES_PK PRIMARY KEY ("SCHEDULED_QUERY_ID") +); + +CREATE TABLE "SCHEDULED_EXECUTIONS" ( + "SCHEDULED_EXECUTION_ID" bigint NOT NULL, + "END_TIME" INTEGER, + "ERROR_MESSAGE" VARCHAR(2000), + "EXECUTOR_QUERY_ID" VARCHAR(256), + "LAST_UPDATE_TIME" INTEGER, + "SCHEDULED_QUERY_ID" bigint, + "START_TIME" INTEGER, + "STATE" VARCHAR(256), + CONSTRAINT SCHEDULED_EXECUTIONS_PK PRIMARY KEY ("SCHEDULED_EXECUTION_ID"), + CONSTRAINT SCHEDULED_EXECUTIONS_SCHQ_FK FOREIGN KEY ("SCHEDULED_QUERY_ID") REFERENCES "SCHEDULED_QUERIES"("SCHEDULED_QUERY_ID") ON DELETE CASCADE +); + +CREATE INDEX IDX_SCHEDULED_EX_LAST_UPDATE ON "SCHEDULED_EXECUTIONS" ("LAST_UPDATE_TIME"); +CREATE INDEX IDX_SCHEDULED_EX_SQ_ID ON "SCHEDULED_EXECUTIONS" ("SCHEDULED_QUERY_ID"); + -- ----------------------------------------------------------------- -- Record schema version. 
Should be the last step in the init script -- ----------------------------------------------------------------- diff --git standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql index a554f8a..4a58770 100644 --- standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql +++ standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql @@ -33,6 +33,36 @@ ALTER TABLE KEY_CONSTRAINTS ADD CONSTRAINT CONSTRAINTS_PK PRIMARY KEY (PARENT_TB -- HIVE-21487 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION); +CREATE TABLE "SCHEDULED_QUERIES" ( + "SCHEDULED_QUERY_ID" bigint NOT NULL, + "CLUSTER_NAMESPACE" VARCHAR(256), + "ENABLED" bit NOT NULL DEFAULT 0, + "NEXT_EXECUTION" INTEGER, + "QUERY" VARCHAR(4000), + "SCHEDULE" VARCHAR(256), + "SCHEDULE_NAME" VARCHAR(256), + "USER" VARCHAR(256), + "ACTIVE_EXECUTION_ID" bigint, + CONSTRAINT SCHEDULED_QUERIES_PK PRIMARY KEY ("SCHEDULED_QUERY_ID") +); + +CREATE TABLE "SCHEDULED_EXECUTIONS" ( + "SCHEDULED_EXECUTION_ID" bigint NOT NULL, + "END_TIME" INTEGER, + "ERROR_MESSAGE" VARCHAR(2000), + "EXECUTOR_QUERY_ID" VARCHAR(256), + "LAST_UPDATE_TIME" INTEGER, + "SCHEDULED_QUERY_ID" bigint, + "START_TIME" INTEGER, + "STATE" VARCHAR(256), + CONSTRAINT SCHEDULED_EXECUTIONS_PK PRIMARY KEY ("SCHEDULED_EXECUTION_ID"), + CONSTRAINT SCHEDULED_EXECUTIONS_SCHQ_FK FOREIGN KEY ("SCHEDULED_QUERY_ID") REFERENCES "SCHEDULED_QUERIES"("SCHEDULED_QUERY_ID") ON DELETE CASCADE +); + +CREATE INDEX IDX_SCHEDULED_EX_LAST_UPDATE ON "SCHEDULED_EXECUTIONS" ("LAST_UPDATE_TIME"); +CREATE INDEX IDX_SCHEDULED_EX_SQ_ID ON "SCHEDULED_EXECUTIONS" ("SCHEDULED_QUERY_ID"); + + -- These lines need to be last. Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE; diff --git standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql index 63c97e6..bc34b51 100644 --- standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql +++ standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql @@ -1224,6 +1224,7 @@ CREATE TABLE SCHEDULED_QUERIES ( SCHEDULE VARCHAR(256), SCHEDULE_NAME VARCHAR(256), `USER` VARCHAR(256), + ACTIVE_EXECUTION_ID INTEGER, CONSTRAINT SCHEDULED_QUERIES_PK PRIMARY KEY (SCHEDULED_QUERY_ID) ); diff --git standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql index c175d4c..13f03bc 100644 --- standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql +++ standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql @@ -62,6 +62,8 @@ ALTER TABLE `KEY_CONSTRAINTS` ADD CONSTRAINT `CONSTRAINTS_PK` PRIMARY KEY (`PARE -- HIVE-21487 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION); +-- HIVE-22872 +ALTER TABLE SCHEDULED_QUERIES ADD COLUMN ACTIVE_EXECUTION_ID INTEGER ; -- These lines need to be last. Insert any changes above. 
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; diff --git standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql index 4338d9c..8482b59 100644 --- standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql +++ standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql @@ -1199,6 +1199,7 @@ CREATE TABLE "SCHEDULED_QUERIES" ( "SCHEDULE" VARCHAR(256), "SCHEDULE_NAME" VARCHAR(256), "USER" VARCHAR(256), + "ACTIVE_EXECUTION_ID" number(19), CONSTRAINT SCHEDULED_QUERIES_PK PRIMARY KEY ("SCHEDULED_QUERY_ID") ); diff --git standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql index cec7f53..cbfdd86 100644 --- standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql +++ standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql @@ -62,6 +62,8 @@ ALTER TABLE KEY_CONSTRAINTS ADD CONSTRAINT CONSTRAINTS_PK PRIMARY KEY (PARENT_TB -- HIVE-21487 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION); +-- HIVE-22872 +ALTER TABLE SCHEDULED_QUERIES ADD ACTIVE_EXECUTION_ID number(19); -- These lines need to be last. Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; diff --git standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql index e78fff1..aa35a7a 100644 --- standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql +++ standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql @@ -1887,6 +1887,7 @@ CREATE TABLE "SCHEDULED_QUERIES" ( "SCHEDULE" VARCHAR(256), "SCHEDULE_NAME" VARCHAR(256), "USER" VARCHAR(256), + "ACTIVE_EXECUTION_ID" BIGINT, CONSTRAINT "SCHEDULED_QUERIES_PK" PRIMARY KEY ("SCHEDULED_QUERY_ID") ); diff --git standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql index 52953f0..9462328 100644 --- standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql +++ standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql @@ -193,6 +193,8 @@ ALTER TABLE "KEY_CONSTRAINTS" ADD CONSTRAINT "CONSTRAINTS_PK" PRIMARY KEY ("PARE -- HIVE-21487 CREATE INDEX "COMPLETED_COMPACTIONS_RES" ON "COMPLETED_COMPACTIONS" ("CC_DATABASE","CC_TABLE","CC_PARTITION"); +-- HIVE-22872 +ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint; -- These lines need to be last. Insert any changes above. UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1;