diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 4c3ef3e..e2ddfe8 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1891,6 +1891,21 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal new TimeValidator(TimeUnit.MILLISECONDS), "Frequency of WriteSet reaper runs"), // For Druid storage handler + HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS("hive.druid.overlord.address.default", "localhost:8081", + "Default address of the Druid overlord"), + HIVE_DRUID_INDEXING_INITIAL_SLEEP_TIME("hive.druid.indexing.initial.sleeptime", 10000L, + "Initial time (ms) that we wait before sending the first status request to Druid for the\n" + + "indexing task. This value needs to be specified because it takes time until the indexing task\n" + + "is registered by Druid; otherwise, we might end up considering that the task has failed when,\n" + + "in fact, it has not even started"), + HIVE_DRUID_INDEXING_TIMEOUT("hive.druid.indexing.timeout", -1L, + "Maximum time (ms) that we allow an indexing task in Druid to run. After this timeout has passed,\n" + + "the indexing task is shut down even if it has not finished, and table creation in Druid fails.\n" + + "Default value is -1 (no timeout)"), + HIVE_DRUID_INDEXING_SLEEP_TIME("hive.druid.indexing.sleeptime", 4000L, + "Time (ms) to sleep between consecutive Druid requests that check the status of an indexing task"), + HIVE_DRUID_OUTPUT_FORMAT("hive.druid.output.fileformat", "TextFile", new StringSet("TextFile", "ORC"), + "Format to use for storing CTAS or MV results before indexing them into Druid"), HIVE_DRUID_BROKER_DEFAULT_ADDRESS("hive.druid.broker.address.default", "localhost:8082", "Address of the Druid broker. 
If we are querying Druid from Hive, this address needs to be\n" + "declared"), diff --git druid-handler/pom.xml druid-handler/pom.xml index 0db542e..22c85bf 100644 --- druid-handler/pom.xml +++ druid-handler/pom.xml @@ -102,6 +102,32 @@ + + io.druid + druid-indexing-service + ${druid.version} + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-databind + + + + diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java index ac03099..643e639 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java @@ -17,20 +17,64 @@ */ package org.apache.hadoop.hive.druid; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.Constants; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.druid.aux.DruidHadoopTuningConfig; import org.apache.hadoop.hive.druid.serde.DruidSerDe; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler; +import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.OutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.HttpClientConfig; +import com.metamx.http.client.HttpClientInit; + +import io.druid.data.input.impl.CSVParseSpec; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.ParseSpec; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularity; +import io.druid.indexer.HadoopIOConfig; +import io.druid.indexer.HadoopIngestionSpec; +import io.druid.indexing.common.task.HadoopIndexTask; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.segment.column.Column; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec; +import io.druid.segment.indexing.granularity.GranularitySpec; + /** * DruidStorageHandler provides a 
HiveStorageHandler implementation for Druid. */ @@ -46,7 +90,12 @@ @Override public Class getOutputFormatClass() { - return HiveDruidOutputFormat.class; + final String name = HiveConf.getVar(getConf(), + HiveConf.ConfVars.HIVE_DRUID_OUTPUT_FORMAT); + if (name.equalsIgnoreCase(IOConstants.ORC)) { + return OrcOutputFormat.class; // ORC + } + return HiveIgnoreKeyTextOutputFormat.class; // Textfile } @Override @@ -62,11 +111,8 @@ public HiveMetaHook getMetaHook() { @Override public void preCreateTable(Table table) throws MetaException { // Do safety checks - if (!MetaStoreUtils.isExternalTable(table)) { - throw new MetaException("Table in Druid needs to be declared as EXTERNAL"); - } - if (!StringUtils.isEmpty(table.getSd().getLocation())) { - throw new MetaException("LOCATION may not be specified for Druid"); + if (MetaStoreUtils.isExternalTable(table) && !StringUtils.isEmpty(table.getSd().getLocation())) { + throw new MetaException("LOCATION may not be specified for Druid existing sources"); } if (table.getPartitionKeysSize() != 0) { throw new MetaException("PARTITIONED BY may not be specified for Druid"); @@ -78,12 +124,196 @@ public void preCreateTable(Table table) throws MetaException { @Override public void rollbackCreateTable(Table table) throws MetaException { - // Nothing to do + // Remove results from Hive + final Warehouse wh = new Warehouse(getConf()); + final String location = table.getSd().getLocation(); + boolean success = false; + if (location != null) { + success = wh.deleteDir(new Path(location), true); + } + if (!success) { + if (LOG.isErrorEnabled()) { + LOG.error("Could not remove Hive results for Druid indexing"); + } + } else { + if (LOG.isErrorEnabled()) { + LOG.error("Rollback due to error; removed Hive results for Druid indexing"); + } + } } @Override public void commitCreateTable(Table table) throws MetaException { - // Nothing to do + if (MetaStoreUtils.isExternalTable(table)) { + // Nothing to do + return; + } + final Warehouse wh = new Warehouse(getConf()); + final String location = table.getSd().getLocation(); + try { + if (StringUtils.isEmpty(location) || wh.isEmpty(new Path(location))) { + if (LOG.isInfoEnabled()) { + LOG.info("No need to move data to Druid"); + } + return; + } + } catch (IOException e) { + throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + + // Take results and format + final String dataSource = table.getParameters().get(Constants.DRUID_DATA_SOURCE); + if (dataSource == null) { + throw new MetaException("Druid data source not specified; use " + + Constants.DRUID_DATA_SOURCE + + " in table properties"); + } + final String name = HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OUTPUT_FORMAT); + + // Create indexing task + final HadoopIndexTask indexTask = createHadoopIndexTask(table, dataSource, location, name); + + // Submit indexing task to Druid and retrieve results + String address = HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS); + if (org.apache.commons.lang3.StringUtils.isEmpty(address)) { + throw new MetaException("Druid overlord address not specified in configuration"); + } + HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle()); + InputStream response; + String taskId; + try { + response = DruidStorageHandlerUtils.submitRequest(client, + DruidStorageHandlerUtils.createTaskRequest(address, indexTask)); + taskId = DruidStorageHandlerUtils.JSON_MAPPER.readValue(response, JsonNode.class) + 
.get("task").textValue(); + } catch (IOException e) { + throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + if (taskId == null) { + throw new MetaException("Connected to Druid but task ID is null"); + } + + // Monitor the indexing task + final long initialSleepTime = HiveConf.getLongVar(getConf(), + HiveConf.ConfVars.HIVE_DRUID_INDEXING_INITIAL_SLEEP_TIME); + final long timeout = HiveConf.getLongVar(getConf(), + HiveConf.ConfVars.HIVE_DRUID_INDEXING_TIMEOUT); + final long sleepTime = HiveConf.getLongVar(getConf(), + HiveConf.ConfVars.HIVE_DRUID_INDEXING_SLEEP_TIME); + final LogHelper console = new LogHelper(LOG); + console.printInfo("Indexing data in Druid - TaskId: " + taskId); + JsonNode statusResponse; + try { + statusResponse = DruidStorageHandlerUtils.monitorTask(client, address, taskId, + initialSleepTime, timeout, sleepTime); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Exception in Druid indexing task"); + } + throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + + // Provide feedback + if (DruidStorageHandlerUtils.isComplete(statusResponse)) { + // It finished: either it succeeded or failed + if (DruidStorageHandlerUtils.isSuccess(statusResponse)) { + // Success + console.printInfo("Finished indexing data in Druid - TaskId: " + taskId + + ", duration: " + DruidStorageHandlerUtils.extractDurationFromResponse(statusResponse) + "ms"); + } else { + // Fail + throw new MetaException("Error storing data in Druid - TaskId: " + taskId + + ". Check Druid logs for more information"); + } + } else { + // Still running and we hit the timeout; shut down the task and bail out + if (LOG.isWarnEnabled()) { + LOG.warn("Timeout exceeded: shutting down Druid indexing task..."); + } + try { + DruidStorageHandlerUtils.submitRequest(client, + DruidStorageHandlerUtils.createTaskShutdownRequest(address, taskId)); + } catch (IOException e) { + throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + throw new MetaException("Timeout exceeded - TaskId: " + taskId + + ". 
Data storage operation in Druid not completed"); + } + + // Remove results from Hive + boolean success = wh.deleteDir(new Path(location), true); + if (!success) { + if (LOG.isErrorEnabled()) { + LOG.error("Could not remove Hive results for Druid indexing"); + } + } + } + + private HadoopIndexTask createHadoopIndexTask(Table table, String dataSource, String location, + String name) throws MetaException { + // Create data schema specification (including parser specification) + final Map parserMap; + if (name.equalsIgnoreCase(IOConstants.ORC)) { // ORC + throw new UnsupportedOperationException("Currently reading from ORC is not supported;" + + "Druid version needs to be upgraded to 0.9.2"); + } else { // CSV + // Default + TimestampSpec timestampSpec = new TimestampSpec(Column.TIME_COLUMN_NAME, + "yyyy-MM-dd HH:mm:ss", null); + // Default, all columns that are not metrics or timestamp, are treated as dimensions + DimensionsSpec dimensionsSpec = new DimensionsSpec(null, null, null); + // Add column names for the Spec, so it knows which column corresponds to each name + List columns = new ArrayList<>(); + for (FieldSchema f : table.getSd().getCols()) { + columns.add(f.getName()); + } + ParseSpec parseSpec = new CSVParseSpec(timestampSpec, dimensionsSpec, null, columns); + InputRowParser parser = new StringInputRowParser(parseSpec, null); + try { + parserMap = DruidStorageHandlerUtils.JSON_MAPPER.readValue( + DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(parser), + new TypeReference>(){}); + } catch (Exception e) { + throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + } + // Metrics + final List aggregators = new ArrayList<>(); + for (FieldSchema f : table.getSd().getCols()) { + AggregatorFactory af; + switch (f.getType()) { + case serdeConstants.TINYINT_TYPE_NAME: + case serdeConstants.SMALLINT_TYPE_NAME: + case serdeConstants.INT_TYPE_NAME: + case serdeConstants.BIGINT_TYPE_NAME: + af = new LongSumAggregatorFactory(f.getName(), f.getName()); + break; + case serdeConstants.FLOAT_TYPE_NAME: + case serdeConstants.DOUBLE_TYPE_NAME: + af = new DoubleSumAggregatorFactory(f.getName(), f.getName()); + break; + default: + // Dimension or timestamp + continue; + } + aggregators.add(af); + } + final GranularitySpec granularitySpec = + new ArbitraryGranularitySpec(QueryGranularity.fromString("NONE"), + ImmutableList.of(DruidTable.DEFAULT_INTERVAL)); + final DataSchema dataSchema = new DataSchema(dataSource, parserMap, + aggregators.toArray(new AggregatorFactory[aggregators.size()]), + granularitySpec, DruidStorageHandlerUtils.JSON_MAPPER); + Map pathSpec = new HashMap<>(); + pathSpec.put("type", "static"); + pathSpec.put("paths", location); + if (name.equalsIgnoreCase(IOConstants.ORC)) { + pathSpec.put("inputFormat", "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat"); + } + HadoopIOConfig ioConfig = new HadoopIOConfig(pathSpec, null, null); + HadoopIngestionSpec spec = new HadoopIngestionSpec(dataSchema, ioConfig, + DruidHadoopTuningConfig.makeDefaultTuningConfig(), null); + return new HadoopIndexTask(null, spec, null, null, null, DruidStorageHandlerUtils.JSON_MAPPER, null); } @Override diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index c6b8024..a48f5d1 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java 
@@ -22,15 +22,19 @@ import java.net.URL; import java.util.concurrent.ExecutionException; +import javax.ws.rs.core.MediaType; + import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.metamx.http.client.HttpClient; import com.metamx.http.client.Request; import com.metamx.http.client.response.InputStreamResponseHandler; +import io.druid.indexing.common.task.HadoopIndexTask; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.BaseQuery; @@ -52,6 +56,52 @@ public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory()); /** + * Method that creates a request to submit an indexing task to the Druid overlord. + * @param address + * @param task + * @return + * @throws IOException + */ + public static Request createTaskRequest(String address, HadoopIndexTask task) + throws IOException { + return new Request(HttpMethod.POST, + new URL(String.format("%s/druid/indexer/v1/task/", "http://" + address))) + .setContent(JSON_MAPPER.writeValueAsBytes(task)) + .setHeader(HttpHeaders.Names.CONTENT_TYPE, MediaType.APPLICATION_JSON); + } + + /** + * Method that creates a request to check the status of a Druid indexing task. + * @param address + * @param taskId + * @return + * @throws IOException + */ + public static Request createTaskStatusRequest(String address, String taskId) + throws IOException { + return new Request(HttpMethod.GET, + new URL(String.format("%s/druid/indexer/v1/task/%s/status", + "http://" + address, taskId))); + } + + /** + * Method that creates a request to shut down a Druid indexing task. + * @param address + * @param taskId + * @return + * @throws IOException + */ + public static Request createTaskShutdownRequest(String address, String taskId) + throws IOException { + return new Request(HttpMethod.POST, + new URL(String.format("%s/druid/indexer/v1/task/%s/shutdown", + "http://" + address, taskId))); + } + + /** * Method that creates a request for Druid JSON query (using SMILE). * @param mapper * @param address @@ -59,7 +109,7 @@ * @return * @throws IOException */ - public static Request createRequest(String address, BaseQuery query) + public static Request createQueryRequest(String address, BaseQuery query) throws IOException { return new Request(HttpMethod.POST, new URL(String.format("%s/druid/v2/", "http://" + address))) .setContent(SMILE_MAPPER.writeValueAsBytes(query)) @@ -87,4 +137,119 @@ public static InputStream submitRequest(HttpClient client, Request request) return response; } + /** + * Method that monitors a task: it sends a status request every sleepTime milliseconds until + * either the task completes or the total elapsed time exceeds timeout milliseconds. Returns + * the final status of the task. 
+ * @param client + * @param address + * @param taskId + * @param initialSleep + * @param timeout + * @param sleepTime + * @return + * @throws IOException + * @throws InterruptedException + */ + public static JsonNode monitorTask(HttpClient client, String address, String taskId, + long initialSleep, long timeout, long sleepTime) throws IOException, InterruptedException { + // Create request to monitor the status + final Request taskStatusRequest = createTaskStatusRequest(address, taskId); + long startTime = System.currentTimeMillis(); + // Sleep for the initial time to let Druid start the indexing task + // Note that the initial sleep counts towards the total timeout + Thread.sleep(initialSleep); + long endTime = startTime; + JsonNode statusResponse; + do { + InputStream response = submitRequest(client, taskStatusRequest); + statusResponse = JSON_MAPPER.readValue(response, JsonNode.class); + Thread.sleep(sleepTime); + endTime = System.currentTimeMillis(); + } while (isRunnable(statusResponse) && (timeout < 0 || endTime - startTime < timeout)); + return statusResponse; + } + + /** + * Returns whether the JSON response from the Druid coordinator specifies that + * the task is still running. + * @param status JSON response from the Druid coordinator + * @return true if we can infer that the task is still running, false otherwise + */ + protected static boolean isRunnable(JsonNode status) { + String s = extractStatusFromResponse(status); + if (s == null) { + // Null response, consider it finished + return false; + } + return s.equalsIgnoreCase("RUNNING"); + } + + /** + * Returns whether the JSON response from the Druid coordinator specifies that + * the task has finished. + * @param status JSON response from the Druid coordinator + * @return true if we can infer that the task has been completed, false otherwise + */ + protected static boolean isComplete(JsonNode status) { + String s = extractStatusFromResponse(status); + if (s == null) { + // Null response, consider it finished + return true; + } + return !s.equalsIgnoreCase("RUNNING"); + } + + /** + * Returns whether the JSON response from the Druid coordinator specifies that + * the task has finished and its execution has been successful. + * @param status JSON response from the Druid coordinator + * @return true if we can infer that the task has succeeded, false otherwise + */ + protected static boolean isSuccess(JsonNode status) { + String s = extractStatusFromResponse(status); + if (s == null) { + // Null response, consider it failed + return false; + } + return s.equalsIgnoreCase("SUCCESS"); + } + + /** + * Returns whether the JSON response from the Druid coordinator specifies that + * the task has finished and its execution has failed. + * @param status JSON response from the Druid coordinator + * @return true if we can infer that the task has failed, false otherwise + */ + protected static boolean isFailure(JsonNode status) { + String s = extractStatusFromResponse(status); + if (s == null) { + // Null response, consider it failed + return true; + } + return s.equalsIgnoreCase("FAILED"); + } + + private static String extractStatusFromResponse(JsonNode status) { + JsonNode taskStatus = status.get("status"); + if (taskStatus != null) { + return taskStatus.get("status").textValue(); + } + return null; + } + + /** + * Extract the duration of the task from the JSON response of the Druid + * coordinator. 
+ * @param status JSON response from the Druid coordinator + * @return the task duration in ms, if available; null otherwise + */ + protected static Long extractDurationFromResponse(JsonNode status) { + JsonNode taskStatus = status.get("status"); + if (taskStatus != null) { + return taskStatus.get("duration").longValue(); + } + return null; + } + } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java deleted file mode 100644 index 45e31d6..0000000 --- druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.druid; - -import java.io.IOException; -import java.util.Properties; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; -import org.apache.hadoop.hive.ql.io.HiveOutputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.util.Progressable; - -/** - * Place holder for Druid output format. Currently not implemented. 
- */ -@SuppressWarnings("rawtypes") -public class HiveDruidOutputFormat implements HiveOutputFormat { - - @Override - public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, - Progressable progress) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, - Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) - throws IOException { - throw new UnsupportedOperationException(); - } - -} diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java index 3df1452..ea833d3 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java @@ -187,7 +187,7 @@ private static String createSelectStarQuery(String address, String dataSource) t InputStream response; try { response = DruidStorageHandlerUtils.submitRequest(client, - DruidStorageHandlerUtils.createRequest(address, metadataQuery)); + DruidStorageHandlerUtils.createQueryRequest(address, metadataQuery)); } catch (Exception e) { throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); } @@ -231,7 +231,7 @@ private static String createSelectStarQuery(String address, String dataSource) t try { response = DruidStorageHandlerUtils.submitRequest(client, - DruidStorageHandlerUtils.createRequest(address, timeQuery)); + DruidStorageHandlerUtils.createQueryRequest(address, timeQuery)); } catch (Exception e) { throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/aux/DruidHadoopTuningConfig.java druid-handler/src/java/org/apache/hadoop/hive/druid/aux/DruidHadoopTuningConfig.java new file mode 100644 index 0000000..8285a1b --- /dev/null +++ druid-handler/src/java/org/apache/hadoop/hive/druid/aux/DruidHadoopTuningConfig.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.druid.aux; + +import java.util.List; +import java.util.Map; + +import org.joda.time.DateTime; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.ImmutableMap; + +import io.druid.indexer.HadoopTuningConfig; +import io.druid.indexer.HadoopyShardSpec; +import io.druid.indexer.partitions.PartitionsSpec; +import io.druid.segment.IndexSpec; + +/** + * We create our own tuning config class because there is a conflict with + * the variable names [maxRowsInMemory, rowFlushBoundary], and otherwise + * there is a Jackson conflict while creating the JSON + */ +@JsonTypeName("hadoop") +public class DruidHadoopTuningConfig extends HadoopTuningConfig { + + @JsonCreator + private DruidHadoopTuningConfig(final @JsonProperty("workingPath") String workingPath, + final @JsonProperty("version") String version, + final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, + final @JsonProperty("shardSpecs") Map> shardSpecs, + final @JsonProperty("indexSpec") IndexSpec indexSpec, + final @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, + final @JsonProperty("leaveIntermediate") boolean leaveIntermediate, + final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure, + final @JsonProperty("overwriteFiles") boolean overwriteFiles, + final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows, + final @JsonProperty("jobProperties") Map jobProperties, + final @JsonProperty("combineText") boolean combineText, + final @JsonProperty("useCombiner") Boolean useCombiner, + final @JsonProperty("buildV9Directly") Boolean buildV9Directly, + final @JsonProperty("numBackgroundPersistThreads") Integer numBackgroundPersistThreads) { + super (workingPath, version, partitionsSpec, shardSpecs, indexSpec, maxRowsInMemory, + leaveIntermediate, cleanupOnFailure, overwriteFiles, ignoreInvalidRows, + jobProperties, combineText, useCombiner, null, buildV9Directly, numBackgroundPersistThreads); + } + + public static DruidHadoopTuningConfig makeDefaultTuningConfig() { + HadoopTuningConfig conf = HadoopTuningConfig.makeDefaultTuningConfig(); + // Important to avoid compatibility problems with Jackson versions + Map jobProperties = + ImmutableMap.of("mapreduce.job.user.classpath.first", "true"); + return new DruidHadoopTuningConfig(conf.getWorkingPath(), conf.getVersion(), + conf.getPartitionsSpec(), conf.getShardSpecs(), conf.getIndexSpec(), + conf.getRowFlushBoundary(), conf.isLeaveIntermediate(), conf.isCleanupOnFailure(), + conf.isOverwriteFiles(), conf.isIgnoreInvalidRows(), jobProperties, + conf.isCombineText(), conf.getUseCombiner(), conf.getBuildV9Directly(), + conf.getNumBackgroundPersistThreads()); + } + +} diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java index 96bcee8..892bf92 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java @@ -83,7 +83,7 @@ public void initialize(InputSplit split, Configuration conf) throws IOException HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle()); InputStream response = DruidStorageHandlerUtils.submitRequest(client, - 
DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query)); + DruidStorageHandlerUtils.createQueryRequest(hiveDruidSplit.getAddress(), query)); // Retrieve results List resultsList; diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java index 70b493c..ddd522c 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java @@ -76,9 +76,10 @@ public NullWritable getCurrentKey() throws IOException, InterruptedException { public DruidWritable getCurrentValue() throws IOException, InterruptedException { // Create new value DruidWritable value = new DruidWritable(); - value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); if (values.hasNext()) { - value.getValue().putAll(values.next().getEvent()); + final EventHolder event = values.next(); + value.getValue().putAll(event.getEvent()); + value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, event.getTimestamp().getMillis()); return value; } return value; @@ -89,9 +90,10 @@ public boolean next(NullWritable key, DruidWritable value) throws IOException { if (nextKeyValue()) { // Update value value.getValue().clear(); - value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); if (values.hasNext()) { - value.getValue().putAll(values.next().getEvent()); + final EventHolder event = values.next(); + value.getValue().putAll(event.getEvent()); + value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, event.getTimestamp().getMillis()); } return true; } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java index 8f53d4a..f61fe30 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java @@ -29,8 +29,12 @@ import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeSpec; import org.apache.hadoop.hive.serde2.SerDeStats; @@ -42,7 +46,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.StringUtils; @@ -50,6 +53,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Function; import com.google.common.collect.Lists; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.http.client.HttpClient; @@ -81,8 +85,10 @@ private String[] columns; private PrimitiveTypeInfo[] types; private ObjectInspector inspector; + private AbstractSerDe serializer; + 
@SuppressWarnings("deprecation") @Override public void initialize(Configuration configuration, Properties properties) throws SerDeException { final List columnNames = new ArrayList<>(); @@ -92,52 +98,81 @@ public void initialize(Configuration configuration, Properties properties) throw // Druid query String druidQuery = properties.getProperty(Constants.DRUID_QUERY_JSON); if (druidQuery == null) { - // No query. We need to create a Druid Segment Metadata query that retrieves all - // columns present in the data source (dimensions and metrics). - // Create Segment Metadata Query - String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE); - if (dataSource == null) { - throw new SerDeException("Druid data source not specified; use " + - Constants.DRUID_DATA_SOURCE + " in table properties"); - } - SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder(); - builder.dataSource(dataSource); - builder.merge(true); - builder.analysisTypes(); - SegmentMetadataQuery query = builder.build(); + // No query. Either it is a CTAS, or we need to create a Druid + // Segment Metadata query that retrieves all columns present in + // the data source (dimensions and metrics). + if (!org.apache.commons.lang3.StringUtils.isEmpty(properties.getProperty(serdeConstants.LIST_COLUMNS)) + && !org.apache.commons.lang3.StringUtils.isEmpty(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES))) { + columnNames.addAll(Utilities.getColumnNames(properties)); + if (!columnNames.contains(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) { + throw new SerDeException("Timestamp column (' " + DruidTable.DEFAULT_TIMESTAMP_COLUMN + + "') not specified in create table; list of columns is : " + + properties.getProperty(serdeConstants.LIST_COLUMNS)); + } + columnTypes.addAll(Lists.transform(Utilities.getColumnTypes(properties), + new Function() { + @Override + public PrimitiveTypeInfo apply(String type) { + return TypeInfoFactory.getPrimitiveTypeInfo(type); + } + } + )); + inspectors.addAll(Lists.transform(columnTypes, + new Function() { + @Override + public ObjectInspector apply(PrimitiveTypeInfo type) { + return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type); + } + } + )); + columns = columnNames.toArray(new String[columnNames.size()]); + types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]); + inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors); + } else { + String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE); + if (dataSource == null) { + throw new SerDeException("Druid data source not specified; use " + + Constants.DRUID_DATA_SOURCE + " in table properties"); + } + SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder(); + builder.dataSource(dataSource); + builder.merge(true); + builder.analysisTypes(); + SegmentMetadataQuery query = builder.build(); - // Execute query in Druid - String address = HiveConf.getVar(configuration, - HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS); - if (org.apache.commons.lang3.StringUtils.isEmpty(address)) { - throw new SerDeException("Druid broker address not specified in configuration"); - } + // Execute query in Druid + String address = HiveConf.getVar(configuration, + HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS); + if (org.apache.commons.lang3.StringUtils.isEmpty(address)) { + throw new SerDeException("Druid broker address not specified in configuration"); + } - // Infer schema - SegmentAnalysis schemaInfo; - try { - schemaInfo = 
submitMetadataRequest(address, query); - } catch (IOException e) { - throw new SerDeException(e); - } - for (Entry columnInfo : schemaInfo.getColumns().entrySet()) { - if (columnInfo.getKey().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) { - // Special handling for timestamp column + // Infer schema + SegmentAnalysis schemaInfo; + try { + schemaInfo = submitMetadataRequest(address, query); + } catch (IOException e) { + throw new SerDeException(e); + } + for (Entry columnInfo : schemaInfo.getColumns().entrySet()) { + if (columnInfo.getKey().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) { + // Special handling for timestamp column + columnNames.add(columnInfo.getKey()); // field name + PrimitiveTypeInfo type = TypeInfoFactory.timestampTypeInfo; // field type + columnTypes.add(type); + inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type)); + continue; + } columnNames.add(columnInfo.getKey()); // field name - PrimitiveTypeInfo type = TypeInfoFactory.timestampTypeInfo; // field type + PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType( + columnInfo.getValue().getType()); // field type columnTypes.add(type); inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type)); - continue; } - columnNames.add(columnInfo.getKey()); // field name - PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType( - columnInfo.getValue().getType()); // field type - columnTypes.add(type); - inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type)); + columns = columnNames.toArray(new String[columnNames.size()]); + types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]); + inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors); } - columns = columnNames.toArray(new String[columnNames.size()]); - types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]); - inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors); } else { // Query is specified, we can extract the results schema from the query Query query; @@ -179,6 +214,19 @@ public void initialize(Configuration configuration, Properties properties) throw + "\t columns: " + columnNames + "\n\t types: " + columnTypes); } + + // Initialize serializer + final String name = HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_OUTPUT_FORMAT); + if (name.equalsIgnoreCase(IOConstants.ORC)) { + throw new UnsupportedOperationException("Currently reading from ORC is not supported;" + + "Druid version needs to be upgraded to 0.9.2"); + } else { + serializer = new MetadataTypedColumnsetSerDe(); // Textfile + properties.put(serdeConstants.SERIALIZATION_FORMAT, ","); + properties.put(serdeConstants.SERIALIZATION_NULL_FORMAT, ""); + properties.put(serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST, "false"); + } + serializer.initialize(configuration, properties); } /* Submits the request and returns */ @@ -188,7 +236,7 @@ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQ InputStream response; try { response = DruidStorageHandlerUtils.submitRequest(client, - DruidStorageHandlerUtils.createRequest(address, query)); + DruidStorageHandlerUtils.createQueryRequest(address, query)); } catch (Exception e) { throw new SerDeException(StringUtils.stringifyException(e)); } @@ -292,17 +340,18 @@ private void inferSchema(GroupByQuery query, List columnNames, List getSerializedClass() { - return NullWritable.class; + return 
serializer.getSerializedClass(); } @Override public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException { - return NullWritable.get(); + return serializer.serialize(o, objectInspector); } @Override public SerDeStats getSerDeStats() { - throw new UnsupportedOperationException("SerdeStats not supported."); + // no support for statistics + return null; } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index eeba6cd..b8d40ca 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -266,7 +266,7 @@ public Stat getStat() { protected transient FileSystem fs; protected transient Serializer serializer; protected final transient LongWritable row_count = new LongWritable(); - private transient boolean isNativeTable = true; + private transient boolean needsNativeStorage = true; /** * The evaluators for the multiFile sprayer. If the table under consideration has 1000 buckets, @@ -335,7 +335,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { try { this.hconf = hconf; filesCreated = false; - isNativeTable = !conf.getTableInfo().isNonNative(); + needsNativeStorage = conf.getTableInfo().needsNativeStorage(); isTemporary = conf.isTemporary(); multiFileSpray = conf.isMultiFileSpray(); totalFiles = conf.getTotalFiles(); @@ -557,7 +557,7 @@ protected void createBucketFiles(FSPaths fsp) throws HiveException { assert filesIdx == numFiles; // in recent hadoop versions, use deleteOnExit to clean tmp files. - if (isNativeTable && fs != null && fsp != null) { + if (needsNativeStorage && fs != null && fsp != null) { autoDelete = fs.deleteOnExit(fsp.outPaths[0]); } } catch (Exception e) { @@ -571,7 +571,7 @@ protected void createBucketFiles(FSPaths fsp) throws HiveException { protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) throws HiveException { try { - if (isNativeTable) { + if (needsNativeStorage) { fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, null); if (isInfoEnabled) { LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]); @@ -598,7 +598,7 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]); } - if (isNativeTable) { + if (needsNativeStorage) { // in recent hadoop versions, use deleteOnExit to clean tmp files. autoDelete = fs.deleteOnExit(fsp.outPaths[filesIdx]); } @@ -1070,7 +1070,7 @@ public void closeOp(boolean abort) throws HiveException { } } - if (isNativeTable) { + if (needsNativeStorage) { fsp.commit(fs); } } @@ -1083,7 +1083,7 @@ public void closeOp(boolean abort) throws HiveException { // Hadoop always call close() even if an Exception was thrown in map() or // reduce(). 
for (FSPaths fsp : valToPaths.values()) { - fsp.abortWriters(fs, abort, !autoDelete && isNativeTable); + fsp.abortWriters(fs, abort, !autoDelete && needsNativeStorage); } } fsp = prevFsp = null; @@ -1106,7 +1106,7 @@ static public String getOperatorName() { public void jobCloseOp(Configuration hconf, boolean success) throws HiveException { try { - if ((conf != null) && isNativeTable) { + if ((conf != null) && needsNativeStorage) { Path specPath = conf.getDirName(); DynamicPartitionCtx dpCtx = conf.getDynPartCtx(); if (conf.isLinkedFileSink() && (dpCtx != null)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index fd64056..84bc662 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -3047,8 +3047,8 @@ private static Path createDummyFileForEmptyPartition(Path path, JobConf job, Map // The input file does not exist, replace it by a empty file PartitionDesc partDesc = work.getPathToPartitionInfo().get(path); - if (partDesc.getTableDesc().isNonNative()) { - // if this isn't a hive table we can't create an empty file for it. + if (!partDesc.getTableDesc().needsNativeStorage()) { + // if it does not need native storage, we can't create an empty file for it. return path; } @@ -3083,8 +3083,8 @@ private static Path createDummyFileForEmptyTable(JobConf job, MapWork work, throws Exception { TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc(); - if (tableDesc.isNonNative()) { - // if this isn't a hive table we can't create an empty file for it. + if (!tableDesc.needsNativeStorage()) { + // if it does not need native storage, we can't create an empty file for it. return null; } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 747f387..43f5f3d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -11647,10 +11647,6 @@ ASTNode analyzeCreateTable( storageFormat.fillDefaultStorageFormat(isExt, false); - if ((command_type == CTAS) && (storageFormat.getStorageHandler() != null)) { - throw new SemanticException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg()); - } - // check for existence of table if (ifNotExists) { try { @@ -11791,7 +11787,7 @@ ASTNode analyzeCreateTable( rowFormatParams.lineDelim, comment, storageFormat.getInputFormat(), storageFormat.getOutputFormat(), location, storageFormat.getSerde(), storageFormat.getStorageHandler(), storageFormat.getSerdeProps(), tblProps, ifNotExists, - skewedColNames, skewedValues, true, primaryKeys, foreignKeys); + skewedColNames, skewedValues, true, primaryKeys, foreignKeys); tableDesc.setMaterialization(isMaterialization); tableDesc.setStoredAsSubDirectories(storedAsDirs); tableDesc.setNullFormat(rowFormatParams.nullFormat); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java index b15ad34..408ff47 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.SequenceFileInputFormat; import 
org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.TextInputFormat; @@ -302,21 +303,31 @@ public static TableDesc getDefaultQueryOutputTableDesc(String cols, String colTy /** * Generate a table descriptor from a createTableDesc. + * @throws HiveException */ public static TableDesc getTableDesc(CreateTableDesc crtTblDesc, String cols, String colTypes) { - Class serdeClass = LazySimpleSerDe.class; - String separatorCode = Integer.toString(Utilities.ctrlaCode); - String columns = cols; - String columnTypes = colTypes; - boolean lastColumnTakesRestOfTheLine = false; TableDesc ret; + // Resolve storage handler (if any) try { - if (crtTblDesc.getSerName() != null) { - Class c = JavaUtils.loadClass(crtTblDesc.getSerName()); - serdeClass = c; + HiveStorageHandler storageHandler = null; + if (crtTblDesc.getStorageHandler() != null) { + storageHandler = HiveUtils.getStorageHandler( + SessionState.getSessionConf(), crtTblDesc.getStorageHandler()); + } + + Class serdeClass = LazySimpleSerDe.class; + String separatorCode = Integer.toString(Utilities.ctrlaCode); + String columns = cols; + String columnTypes = colTypes; + boolean lastColumnTakesRestOfTheLine = false; + + if (storageHandler != null) { + serdeClass = storageHandler.getSerDeClass(); + } else if (crtTblDesc.getSerName() != null) { + serdeClass = JavaUtils.loadClass(crtTblDesc.getSerName()); } if (crtTblDesc.getFieldDelim() != null) { @@ -329,6 +340,12 @@ public static TableDesc getTableDesc(CreateTableDesc crtTblDesc, String cols, // set other table properties Properties properties = ret.getProperties(); + if (crtTblDesc.getStorageHandler() != null) { + properties.setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE, + crtTblDesc.getStorageHandler()); + } + if (crtTblDesc.getCollItemDelim() != null) { properties.setProperty(serdeConstants.COLLECTION_DELIM, crtTblDesc .getCollItemDelim()); @@ -367,15 +384,24 @@ public static TableDesc getTableDesc(CreateTableDesc crtTblDesc, String cols, // replace the default input & output file format with those found in // crtTblDesc - Class c1 = JavaUtils.loadClass(crtTblDesc.getInputFormat()); - Class c2 = JavaUtils.loadClass(crtTblDesc.getOutputFormat()); - Class in_class = c1; - Class out_class = c2; - + Class in_class; + if (storageHandler != null) { + in_class = storageHandler.getInputFormatClass(); + } else { + in_class = JavaUtils.loadClass(crtTblDesc.getInputFormat()); + } + Class out_class; + if (storageHandler != null) { + out_class = storageHandler.getOutputFormatClass(); + } else { + out_class = JavaUtils.loadClass(crtTblDesc.getOutputFormat()); + } ret.setInputFileFormatClass(in_class); ret.setOutputFileFormatClass(out_class); } catch (ClassNotFoundException e) { throw new RuntimeException("Unable to find class in getTableDesc: " + e.getMessage(), e); + } catch (HiveException e) { + throw new RuntimeException("Error loading storage handler in getTableDesc: " + e.getMessage(), e); } return ret; } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java index 1da8e91..fee11cc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java @@ -25,24 +25,33 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import 
org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.plan.Explain.Level; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.OutputFormat; - import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.ReflectionUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * TableDesc. * */ public class TableDesc implements Serializable, Cloneable { + + private static final Logger LOG = LoggerFactory.getLogger(TableDesc.class); + private static final long serialVersionUID = 1L; private Class inputFileFormatClass; private Class outputFileFormatClass; @@ -165,6 +174,28 @@ public boolean isNonNative() { return (properties.getProperty(hive_metastoreConstants.META_TABLE_STORAGE) != null); } + public boolean needsNativeStorage() { + // TODO: We add this exception for the moment. We should allow storage handlers to + // specify their desired behavior. + String handlerClass = properties.getProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE); + if (handlerClass == null) { + // Native + return true; + } + try { + HiveStorageHandler storageHandler = HiveUtils.getStorageHandler( + SessionState.getSessionConf(), handlerClass); + if (storageHandler.toString().equals(Constants.DRUID_HIVE_STORAGE_HANDLER_ID)) { + return true; + } + } catch (HiveException e) { + LOG.warn("Could not resolve storage handler"); + } + // Safe bail out + return false; + } + @Override public Object clone() { TableDesc ret = new TableDesc(); diff --git ql/src/test/queries/clientnegative/druid_ctas.q ql/src/test/queries/clientnegative/druid_ctas.q new file mode 100644 index 0000000..af377b2 --- /dev/null +++ ql/src/test/queries/clientnegative/druid_ctas.q @@ -0,0 +1,9 @@ +set hive.druid.broker.address.default=localhost.test; + +-- no timestamp +EXPLAIN +CREATE TABLE druid_table_1 +STORED BY 'org.apache.hadoop.hive.druid.QTestDruidStorageHandler' +TBLPROPERTIES ("druid.datasource" = "wikipedia") +AS +SELECT key, value FROM src; diff --git ql/src/test/queries/clientnegative/druid_external.q ql/src/test/queries/clientnegative/druid_external.q deleted file mode 100644 index 2de04db..0000000 --- ql/src/test/queries/clientnegative/druid_external.q +++ /dev/null @@ -1,5 +0,0 @@ -set hive.druid.broker.address.default=localhost.test; - -CREATE TABLE druid_table_1 -STORED BY 'org.apache.hadoop.hive.druid.QTestDruidStorageHandler' -TBLPROPERTIES ("druid.datasource" = "wikipedia"); diff --git ql/src/test/results/clientnegative/druid_ctas.q.out ql/src/test/results/clientnegative/druid_ctas.q.out new file mode 100644 index 0000000..e304bd8 --- /dev/null +++ ql/src/test/results/clientnegative/druid_ctas.q.out @@ -0,0 +1 @@ +FAILED: SemanticException org.apache.hadoop.hive.serde2.SerDeException: Timestamp column (' __time') not specified in create table; list of columns is : key,value diff --git ql/src/test/results/clientnegative/druid_external.q.out ql/src/test/results/clientnegative/druid_external.q.out deleted file mode 100644 index e5fac51..0000000 --- 
ql/src/test/results/clientnegative/druid_external.q.out +++ /dev/null @@ -1,7 +0,0 @@ -PREHOOK: query: CREATE TABLE druid_table_1 -STORED BY 'org.apache.hadoop.hive.druid.QTestDruidStorageHandler' -TBLPROPERTIES ("druid.datasource" = "wikipedia") -PREHOOK: type: CREATETABLE -PREHOOK: Output: database:default -PREHOOK: Output: default@druid_table_1 -FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:Table in Druid needs to be declared as EXTERNAL) diff --git ql/src/test/results/clientnegative/druid_location.q.out ql/src/test/results/clientnegative/druid_location.q.out index 5727e8c..149435a 100644 --- ql/src/test/results/clientnegative/druid_location.q.out +++ ql/src/test/results/clientnegative/druid_location.q.out @@ -6,4 +6,4 @@ PREHOOK: type: CREATETABLE #### A masked pattern was here #### PREHOOK: Output: database:default PREHOOK: Output: default@druid_table_1 -FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:LOCATION may not be specified for Druid) +FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:LOCATION may not be specified for Druid existing sources) diff --git ql/src/test/results/clientpositive/druid_basic2.q.out ql/src/test/results/clientpositive/druid_basic2.q.out index 3205905..78c4e43 100644 --- ql/src/test/results/clientpositive/druid_basic2.q.out +++ ql/src/test/results/clientpositive/druid_basic2.q.out @@ -270,7 +270,7 @@ STAGE PLANS: Partition base file name: druid_table_1 input format: org.apache.hadoop.hive.druid.HiveDruidQueryBasedInputFormat - output format: org.apache.hadoop.hive.druid.HiveDruidOutputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"} EXTERNAL TRUE @@ -295,7 +295,7 @@ STAGE PLANS: serde: org.apache.hadoop.hive.druid.QTestDruidSerDe input format: org.apache.hadoop.hive.druid.HiveDruidQueryBasedInputFormat - output format: org.apache.hadoop.hive.druid.HiveDruidOutputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"} EXTERNAL TRUE @@ -436,7 +436,7 @@ STAGE PLANS: Partition base file name: druid_table_1 input format: org.apache.hadoop.hive.druid.HiveDruidQueryBasedInputFormat - output format: org.apache.hadoop.hive.druid.HiveDruidOutputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"} EXTERNAL TRUE @@ -461,7 +461,7 @@ STAGE PLANS: serde: org.apache.hadoop.hive.druid.QTestDruidSerDe input format: org.apache.hadoop.hive.druid.HiveDruidQueryBasedInputFormat - output format: org.apache.hadoop.hive.druid.HiveDruidOutputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"} EXTERNAL TRUE
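
Reviewer note (not part of the patch): below is a minimal end-to-end sketch of the CTAS path this change enables, based on the configuration properties, the storage handler, and the DruidSerDe behavior added above. The table and column names (wiki_staging, druid_wiki, ts, page, added, deleted) are hypothetical; the handler class, property names, and the required __time timestamp column come from the diff itself.

-- Overlord that receives the HadoopIndexTask, plus the status-polling/timeout knobs
SET hive.druid.overlord.address.default=localhost:8081;
SET hive.druid.indexing.initial.sleeptime=10000;
SET hive.druid.indexing.sleeptime=4000;
SET hive.druid.indexing.timeout=-1;
-- Intermediate CTAS results are staged as TextFile; ORC is rejected until Druid is upgraded to 0.9.2
SET hive.druid.output.fileformat=TextFile;

CREATE TABLE druid_wiki
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "wikipedia")
AS
SELECT CAST(`ts` AS timestamp) AS `__time`,  -- becomes the Druid timestamp (parsed as yyyy-MM-dd HH:mm:ss)
       page,                                 -- string columns become dimensions
       added, deleted                        -- int/bigint/float/double columns become long/double sum metrics
FROM wiki_staging;

On commit, the handler stages the results in the table location, submits a HadoopIndexTask to the overlord, polls its status using the sleep/timeout settings above, and removes the staged results once indexing succeeds.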