diff --git common/src/java/org/apache/hadoop/hive/common/type/TimestampTZUtil.java common/src/java/org/apache/hadoop/hive/common/type/TimestampTZUtil.java index 13c38560d5..90ffddba0d 100644 --- common/src/java/org/apache/hadoop/hive/common/type/TimestampTZUtil.java +++ common/src/java/org/apache/hadoop/hive/common/type/TimestampTZUtil.java @@ -143,7 +143,7 @@ public static ZoneId parseTimeZone(String timeZoneStr) { return ZoneId.of(timeZoneStr); } catch (DateTimeException e1) { // default - throw new RuntimeException("Invalid time zone displacement value"); + throw new RuntimeException("Invalid time zone displacement value", e1); } } diff --git common/src/java/org/apache/hadoop/hive/conf/Constants.java common/src/java/org/apache/hadoop/hive/conf/Constants.java index 10aaee182f..3f976212e3 100644 --- common/src/java/org/apache/hadoop/hive/conf/Constants.java +++ common/src/java/org/apache/hadoop/hive/conf/Constants.java @@ -38,6 +38,8 @@ public static final String DRUID_TIMESTAMP_GRANULARITY_COL_NAME = "__time_granularity"; public static final String DRUID_SHARD_KEY_COL_NAME = "__druid_extra_partition_key"; public static final String DRUID_QUERY_JSON = "druid.query.json"; + public static final String DRUID_QUERY_FIELD_NAMES = "druid.fieldNames"; + public static final String DRUID_QUERY_FIELD_TYPES = "druid.fieldTypes"; public static final String DRUID_QUERY_TYPE = "druid.query.type"; public static final String DRUID_QUERY_FETCH = "druid.query.fetch"; public static final String DRUID_SEGMENT_DIRECTORY = "druid.storage.storageDirectory"; diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index 2f956b179b..44be795a60 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -35,10 +35,18 @@ import io.druid.metadata.MetadataStorageTablesConfig; import io.druid.metadata.SQLMetadataConnector; import io.druid.metadata.storage.mysql.MySQLConnector; +import io.druid.query.expression.LikeExprMacro; +import io.druid.query.expression.RegexpExtractExprMacro; +import io.druid.query.expression.TimestampCeilExprMacro; +import io.druid.query.expression.TimestampExtractExprMacro; +import io.druid.query.expression.TimestampFloorExprMacro; +import io.druid.query.expression.TimestampFormatExprMacro; +import io.druid.query.expression.TimestampParseExprMacro; +import io.druid.query.expression.TimestampShiftExprMacro; +import io.druid.query.expression.TrimExprMacro; import io.druid.query.select.SelectQueryConfig; import io.druid.segment.IndexIO; import io.druid.segment.IndexMergerV9; -import io.druid.segment.column.ColumnConfig; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.storage.hdfs.HdfsDataSegmentPusher; @@ -100,7 +108,6 @@ import java.net.UnknownHostException; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -145,8 +152,21 @@ static { // This is needed for serde of PagingSpec as it uses JacksonInject for injecting SelectQueryConfig InjectableValues.Std injectableValues = new InjectableValues.Std() - .addValue(SelectQueryConfig.class, new SelectQueryConfig(false)) - .addValue(ExprMacroTable.class, ExprMacroTable.nil()); + .addValue(SelectQueryConfig.class, new 
SelectQueryConfig(false)) + // Expressions macro table used when we deserialize the query from calcite plan + .addValue(ExprMacroTable.class, new ExprMacroTable(ImmutableList + .of(new LikeExprMacro(), + new RegexpExtractExprMacro(), + new TimestampCeilExprMacro(), + new TimestampExtractExprMacro(), + new TimestampFormatExprMacro(), + new TimestampParseExprMacro(), + new TimestampShiftExprMacro(), + new TimestampFloorExprMacro(), + new TrimExprMacro.BothTrimExprMacro(), + new TrimExprMacro.LeftTrimExprMacro(), + new TrimExprMacro.RightTrimExprMacro() + ))); JSON_MAPPER.setInjectableValues(injectableValues); SMILE_MAPPER.setInjectableValues(injectableValues); HiveDruidSerializationModule hiveDruidSerializationModule = new HiveDruidSerializationModule(); @@ -171,12 +191,7 @@ /** * Used by druid to perform IO on indexes */ - public static final IndexIO INDEX_IO = new IndexIO(JSON_MAPPER, new ColumnConfig() { - @Override - public int columnCacheSizeBytes() { - return 0; - } - }); + public static final IndexIO INDEX_IO = new IndexIO(JSON_MAPPER, () -> 0); /** * Used by druid to merge indexes @@ -327,19 +342,12 @@ public static void writeSegmentDescriptor( metadataStorageTablesConfig.getSegmentsTable() )) .fold(Lists.newArrayList(), - new Folder3, Map>() { - @Override - public ArrayList fold(ArrayList druidDataSources, - Map stringObjectMap, - FoldController foldController, - StatementContext statementContext - ) throws SQLException { - druidDataSources.add( - MapUtils.getString(stringObjectMap, "datasource") - ); - return druidDataSources; - } - } + (druidDataSources, stringObjectMap, foldController, statementContext) -> { + druidDataSources.add( + MapUtils.getString(stringObjectMap, "datasource") + ); + return druidDataSources; + } ) ); } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java index 2800f058a0..12b4f9d33b 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java @@ -132,7 +132,7 @@ public DruidWritable getCurrentValue() throws IOException, InterruptedException // Create new value DruidWritable value = new DruidWritable(); // 1) The timestamp column - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, currentRow.getTimestamp().getMillis()); + value.getValue().put("timestamp", currentRow.getTimestamp().getMillis()); // 2) The dimension columns value.getValue().putAll(currentEvent); return value; @@ -144,7 +144,7 @@ public boolean next(NullWritable key, DruidWritable value) { // Update value value.getValue().clear(); // 1) The timestamp column - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, currentRow.getTimestamp().getMillis()); + value.getValue().put("timestamp", currentRow.getTimestamp().getMillis()); // 2) The dimension columns value.getValue().putAll(currentEvent); return true; diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java index 914954da6f..d5ef5d772a 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java @@ -19,15 +19,21 @@ import java.io.IOException; import java.io.InputStream; +import java.sql.Timestamp; import java.time.Instant; 
import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; +import java.util.stream.Collectors; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.type.HiveChar; import org.apache.hadoop.hive.common.type.HiveDecimal; @@ -50,6 +56,7 @@ import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; import org.apache.hadoop.hive.serde2.io.TimestampLocalTZWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructField; @@ -72,6 +79,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TimestampLocalTZTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.FloatWritable; @@ -85,24 +93,13 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.druid.query.Druids; import io.druid.query.Druids.SegmentMetadataQueryBuilder; -import io.druid.query.Query; -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.PostAggregator; -import io.druid.query.dimension.DimensionSpec; -import io.druid.query.groupby.GroupByQuery; import io.druid.query.metadata.metadata.ColumnAnalysis; import io.druid.query.metadata.metadata.SegmentAnalysis; import io.druid.query.metadata.metadata.SegmentMetadataQuery; -import io.druid.query.scan.ScanQuery; -import io.druid.query.select.SelectQuery; -import io.druid.query.timeseries.TimeseriesQuery; -import io.druid.query.topn.TopNQuery; /** * DruidSerDe that is used to deserialize objects from a Druid data source. @@ -115,169 +112,152 @@ private String[] columns; private PrimitiveTypeInfo[] types; private ObjectInspector inspector; + private TimestampLocalTZTypeInfo tsTZTypeInfo; @Override public void initialize(Configuration configuration, Properties properties) throws SerDeException { - final List columnNames = new ArrayList<>(); - final List columnTypes = new ArrayList<>(); - List inspectors = new ArrayList<>(); - - final TimestampLocalTZTypeInfo tsTZTypeInfo = new TimestampLocalTZTypeInfo( + tsTZTypeInfo = new TimestampLocalTZTypeInfo( configuration.get(HiveConf.ConfVars.HIVE_LOCAL_TIME_ZONE.varname)); - // Druid query - String druidQuery = properties.getProperty(Constants.DRUID_QUERY_JSON); - if (druidQuery == null) { - // No query. Either it is a CTAS, or we need to create a Druid - // Segment Metadata query that retrieves all columns present in - // the data source (dimensions and metrics). + final String druidQuery = properties.getProperty(Constants.DRUID_QUERY_JSON, null); + if (druidQuery != null && !druidQuery.isEmpty()) { + initFromDruidQueryPlan(properties, druidQuery); + } else { + // No query. 
Either it is a CTAS, or we need to create a Druid meta data Query if (!org.apache.commons.lang3.StringUtils .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMNS)) && !org.apache.commons.lang3.StringUtils .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES))) { - columnNames.addAll(Utilities.getColumnNames(properties)); - if (!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { - throw new SerDeException("Timestamp column (' " + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN + - "') not specified in create table; list of columns is : " + - properties.getProperty(serdeConstants.LIST_COLUMNS)); - } - columnTypes.addAll(Lists.transform( - Lists.transform(Utilities.getColumnTypes(properties), type -> TypeInfoFactory.getPrimitiveTypeInfo(type)), - e -> e instanceof TimestampLocalTZTypeInfo ? tsTZTypeInfo : e - )); - inspectors.addAll(Lists.transform(columnTypes, - (Function) type -> PrimitiveObjectInspectorFactory - .getPrimitiveWritableObjectInspector(type) - )); - columns = columnNames.toArray(new String[columnNames.size()]); - types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]); - inspector = ObjectInspectorFactory - .getStandardStructObjectInspector(columnNames, inspectors); + // CASE CTAS statement + initFromProperties(properties); } else { - String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE); - if (dataSource == null) { - throw new SerDeException("Druid data source not specified; use " + - Constants.DRUID_DATA_SOURCE + " in table properties"); - } - SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder(); - builder.dataSource(dataSource); - builder.merge(true); - builder.analysisTypes(); - SegmentMetadataQuery query = builder.build(); - - // Execute query in Druid - String address = HiveConf.getVar(configuration, - HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS - ); - if (org.apache.commons.lang3.StringUtils.isEmpty(address)) { - throw new SerDeException("Druid broker address not specified in configuration"); - } - - // Infer schema - SegmentAnalysis schemaInfo; - try { - schemaInfo = submitMetadataRequest(address, query); - } catch (IOException e) { - throw new SerDeException(e); - } - for (Entry columnInfo : schemaInfo.getColumns().entrySet()) { - if (columnInfo.getKey().equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { - // Special handling for timestamp column - columnNames.add(columnInfo.getKey()); // field name - PrimitiveTypeInfo type = tsTZTypeInfo; // field type - columnTypes.add(type); - inspectors - .add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type)); - continue; - } - columnNames.add(columnInfo.getKey()); // field name - PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType( - columnInfo.getValue().getType()); // field type - columnTypes.add(type instanceof TimestampLocalTZTypeInfo ? tsTZTypeInfo : type); - inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type)); - } - columns = columnNames.toArray(new String[columnNames.size()]); - types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]); - inspector = ObjectInspectorFactory - .getStandardStructObjectInspector(columnNames, inspectors); + // Segment Metadata query that retrieves all columns present in + // the data source (dimensions and metrics). 
+ initFromMetaDataQuery(configuration, properties); } - } else { - // Query is specified, we can extract the results schema from the query - Query query; - try { - query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, Query.class); + } + if (LOG.isDebugEnabled()) { + LOG.debug("DruidSerDe initialized with\n" + "\t columns: " + Arrays.toString(columns) + + "\n\t types: " + Arrays.toString(types)); + } + } - // Extract column names and types (if present) - ImmutableMap.Builder mapColumnNamesTypes = ImmutableMap.builder(); - if (!org.apache.commons.lang3.StringUtils - .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMNS)) - && !org.apache.commons.lang3.StringUtils - .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES))) { - List propColumnNames = Utilities.getColumnNames(properties); - List propColumnTypes = Utilities.getColumnTypes(properties); - for (int i = 0; i < propColumnNames.size(); i++) { - PrimitiveTypeInfo type = TypeInfoFactory.getPrimitiveTypeInfo(propColumnTypes.get(i)); - if (type instanceof TimestampLocalTZTypeInfo) { - type = tsTZTypeInfo; - } - mapColumnNamesTypes.put(propColumnNames.get(i), type); - } - } + private void initFromMetaDataQuery(final Configuration configuration, final Properties properties) + throws SerDeException { + final List columnNames = new ArrayList<>(); + final List columnTypes = new ArrayList<>(); + final List inspectors = new ArrayList<>(); - switch (query.getType()) { - case Query.TIMESERIES: - inferSchema((TimeseriesQuery) query, tsTZTypeInfo, columnNames, columnTypes, - mapColumnNamesTypes.build()); - break; - case Query.TOPN: - inferSchema((TopNQuery) query, tsTZTypeInfo, columnNames, columnTypes, - mapColumnNamesTypes.build()); - break; - case Query.SELECT: - String address = HiveConf.getVar(configuration, - HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS); - if (org.apache.commons.lang3.StringUtils.isEmpty(address)) { - throw new SerDeException("Druid broker address not specified in configuration"); - } - inferSchema((SelectQuery) query, tsTZTypeInfo, columnNames, columnTypes, address, - mapColumnNamesTypes.build()); - break; - case Query.GROUP_BY: - inferSchema((GroupByQuery) query, tsTZTypeInfo, columnNames, columnTypes, - mapColumnNamesTypes.build()); - break; - case Query.SCAN: - String broker = HiveConf.getVar(configuration, - HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS); - if (org.apache.commons.lang3.StringUtils.isEmpty(broker)) { - throw new SerDeException("Druid broker address not specified in configuration"); - } - inferSchema((ScanQuery) query, tsTZTypeInfo, columnNames, columnTypes, broker, - mapColumnNamesTypes.build()); - break; - default: - throw new SerDeException("Not supported Druid query"); - } - } catch (Exception e) { - throw new SerDeException(e); - } + String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE); + if (dataSource == null) { + throw new SerDeException("Druid data source not specified; use " + + Constants.DRUID_DATA_SOURCE + " in table properties"); + } + SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder(); + builder.dataSource(dataSource); + builder.merge(true); + builder.analysisTypes(); + SegmentMetadataQuery query = builder.build(); - columns = new String[columnNames.size()]; - types = new PrimitiveTypeInfo[columnNames.size()]; - for (int i = 0; i < columnTypes.size(); ++i) { - columns[i] = columnNames.get(i); - types[i] = columnTypes.get(i); - 
inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i])); + // Execute query in Druid + String address = HiveConf.getVar(configuration, + HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS + ); + if (org.apache.commons.lang3.StringUtils.isEmpty(address)) { + throw new SerDeException("Druid broker address not specified in configuration"); + } + // Infer schema + SegmentAnalysis schemaInfo; + try { + schemaInfo = submitMetadataRequest(address, query); + } catch (IOException e) { + throw new SerDeException(e); + } + for (Entry columnInfo : schemaInfo.getColumns().entrySet()) { + if (columnInfo.getKey().equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { + // Special handling for timestamp column + columnNames.add(columnInfo.getKey()); // field name + PrimitiveTypeInfo type = tsTZTypeInfo; // field type + columnTypes.add(type); + inspectors + .add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type)); + continue; } - inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors); + columnNames.add(columnInfo.getKey()); // field name + PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType( + columnInfo.getValue().getType()); // field type + columnTypes.add(type instanceof TimestampLocalTZTypeInfo ? tsTZTypeInfo : type); + inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type)); } + columns = columnNames.toArray(new String[columnNames.size()]); + types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]); + inspector = ObjectInspectorFactory + .getStandardStructObjectInspector(columnNames, inspectors); + } - if (LOG.isDebugEnabled()) { - LOG.debug("DruidSerDe initialized with\n" - + "\t columns: " + columnNames - + "\n\t types: " + columnTypes); + private void initFromProperties(final Properties properties) + throws SerDeException { + final List inspectors = new ArrayList<>(); + final List columnNames = new ArrayList<>(); + final List columnTypes = new ArrayList<>(); + + columnNames.addAll(Utilities.getColumnNames(properties)); + if (!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { + throw new SerDeException("Timestamp column (' " + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN + + "') not specified in create table; list of columns is : " + + properties.getProperty(serdeConstants.LIST_COLUMNS)); + } + columnTypes.addAll(Lists.transform( + Lists.transform(Utilities.getColumnTypes(properties), type -> TypeInfoFactory.getPrimitiveTypeInfo(type)), + e -> e instanceof TimestampLocalTZTypeInfo ? 
tsTZTypeInfo : e
+    ));
+    inspectors.addAll(Lists.transform(columnTypes,
+        (Function<PrimitiveTypeInfo, ObjectInspector>) type -> PrimitiveObjectInspectorFactory
+            .getPrimitiveWritableObjectInspector(type)
+    ));
+    columns = columnNames.toArray(new String[columnNames.size()]);
+    types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
+    inspector = ObjectInspectorFactory
+        .getStandardStructObjectInspector(columnNames, inspectors);
+  }
+
+  private void initFromDruidQueryPlan(Properties properties, String druidQuery) {
+    Preconditions.checkNotNull(druidQuery, "Druid query can not be null");
+    final List<ObjectInspector> inspectors = new ArrayList<>();
+    final List<String> columnNames;
+    final List<PrimitiveTypeInfo> columnTypes;
+    final String fieldNamesProperty =
+        Preconditions.checkNotNull(properties.getProperty(Constants.DRUID_QUERY_FIELD_NAMES, null));
+    final String fieldTypesProperty =
+        Preconditions.checkNotNull(properties.getProperty(Constants.DRUID_QUERY_FIELD_TYPES, null));
+    if (fieldNamesProperty.isEmpty()) {
+      // This might seem counterintuitive, but a query such as
+      // SELECT YEAR(Calcs.date0) AS yr_date0_ok FROM druid_tableau.calcs Calcs WHERE (YEAR(Calcs.date0) IS NULL) LIMIT 1
+      // is planned so that we only push the filter down and keep the projection of NULL as a Hive project, thus the column list is empty.
+      // @TODO maybe we can push the project down as a virtual column as well, but it is not a big deal.
+      columnNames = Collections.EMPTY_LIST;
+      columnTypes = Collections.EMPTY_LIST;
+    } else {
+      columnNames =
+          Arrays.stream(fieldNamesProperty.trim().split(",")).collect(Collectors.toList());
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(fieldTypesProperty).stream()
+          .map(e -> TypeInfoFactory.getPrimitiveTypeInfo(e.getTypeName()))
+          .map(primitiveTypeInfo -> {
+            if (primitiveTypeInfo instanceof TimestampLocalTZTypeInfo) {
+              return tsTZTypeInfo;
+            }
+            return primitiveTypeInfo;
+          }).collect(Collectors.toList());
+    }
+    columns = new String[columnNames.size()];
+    types = new PrimitiveTypeInfo[columnNames.size()];
+    for (int i = 0; i < columnTypes.size(); ++i) {
+      columns[i] = columnNames.get(i);
+      types[i] = columnTypes.get(i);
+      inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
+    }
+    inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
  }
  /* Submits the request and returns */
@@ -315,178 +295,6 @@ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQ
    return resultsList.get(0);
  }
-  /* Timeseries query */
-  private void inferSchema(TimeseriesQuery query, TimestampLocalTZTypeInfo timeColumnTypeInfo,
-      List<String> columnNames, List<PrimitiveTypeInfo> columnTypes,
-      Map<String, PrimitiveTypeInfo> mapColumnNamesTypes) {
-    // Timestamp column
-    columnNames.add(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN);
-    columnTypes.add(timeColumnTypeInfo);
-    // Aggregator columns
-    for (AggregatorFactory af : query.getAggregatorSpecs()) {
-      columnNames.add(af.getName());
-      PrimitiveTypeInfo typeInfo = mapColumnNamesTypes.get(af.getName());
-      if (typeInfo != null) {
-        // If datasource was created by Hive, we consider Hive type
-        columnTypes.add(typeInfo);
-      } else {
-        columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
-      }
-    }
-    // Post-aggregator columns
-    // TODO: Currently Calcite only infers avg for post-aggregate,
-    // but once we recognize other functions, we will need to infer
-    // different types for post-aggregation functions
-    for (PostAggregator pa : query.getPostAggregatorSpecs()) {
-      columnNames.add(pa.getName());
-
columnTypes.add(TypeInfoFactory.floatTypeInfo); - } - } - - /* TopN query */ - private void inferSchema(TopNQuery query, TimestampLocalTZTypeInfo timeColumnTypeInfo, - List columnNames, List columnTypes, - Map mapColumnNamesTypes) { - // Timestamp column - columnNames.add(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN); - columnTypes.add(timeColumnTypeInfo); - // Dimension column - columnNames.add(query.getDimensionSpec().getOutputName()); - columnTypes.add(TypeInfoFactory.stringTypeInfo); - // Aggregator columns - for (AggregatorFactory af : query.getAggregatorSpecs()) { - columnNames.add(af.getName()); - PrimitiveTypeInfo typeInfo = mapColumnNamesTypes.get(af.getName()); - if (typeInfo != null) { - // If datasource was created by Hive, we consider Hive type - columnTypes.add(typeInfo); - } else { - columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName())); - } - } - // Post-aggregator columns - // TODO: Currently Calcite only infers avg for post-aggregate, - // but once we recognize other functions, we will need to infer - // different types for post-aggregation functions - for (PostAggregator pa : query.getPostAggregatorSpecs()) { - columnNames.add(pa.getName()); - columnTypes.add(TypeInfoFactory.floatTypeInfo); - } - } - - /* Select query */ - private void inferSchema(SelectQuery query, TimestampLocalTZTypeInfo timeColumnTypeInfo, - List columnNames, List columnTypes, - String address, Map mapColumnNamesTypes) - throws SerDeException { - // Timestamp column - columnNames.add(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN); - columnTypes.add(timeColumnTypeInfo); - // Dimension columns - for (DimensionSpec ds : query.getDimensions()) { - columnNames.add(ds.getOutputName()); - columnTypes.add(TypeInfoFactory.stringTypeInfo); - } - // The type for metric columns is not explicit in the query, thus in this case - // we need to emit a metadata query to know their type - SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder(); - builder.dataSource(query.getDataSource()); - builder.merge(true); - builder.analysisTypes(); - SegmentMetadataQuery metadataQuery = builder.build(); - // Execute query in Druid - SegmentAnalysis schemaInfo; - try { - schemaInfo = submitMetadataRequest(address, metadataQuery); - } catch (IOException e) { - throw new SerDeException(e); - } - if (schemaInfo == null) { - throw new SerDeException("Connected to Druid but could not retrieve datasource information"); - } - for (String metric : query.getMetrics()) { - columnNames.add(metric); - PrimitiveTypeInfo typeInfo = mapColumnNamesTypes.get(metric); - if (typeInfo != null) { - // If datasource was created by Hive, we consider Hive type - columnTypes.add(typeInfo); - } else { - columnTypes.add(DruidSerDeUtils.convertDruidToHiveType( - schemaInfo.getColumns().get(metric).getType())); - } - } - } - - /* Scan query */ - private void inferSchema(ScanQuery query, TimestampLocalTZTypeInfo timeColumnTypeInfo, - List columnNames, List columnTypes, - String address, Map mapColumnNamesTypes) - throws SerDeException { - // The type for metric columns is not explicit in the query, thus in this case - // we need to emit a metadata query to know their type - SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder(); - builder.dataSource(query.getDataSource()); - builder.merge(true); - builder.analysisTypes(); - SegmentMetadataQuery metadataQuery = builder.build(); - // Execute query in Druid - SegmentAnalysis schemaInfo; - try { - schemaInfo = 
submitMetadataRequest(address, metadataQuery); - } catch (IOException e) { - throw new SerDeException(e); - } - if (schemaInfo == null) { - throw new SerDeException("Connected to Druid but could not retrieve datasource information"); - } - for (String column : query.getColumns()) { - columnNames.add(column); - PrimitiveTypeInfo typeInfo = mapColumnNamesTypes.get(column); - if (typeInfo != null) { - // If datasource was created by Hive, we consider Hive type - columnTypes.add(typeInfo); - } else { - ColumnAnalysis columnAnalysis = schemaInfo.getColumns().get(column); - // If column is absent from Druid consider it as a dimension with type string. - String type = columnAnalysis == null ? DruidSerDeUtils.STRING_TYPE : columnAnalysis.getType(); - columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(type)); - } - } - } - - /* GroupBy query */ - private void inferSchema(GroupByQuery query, TimestampLocalTZTypeInfo timeColumnTypeInfo, - List columnNames, List columnTypes, - Map mapColumnNamesTypes) { - // Timestamp column - columnNames.add(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN); - columnTypes.add(timeColumnTypeInfo); - // Dimension columns - for (DimensionSpec ds : query.getDimensions()) { - columnNames.add(ds.getOutputName()); - columnTypes.add(DruidSerDeUtils.extractTypeFromDimension(ds)); - } - // Aggregator columns - for (AggregatorFactory af : query.getAggregatorSpecs()) { - columnNames.add(af.getName()); - PrimitiveTypeInfo typeInfo = mapColumnNamesTypes.get(af.getName()); - if (typeInfo != null) { - // If datasource was created by Hive, we consider Hive type - columnTypes.add(typeInfo); - } else { - columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName())); - } - } - // Post-aggregator columns - // TODO: Currently Calcite only infers avg for post-aggregate, - // but once we recognize other functions, we will need to infer - // different types for post-aggregation functions - for (PostAggregator pa : query.getPostAggregatorSpecs()) { - columnNames.add(pa.getName()); - columnTypes.add(TypeInfoFactory.floatTypeInfo); - } - } - @Override public Class getSerializedClass() { return DruidWritable.class; @@ -505,7 +313,7 @@ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerD List fields = soi.getAllStructFieldRefs(); List values = soi.getStructFieldsDataAsList(o); // We deserialize the result - Map value = new HashMap<>(); + final Map value = new HashMap<>(); for (int i = 0; i < columns.length; i++) { if (values.get(i) == null) { // null, we just add it @@ -514,6 +322,11 @@ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerD } final Object res; switch (types[i].getPrimitiveCategory()) { + case TIMESTAMP: + res = ((TimestampObjectInspector) fields.get(i).getFieldObjectInspector()) + .getPrimitiveJavaObject( + values.get(i)).getTime(); + break; case TIMESTAMPLOCALTZ: res = ((TimestampLocalTZObjectInspector) fields.get(i).getFieldObjectInspector()) .getPrimitiveJavaObject(values.get(i)).getZonedDateTime().toInstant().toEpochMilli(); @@ -598,8 +411,8 @@ public SerDeStats getSerDeStats() { @Override public Object deserialize(Writable writable) throws SerDeException { - DruidWritable input = (DruidWritable) writable; - List output = Lists.newArrayListWithExpectedSize(columns.length); + final DruidWritable input = (DruidWritable) writable; + final List output = Lists.newArrayListWithExpectedSize(columns.length); for (int i = 0; i < columns.length; i++) { final Object value = input.getValue().get(columns[i]); if (value 
== null) { @@ -607,6 +420,12 @@ public Object deserialize(Writable writable) throws SerDeException { continue; } switch (types[i].getPrimitiveCategory()) { + case TIMESTAMP: + output.add(new TimestampWritable(Timestamp.valueOf(ZonedDateTime + .ofInstant(Instant.ofEpochMilli(((Number) value).longValue()), + tsTZTypeInfo.timeZone() + ).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).toString()))); + break; case TIMESTAMPLOCALTZ: output.add( new TimestampLocalTZWritable( @@ -664,7 +483,7 @@ public Object deserialize(Writable writable) throws SerDeException { } @Override - public ObjectInspector getObjectInspector() throws SerDeException { + public ObjectInspector getObjectInspector() { return inspector; } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java index 673420b0df..1c34e418cb 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java @@ -65,22 +65,4 @@ public static PrimitiveTypeInfo convertDruidToHiveType(String typeName) { } } - /* Extract type from dimension spec. It returns TIMESTAMP if it is a FLOOR, - * INTEGER if it is a EXTRACT, or STRING otherwise. */ - public static PrimitiveTypeInfo extractTypeFromDimension(DimensionSpec ds) { - if (ds instanceof ExtractionDimensionSpec) { - ExtractionDimensionSpec eds = (ExtractionDimensionSpec) ds; - TimeFormatExtractionFn tfe = (TimeFormatExtractionFn) eds.getExtractionFn(); - if (tfe.getFormat() == null || tfe.getFormat().equals(ISO_TIME_FORMAT)) { - // Timestamp (null or default used by FLOOR) - return TypeInfoFactory.timestampLocalTZTypeInfo; - } else { - // EXTRACT from timestamp - return TypeInfoFactory.intTypeInfo; - } - } - // Default - return TypeInfoFactory.stringTypeInfo; - } - } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java index 571deec07a..7f103c84d4 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java @@ -64,7 +64,7 @@ public NullWritable getCurrentKey() throws IOException, InterruptedException { public DruidWritable getCurrentValue() throws IOException, InterruptedException { // Create new value DruidWritable value = new DruidWritable(); - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + value.getValue().put("timestamp", current.getTimestamp().getMillis()); value.getValue().putAll(current.getValue().getBaseObject()); return value; } @@ -74,7 +74,7 @@ public boolean next(NullWritable key, DruidWritable value) { if (nextKeyValue()) { // Update value value.getValue().clear(); - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + value.getValue().put("timestamp", current.getTimestamp().getMillis()); value.getValue().putAll(current.getValue().getBaseObject()); return true; } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java index 45c22b0b9f..d082e919ca 100644 --- 
druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java @@ -76,7 +76,7 @@ public NullWritable getCurrentKey() throws IOException, InterruptedException { public DruidWritable getCurrentValue() throws IOException, InterruptedException { // Create new value DruidWritable value = new DruidWritable(); - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, + value.getValue().put("timestamp", current.getTimestamp().getMillis() ); if (values.hasNext()) { @@ -91,7 +91,7 @@ public boolean next(NullWritable key, DruidWritable value) { if (nextKeyValue()) { // Update value value.getValue().clear(); - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, + value.getValue().put("timestamp", current.getTimestamp().getMillis() ); if (values.hasNext()) { diff --git pom.xml pom.xml index 5be30f6696..841d22753f 100644 --- pom.xml +++ pom.xml @@ -121,7 +121,7 @@ 1.10.0 1.7.7 0.8.0.RELEASE - 1.15.0 + 1.16.0 4.2.4 4.1.17 4.1.19 @@ -260,6 +260,19 @@ false + + calcite + calcite repository + https://repository.apache.org/content/repositories/orgapachecalcite-1042 + default + + true + warn + + + false + + diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java index cd01094528..3f73fd7fcc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.collect.ImmutableMap; import org.apache.calcite.adapter.druid.DruidQuery; import org.apache.calcite.adapter.druid.DruidSchema; import org.apache.calcite.adapter.druid.DruidTable; @@ -385,7 +386,7 @@ private static RelNode createMaterializedViewScan(HiveConf conf, Table viewTable final TableScan scan = new HiveTableScan(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), optTable, viewTable.getTableName(), null, false, false); tableRel = DruidQuery.create(cluster, cluster.traitSetOf(BindableConvention.INSTANCE), - optTable, druidTable, ImmutableList.of(scan)); + optTable, druidTable, ImmutableList.of(scan), ImmutableMap.of()); } else { // Build Hive Table Scan Rel RelOptHiveTable optTable = new RelOptHiveTable(null, fullyQualifiedTabName, diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFloorDate.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFloorDate.java index eb1fb96adc..62475e5b18 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFloorDate.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFloorDate.java @@ -45,7 +45,7 @@ Sets.newHashSet(YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND); private HiveFloorDate(String name) { - super(name, SqlKind.FLOOR, ReturnTypes.ARG0_OR_EXACT_NO_SCALE, null, + super("FLOOR", SqlKind.FLOOR, ReturnTypes.ARG0_OR_EXACT_NO_SCALE, null, OperandTypes.sequence( "'" + SqlKind.FLOOR + "( TO )'\n" + "'" + SqlKind.FLOOR + "(
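Note on the two new table properties introduced in Constants.java: druid.fieldNames and druid.fieldTypes carry the result schema of the planned Druid query from the Calcite side into DruidSerDe#initFromDruidQueryPlan, which splits the names on commas and resolves the types with TypeInfoUtils.getTypeInfosFromTypeString. The stand-alone sketch below only illustrates the expected shape of those two properties; the class name and the sample property values are hypothetical and not part of this patch, and the colon-separated type string simply follows Hive's usual columns.types convention rather than anything mandated here.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Illustrative sketch only: mirrors how DruidSerDe#initFromDruidQueryPlan reads the
// druid.fieldNames / druid.fieldTypes table properties. The values below are made up;
// in the patch they are produced by the planner together with druid.query.json.
public class DruidFieldPropertiesSketch {

  public static void main(String[] args) {
    Properties tableProperties = new Properties();
    // Hypothetical schema of a pushed-down query: a time column plus two other columns.
    tableProperties.setProperty("druid.fieldNames", "__time,page,delta");
    tableProperties.setProperty("druid.fieldTypes", "timestamp with local time zone:string:bigint");

    // Names are a comma-separated list, exactly as split in initFromDruidQueryPlan.
    String fieldNames = tableProperties.getProperty("druid.fieldNames", "");
    List<String> columnNames = fieldNames.isEmpty()
        ? Collections.emptyList()   // empty when only a filter was pushed down
        : Arrays.asList(fieldNames.trim().split(","));

    // The type string is handed as-is to TypeInfoUtils.getTypeInfosFromTypeString in the
    // real code; here we just show it next to the names without parsing it.
    String fieldTypes = tableProperties.getProperty("druid.fieldTypes", "");

    System.out.println("column names: " + columnNames);
    System.out.println("column types (unparsed Hive type string): " + fieldTypes);
  }
}

In the SerDe itself an empty druid.fieldNames is legal (see the comment in initFromDruidQueryPlan about filter-only plans), which is why the sketch treats the missing or empty property as an empty column list rather than an error.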