diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java index 00a4b72a34..82c6653dac 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java @@ -20,113 +20,60 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.metamx.http.client.HttpClient; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; -import io.druid.query.dimension.DimensionSpec; -import io.druid.query.dimension.ExtractionDimensionSpec; -import io.druid.query.extraction.TimeFormatExtractionFn; import io.druid.query.groupby.GroupByQuery; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; -import org.joda.time.format.ISODateTimeFormat; import java.io.IOException; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; - -import static org.apache.hadoop.hive.druid.serde.DruidSerDeUtils.ISO_TIME_FORMAT; /** * Record reader for results for Druid GroupByQuery. */ -public class DruidGroupByQueryRecordReader - extends DruidQueryRecordReader { +public class DruidGroupByQueryRecordReader extends DruidQueryRecordReader { private final static TypeReference TYPE_REFERENCE = new TypeReference() { }; private MapBasedRow currentRow; private Map currentEvent; - private List timeExtractionFields = Lists.newArrayList(); - private List intFormattedTimeExtractionFields = Lists.newArrayList(); - - @Override - public void initialize(InputSplit split, Configuration conf) throws IOException { + @Override public void initialize(InputSplit split, Configuration conf) throws IOException { super.initialize(split, conf); - initDimensionTypes(); } - @Override - public void initialize(InputSplit split, Configuration conf, ObjectMapper mapper, - ObjectMapper smileMapper, HttpClient httpClient + @Override public void initialize(InputSplit split, Configuration conf, ObjectMapper mapper, ObjectMapper smileMapper, + HttpClient httpClient ) throws IOException { super.initialize(split, conf, mapper, smileMapper, httpClient); - initDimensionTypes(); } - @Override - protected JavaType getResultTypeDef() { + @Override protected JavaType getResultTypeDef() { return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE); } - private void initDimensionTypes() throws IOException { - //@TODO move this out of here to org.apache.hadoop.hive.druid.serde.DruidSerDe - List dimensionSpecList = ((GroupByQuery) query).getDimensions(); - List extractionDimensionSpecList = dimensionSpecList.stream() - .filter(dimensionSpecs -> dimensionSpecs instanceof ExtractionDimensionSpec) - .collect(Collectors.toList()); - extractionDimensionSpecList.stream().forEach(dimensionSpec -> { - ExtractionDimensionSpec extractionDimensionSpec = (ExtractionDimensionSpec) dimensionSpec; - if (extractionDimensionSpec.getExtractionFn() instanceof TimeFormatExtractionFn) { - final TimeFormatExtractionFn timeFormatExtractionFn = (TimeFormatExtractionFn) extractionDimensionSpec - .getExtractionFn(); - if 
(timeFormatExtractionFn == null || timeFormatExtractionFn.getFormat().equals(ISO_TIME_FORMAT)) { - timeExtractionFields.add(extractionDimensionSpec.getOutputName()); - } else { - intFormattedTimeExtractionFields.add(extractionDimensionSpec.getOutputName()); - } - } - }); - } - - @Override - public boolean nextKeyValue() { + @Override public boolean nextKeyValue() { // Results if (queryResultsIterator.hasNext()) { final Row row = queryResultsIterator.next(); // currently druid supports only MapBasedRow as Jackson SerDe so it should safe to cast without check currentRow = (MapBasedRow) row; - //@TODO move this out of here to org.apache.hadoop.hive.druid.serde.DruidSerDe - currentEvent = Maps.transformEntries(currentRow.getEvent(), - (key, value1) -> { - if (timeExtractionFields.contains(key)) { - return ISODateTimeFormat.dateTimeParser().parseMillis((String) value1); - } - if (intFormattedTimeExtractionFields.contains(key)) { - return Integer.valueOf((String) value1); - } - return value1; - } - ); + currentEvent = currentRow.getEvent(); return true; } return false; } - @Override - public NullWritable getCurrentKey() throws IOException, InterruptedException { + @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } - @Override - public DruidWritable getCurrentValue() throws IOException, InterruptedException { + @Override public DruidWritable getCurrentValue() throws IOException, InterruptedException { // Create new value DruidWritable value = new DruidWritable(); // 1) The timestamp column @@ -138,8 +85,7 @@ public DruidWritable getCurrentValue() throws IOException, InterruptedException return value; } - @Override - public boolean next(NullWritable key, DruidWritable value) { + @Override public boolean next(NullWritable key, DruidWritable value) { if (nextKeyValue()) { // Update value value.getValue().clear(); @@ -154,8 +100,7 @@ public boolean next(NullWritable key, DruidWritable value) { return false; } - @Override - public float getProgress() throws IOException { + @Override public float getProgress() throws IOException { return queryResultsIterator.hasNext() ? 0 : 1; } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java index 5f7657975a..842a9fada6 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.serde2.SerDeSpec; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.HiveCharWritable; import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; @@ -83,6 +84,7 @@ import java.io.IOException; import java.io.InputStream; +import java.sql.Date; import java.sql.Timestamp; import java.time.Instant; import java.time.ZonedDateTime; @@ -97,11 +99,13 @@ import java.util.Properties; import java.util.stream.Collectors; +import static org.apache.hadoop.hive.druid.serde.DruidSerDeUtils.TIMESTAMP_FORMAT; +import static org.joda.time.format.ISODateTimeFormat.dateOptionalTimeParser; + /** * DruidSerDe that is used to deserialize objects from a Druid data source. 
*/ -@SerDeSpec(schemaProps = { Constants.DRUID_DATA_SOURCE }) -public class DruidSerDe extends AbstractSerDe { +@SerDeSpec(schemaProps = { Constants.DRUID_DATA_SOURCE }) public class DruidSerDe extends AbstractSerDe { protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class); @@ -110,23 +114,19 @@ private ObjectInspector inspector; private TimestampLocalTZTypeInfo tsTZTypeInfo; - @Override - public void initialize(Configuration configuration, Properties properties) throws SerDeException { + @Override public void initialize(Configuration configuration, Properties properties) throws SerDeException { - tsTZTypeInfo = new TimestampLocalTZTypeInfo( - configuration.get(HiveConf.ConfVars.HIVE_LOCAL_TIME_ZONE.varname)); + tsTZTypeInfo = new TimestampLocalTZTypeInfo(configuration.get(HiveConf.ConfVars.HIVE_LOCAL_TIME_ZONE.varname)); // Druid query final String druidQuery = properties.getProperty(Constants.DRUID_QUERY_JSON, null); - if (druidQuery != null && !druidQuery.isEmpty()) { + if (druidQuery != null && !druidQuery.isEmpty()) { initFromDruidQueryPlan(properties, druidQuery); } else { // No query. Either it is a CTAS, or we need to create a Druid meta data Query - if (!org.apache.commons.lang3.StringUtils - .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMNS)) - && !org.apache.commons.lang3.StringUtils - .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES))) { + if (!org.apache.commons.lang3.StringUtils.isEmpty(properties.getProperty(serdeConstants.LIST_COLUMNS)) + && !org.apache.commons.lang3.StringUtils.isEmpty(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES))) { // CASE CTAS statement - initFromProperties(properties); + initFromProperties(properties); } else { // Segment Metadata query that retrieves all columns present in // the data source (dimensions and metrics). 
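// ---------------------------------------------------------------------------
// Sketch (not part of this patch): a minimal, self-contained illustration of the
// three initialization paths the reorganized DruidSerDe.initialize(...) above
// chooses between. The class, enum and helper names are hypothetical stand-ins;
// only the branching order mirrors the diff: a pushed-down Druid query wins,
// otherwise the declared column list (CTAS), otherwise a segment metadata query.
// ---------------------------------------------------------------------------
class DruidSerDeInitDispatchSketch {
  enum InitPath { FROM_DRUID_QUERY_PLAN, FROM_DECLARED_COLUMNS_CTAS, FROM_SEGMENT_METADATA_QUERY }

  static InitPath choose(String druidQueryJson, String listColumns, String listColumnTypes) {
    if (druidQueryJson != null && !druidQueryJson.isEmpty()) {
      // The planner pushed a Druid query down; field names and types come with it.
      return InitPath.FROM_DRUID_QUERY_PLAN;
    }
    if (notEmpty(listColumns) && notEmpty(listColumnTypes)) {
      // CTAS: the schema is taken from the declared columns.
      return InitPath.FROM_DECLARED_COLUMNS_CTAS;
    }
    // Otherwise discover dimensions and metrics with a SegmentMetadataQuery
    // sent to the Druid broker.
    return InitPath.FROM_SEGMENT_METADATA_QUERY;
  }

  private static boolean notEmpty(String s) {
    return s != null && !s.isEmpty();
  }
}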
@@ -134,8 +134,8 @@ public void initialize(Configuration configuration, Properties properties) throw } } if (LOG.isDebugEnabled()) { - LOG.debug("DruidSerDe initialized with\n" + "\t columns: " + Arrays.toString(columns) - + "\n\t types: " + Arrays.toString(types)); + LOG.debug("DruidSerDe initialized with\n" + "\t columns: " + Arrays.toString(columns) + "\n\t types: " + Arrays + .toString(types)); } } @@ -147,8 +147,8 @@ private void initFromMetaDataQuery(final Configuration configuration, final Prop String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE); if (dataSource == null) { - throw new SerDeException("Druid data source not specified; use " + - Constants.DRUID_DATA_SOURCE + " in table properties"); + throw new SerDeException( + "Druid data source not specified; use " + Constants.DRUID_DATA_SOURCE + " in table properties"); } SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder(); builder.dataSource(dataSource); @@ -157,9 +157,7 @@ private void initFromMetaDataQuery(final Configuration configuration, final Prop SegmentMetadataQuery query = builder.build(); // Execute query in Druid - String address = HiveConf.getVar(configuration, - HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS - ); + String address = HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS); if (org.apache.commons.lang3.StringUtils.isEmpty(address)) { throw new SerDeException("Druid broker address not specified in configuration"); } @@ -176,33 +174,29 @@ private void initFromMetaDataQuery(final Configuration configuration, final Prop columnNames.add(columnInfo.getKey()); // field name PrimitiveTypeInfo type = tsTZTypeInfo; // field type columnTypes.add(type); - inspectors - .add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type)); + inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type)); continue; } columnNames.add(columnInfo.getKey()); // field name - PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType( - columnInfo.getValue().getType()); // field type + PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(columnInfo.getValue().getType()); // field type columnTypes.add(type instanceof TimestampLocalTZTypeInfo ? 
tsTZTypeInfo : type); inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type)); } columns = columnNames.toArray(new String[columnNames.size()]); types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]); - inspector = ObjectInspectorFactory - .getStandardStructObjectInspector(columnNames, inspectors); + inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors); } - private void initFromProperties(final Properties properties) - throws SerDeException { + private void initFromProperties(final Properties properties) throws SerDeException { final List inspectors = new ArrayList<>(); final List columnNames = new ArrayList<>(); final List columnTypes = new ArrayList<>(); columnNames.addAll(Utilities.getColumnNames(properties)); if (!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { - throw new SerDeException("Timestamp column (' " + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN + - "') not specified in create table; list of columns is : " + - properties.getProperty(serdeConstants.LIST_COLUMNS)); + throw new SerDeException("Timestamp column (' " + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN + + "') not specified in create table; list of columns is : " + properties + .getProperty(serdeConstants.LIST_COLUMNS)); } columnTypes.addAll(Lists.transform( Lists.transform(Utilities.getColumnTypes(properties), type -> TypeInfoFactory.getPrimitiveTypeInfo(type)), @@ -214,8 +208,7 @@ private void initFromProperties(final Properties properties) )); columns = columnNames.toArray(new String[columnNames.size()]); types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]); - inspector = ObjectInspectorFactory - .getStandardStructObjectInspector(columnNames, inspectors); + inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors); } private void initFromDruidQueryPlan(Properties properties, String druidQuery) { @@ -229,16 +222,16 @@ private void initFromDruidQueryPlan(Properties properties, String druidQuery) { Preconditions.checkNotNull(properties.getProperty(Constants.DRUID_QUERY_FIELD_TYPES, null)); if (fieldNamesProperty.isEmpty()) { // this might seem counter intuitive but some queries like query - // SELECT YEAR(Calcs.date0) AS yr_date0_ok FROM druid_tableau.calcs Calcs WHERE (YEAR(Calcs.date0) IS NULL) LIMIT 1 - // is planed in a way where we only push a filter down and keep the project of null as hive project. Thus empty columns + // SELECT YEAR(Calcs.date0) AS yr_date0_ok FROM druid_tableau.calcs Calcs WHERE (YEAR(Calcs.date0) IS NULL) + // LIMIT 1 + // is planed in a way where we only push a filter down and keep the project of null as hive project. 
Thus empty + // columns columnNames = Collections.EMPTY_LIST; columnTypes = Collections.EMPTY_LIST; } else { - columnNames = - Arrays.stream(fieldNamesProperty.trim().split(",")).collect(Collectors.toList()); + columnNames = Arrays.stream(fieldNamesProperty.trim().split(",")).collect(Collectors.toList()); columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(fieldTypesProperty).stream() - .map(e -> TypeInfoFactory.getPrimitiveTypeInfo(e.getTypeName())) - .map(primitiveTypeInfo -> { + .map(e -> TypeInfoFactory.getPrimitiveTypeInfo(e.getTypeName())).map(primitiveTypeInfo -> { if (primitiveTypeInfo instanceof TimestampLocalTZTypeInfo) { return tsTZTypeInfo; } @@ -257,11 +250,11 @@ private void initFromDruidQueryPlan(Properties properties, String druidQuery) { /* Submits the request and returns */ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query) - throws SerDeException, IOException { + throws SerDeException, IOException { InputStream response; try { response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), - DruidStorageHandlerUtils.createSmileRequest(address, query) + DruidStorageHandlerUtils.createSmileRequest(address, query) ); } catch (Exception e) { throw new SerDeException(StringUtils.stringifyException(e)); @@ -272,10 +265,9 @@ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQ try { // This will throw an exception in case of the response from druid is not an array // this case occurs if for instance druid query execution returns an exception instead of array of results. - resultsList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response, - new TypeReference>() { - } - ); + resultsList = + DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response, new TypeReference>() { + }); } catch (Exception e) { response.close(); throw new SerDeException(StringUtils.stringifyException(e)); @@ -290,17 +282,14 @@ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQ return resultsList.get(0); } - @Override - public Class getSerializedClass() { + @Override public Class getSerializedClass() { return DruidWritable.class; } - @Override - public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException { + @Override public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException { if (objectInspector.getCategory() != ObjectInspector.Category.STRUCT) { - throw new SerDeException(getClass().toString() - + " can only serialize struct types, but we got: " - + objectInspector.getTypeName()); + throw new SerDeException( + getClass().toString() + " can only serialize struct types, but we got: " + objectInspector.getTypeName()); } // Prepare the field ObjectInspectors @@ -317,52 +306,49 @@ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerD } final Object res; switch (types[i].getPrimitiveCategory()) { - case TIMESTAMP: - res = ((TimestampObjectInspector) fields.get(i).getFieldObjectInspector()) - .getPrimitiveJavaObject( - values.get(i)).getTime(); - break; - case TIMESTAMPLOCALTZ: - res = ((TimestampLocalTZObjectInspector) fields.get(i).getFieldObjectInspector()) - .getPrimitiveJavaObject(values.get(i)).getZonedDateTime().toInstant().toEpochMilli(); - break; - case BYTE: - res = ((ByteObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i)); - break; - case SHORT: - res = ((ShortObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i)); - break; - case INT: - 
res = ((IntObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i)); - break; - case LONG: - res = ((LongObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i)); - break; - case FLOAT: - res = ((FloatObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i)); - break; - case DOUBLE: - res = ((DoubleObjectInspector) fields.get(i).getFieldObjectInspector()) - .get(values.get(i)); - break; - case CHAR: - res = ((HiveCharObjectInspector) fields.get(i).getFieldObjectInspector()) - .getPrimitiveJavaObject(values.get(i)).getValue(); - break; - case VARCHAR: - res = ((HiveVarcharObjectInspector) fields.get(i).getFieldObjectInspector()) - .getPrimitiveJavaObject(values.get(i)).getValue(); - break; - case STRING: - res = ((StringObjectInspector) fields.get(i).getFieldObjectInspector()) - .getPrimitiveJavaObject(values.get(i)); - break; - case BOOLEAN: - res = ((BooleanObjectInspector) fields.get(i).getFieldObjectInspector()) - .get(values.get(i)); - break; - default: - throw new SerDeException("Unsupported type: " + types[i].getPrimitiveCategory()); + case TIMESTAMP: + res = ((TimestampObjectInspector) fields.get(i).getFieldObjectInspector()).getPrimitiveJavaObject(values.get(i)) + .getTime(); + break; + case TIMESTAMPLOCALTZ: + res = ((TimestampLocalTZObjectInspector) fields.get(i).getFieldObjectInspector()) + .getPrimitiveJavaObject(values.get(i)).getZonedDateTime().toInstant().toEpochMilli(); + break; + case BYTE: + res = ((ByteObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i)); + break; + case SHORT: + res = ((ShortObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i)); + break; + case INT: + res = ((IntObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i)); + break; + case LONG: + res = ((LongObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i)); + break; + case FLOAT: + res = ((FloatObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i)); + break; + case DOUBLE: + res = ((DoubleObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i)); + break; + case CHAR: + res = ((HiveCharObjectInspector) fields.get(i).getFieldObjectInspector()).getPrimitiveJavaObject(values.get(i)) + .getValue(); + break; + case VARCHAR: + res = + ((HiveVarcharObjectInspector) fields.get(i).getFieldObjectInspector()).getPrimitiveJavaObject(values.get(i)) + .getValue(); + break; + case STRING: + res = ((StringObjectInspector) fields.get(i).getFieldObjectInspector()).getPrimitiveJavaObject(values.get(i)); + break; + case BOOLEAN: + res = ((BooleanObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i)); + break; + default: + throw new SerDeException("Unsupported type: " + types[i].getPrimitiveCategory()); } value.put(columns[i], res); } @@ -370,38 +356,34 @@ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerD // First Segment Granularity has to be here. 
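// ---------------------------------------------------------------------------
// Sketch (not part of this patch): serialize() above reduces every Hive time value
// to the epoch-millisecond long that Druid ingests. A minimal sketch of the two
// conversions used by the TIMESTAMP and TIMESTAMPLOCALTZ branches; the class and
// method names are hypothetical.
// ---------------------------------------------------------------------------
class EpochMillisSketch {
  // TIMESTAMP branch: java.sql.Timestamp already carries epoch millis.
  static long fromTimestamp(java.sql.Timestamp ts) {
    return ts.getTime();
  }

  // TIMESTAMPLOCALTZ branch: go through the instant of the zoned date-time.
  static long fromZonedDateTime(java.time.ZonedDateTime zdt) {
    return zdt.toInstant().toEpochMilli();
  }

  public static void main(String[] args) {
    System.out.println(fromTimestamp(java.sql.Timestamp.valueOf("1969-12-31 15:59:00")));
    System.out.println(fromZonedDateTime(
        java.time.ZonedDateTime.parse("1969-12-31T15:59:00-08:00[US/Pacific]")));
  }
}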
final int granularityFieldIndex = columns.length; assert values.size() > granularityFieldIndex; - Preconditions.checkArgument(fields.get(granularityFieldIndex).getFieldName() - .equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME)); + Preconditions.checkArgument( + fields.get(granularityFieldIndex).getFieldName().equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME)); value.put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, - ((TimestampObjectInspector) fields.get(granularityFieldIndex).getFieldObjectInspector()) - .getPrimitiveJavaObject(values.get(granularityFieldIndex)).getTime() + ((TimestampObjectInspector) fields.get(granularityFieldIndex).getFieldObjectInspector()) + .getPrimitiveJavaObject(values.get(granularityFieldIndex)).getTime() ); if (values.size() == columns.length + 2) { // Then partition number if any. final int partitionNumPos = granularityFieldIndex + 1; - Preconditions.checkArgument( - fields.get(partitionNumPos).getFieldName().equals(Constants.DRUID_SHARD_KEY_COL_NAME), + Preconditions.checkArgument(fields.get(partitionNumPos).getFieldName().equals(Constants.DRUID_SHARD_KEY_COL_NAME), String.format("expecting to encounter %s but was %s", Constants.DRUID_SHARD_KEY_COL_NAME, fields.get(partitionNumPos).getFieldName() ) ); value.put(Constants.DRUID_SHARD_KEY_COL_NAME, - ((LongObjectInspector) fields.get(partitionNumPos).getFieldObjectInspector()) - .get(values.get(partitionNumPos)) + ((LongObjectInspector) fields.get(partitionNumPos).getFieldObjectInspector()).get(values.get(partitionNumPos)) ); } return new DruidWritable(value); } - @Override - public SerDeStats getSerDeStats() { + @Override public SerDeStats getSerDeStats() { // no support for statistics return null; } - @Override - public Object deserialize(Writable writable) throws SerDeException { + @Override public Object deserialize(Writable writable) throws SerDeException { final DruidWritable input = (DruidWritable) writable; final List output = Lists.newArrayListWithExpectedSize(columns.length); for (int i = 0; i < columns.length; i++) { @@ -411,72 +393,89 @@ public Object deserialize(Writable writable) throws SerDeException { continue; } switch (types[i].getPrimitiveCategory()) { - case TIMESTAMP: - output.add(new TimestampWritable(Timestamp.valueOf(ZonedDateTime - .ofInstant(Instant.ofEpochMilli(((Number) value).longValue()), - tsTZTypeInfo.timeZone() - ).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).toString()))); - break; - case TIMESTAMPLOCALTZ: - output.add( - new TimestampLocalTZWritable( - new TimestampTZ( - ZonedDateTime.ofInstant( - Instant.ofEpochMilli(((Number) value).longValue()), - ((TimestampLocalTZTypeInfo) types[i]).timeZone())))); - break; - case BYTE: - output.add(new ByteWritable(((Number) value).byteValue())); - break; - case SHORT: - output.add(new ShortWritable(((Number) value).shortValue())); - break; - case INT: + case TIMESTAMP: + if (value instanceof Number) { + output.add(new TimestampWritable(Timestamp.valueOf( + ZonedDateTime.ofInstant(Instant.ofEpochMilli(((Number) value).longValue()), tsTZTypeInfo.timeZone()) + .format(DateTimeFormatter.ofPattern(TIMESTAMP_FORMAT))))); + } else { + output.add(new TimestampWritable(Timestamp.valueOf((String) value))); + } + + break; + case TIMESTAMPLOCALTZ: + final long numberOfMillis; + if (value instanceof Number) { + numberOfMillis = ((Number) value).longValue(); + } else { + // it is an extraction fn need to be parsed + numberOfMillis = dateOptionalTimeParser().parseDateTime((String) value).getMillis(); + } + output.add(new 
TimestampLocalTZWritable(new TimestampTZ(ZonedDateTime + .ofInstant(Instant.ofEpochMilli(numberOfMillis), + ((TimestampLocalTZTypeInfo) types[i]).timeZone() + )))); + break; + case DATE: + final DateWritable dateWritable; + if (value instanceof Number) { + dateWritable = new DateWritable(new Date((((Number) value).longValue()))); + } else { + // it is an extraction fn need to be parsed + dateWritable = new DateWritable(new Date(dateOptionalTimeParser().parseDateTime((String) value).getMillis())); + } + output.add(dateWritable); + break; + case BYTE: + output.add(new ByteWritable(((Number) value).byteValue())); + break; + case SHORT: + output.add(new ShortWritable(((Number) value).shortValue())); + break; + case INT: + if (value instanceof Number) { output.add(new IntWritable(((Number) value).intValue())); - break; - case LONG: - output.add(new LongWritable(((Number) value).longValue())); - break; - case FLOAT: - output.add(new FloatWritable(((Number) value).floatValue())); - break; - case DOUBLE: - output.add(new DoubleWritable(((Number) value).doubleValue())); - break; - case CHAR: - output.add( - new HiveCharWritable( - new HiveChar( - value.toString(), - ((CharTypeInfo) types[i]).getLength()))); - break; - case VARCHAR: - output.add( - new HiveVarcharWritable( - new HiveVarchar( - value.toString(), - ((VarcharTypeInfo) types[i]).getLength()))); - break; - case STRING: - output.add(new Text(value.toString())); - break; - case BOOLEAN: - output.add(new BooleanWritable(Boolean.valueOf(value.toString()))); - break; - default: - throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory()); + } else { + // This is a corner case where we have an extract of time unit like day/month pushed as Extraction Fn + //@TODO The best way to fix this is to add explicit output Druid types to Calcite Extraction Functions impls + output.add(new IntWritable(Integer.valueOf((String) value))); + } + + break; + case LONG: + output.add(new LongWritable(((Number) value).longValue())); + break; + case FLOAT: + output.add(new FloatWritable(((Number) value).floatValue())); + break; + case DOUBLE: + output.add(new DoubleWritable(((Number) value).doubleValue())); + break; + case CHAR: + output.add(new HiveCharWritable(new HiveChar(value.toString(), ((CharTypeInfo) types[i]).getLength()))); + break; + case VARCHAR: + output + .add(new HiveVarcharWritable(new HiveVarchar(value.toString(), ((VarcharTypeInfo) types[i]).getLength()))); + break; + case STRING: + output.add(new Text(value.toString())); + break; + case BOOLEAN: + output.add(new BooleanWritable(Boolean.valueOf(value.toString()))); + break; + default: + throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory()); } } return output; } - @Override - public ObjectInspector getObjectInspector() { + @Override public ObjectInspector getObjectInspector() { return inspector; } - @Override - public boolean shouldStoreFieldsInMetastore(Map tableParams) { + @Override public boolean shouldStoreFieldsInMetastore(Map tableParams) { // If Druid table is not an external table store the schema in metadata store. 
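// ---------------------------------------------------------------------------
// Sketch (not part of this patch): the new deserialize() branches above all follow
// one pattern. Druid can hand a time column back either as epoch milliseconds
// (a plain long column or a timestamp_floor virtual column) or as a formatted
// string (timeFormat extraction function), so TIMESTAMP, TIMESTAMPLOCALTZ, DATE
// and the extraction-fn INT case first test `value instanceof Number` and fall
// back to parsing. A minimal sketch assuming Joda-Time on the classpath (the diff
// uses the same dateOptionalTimeParser()); the helper name is hypothetical.
// ---------------------------------------------------------------------------
import org.joda.time.format.ISODateTimeFormat;

class DruidTimeValueSketch {
  static long toEpochMillis(Object value) {
    if (value instanceof Number) {
      return ((Number) value).longValue();
    }
    // e.g. "1969-12-31" or "1969-12-31T23:59:00.000Z" produced by an extraction fn
    return ISODateTimeFormat.dateOptionalTimeParser().parseDateTime((String) value).getMillis();
  }

  public static void main(String[] args) {
    System.out.println(toEpochMillis(-60000L));                     // already epoch millis
    System.out.println(toEpochMillis("1969-12-31T23:59:00.000Z"));  // parsed from a string
  }
}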
return !MetaStoreUtils.isExternal(tableParams); } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java index 1c34e418cb..630e097c19 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java @@ -22,10 +22,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.druid.query.dimension.DimensionSpec; -import io.druid.query.dimension.ExtractionDimensionSpec; -import io.druid.query.extraction.TimeFormatExtractionFn; - /** * Utils class for Druid SerDe. */ @@ -34,6 +30,7 @@ private static final Logger LOG = LoggerFactory.getLogger(DruidSerDeUtils.class); protected static final String ISO_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss"; protected static final String FLOAT_TYPE = "FLOAT"; protected static final String DOUBLE_TYPE = "DOUBLE"; diff --git ql/src/test/queries/clientpositive/druidmini_extractTime.q ql/src/test/queries/clientpositive/druidmini_extractTime.q index 2f7129edeb..429f7965c2 100644 --- ql/src/test/queries/clientpositive/druidmini_extractTime.q +++ ql/src/test/queries/clientpositive/druidmini_extractTime.q @@ -1,3 +1,5 @@ +--! qt:dataset:alltypesorc + SET hive.vectorized.execution.enabled=false; CREATE TABLE druid_table STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' @@ -160,5 +162,18 @@ AND CAST(EXTRACT(YEAR from `__time`) as STRING) = '1969' LIMIT 1; SELECT EXTRACT(YEAR from `__time`), SUBSTRING(CAST(CAST(`__time` AS DATE) AS STRING), 1, 4) as year_str FROM druid_table WHERE EXTRACT(YEAR from `__time`) >= 1969 AND CAST(EXTRACT(YEAR from `__time`) as STRING) = '1969' LIMIT 1; +-- Cast to Timestamp + +explain SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5; + +SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5; + +-- Cast to Date + +explain SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5; + +SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5; + +SELECT CAST(`__time` AS DATE) AS `x_date` FROM druid_table ORDER BY `x_date` LIMIT 5; DROP TABLE druid_table; \ No newline at end of file diff --git ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out index c3679a328e..a1b1d24ef2 100644 --- ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out +++ ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out @@ -1009,6 +1009,86 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@druid_table POSTHOOK: Output: hdfs://### HDFS PATH ### 1969 1969 +PREHOOK: query: explain SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5 +PREHOOK: type: QUERY +POSTHOOK: query: explain SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + 
Processor Tree: + TableScan + alias: druid_table + properties: + druid.fieldNames extract,$f1 + druid.fieldTypes timestamp,double + druid.query.json {"queryType":"groupBy","dataSource":"default.druid_table","granularity":"all","dimensions":[{"type":"extraction","dimension":"__time","outputName":"extract","extractionFn":{"type":"timeFormat","format":"yyyy-MM-dd HH:mm:ss","timeZone":"US/Pacific","locale":"en"}}],"limitSpec":{"type":"default","limit":5,"columns":[{"dimension":"extract","direction":"ascending","dimensionOrder":"lexicographic"}]},"aggregations":[{"type":"doubleSum","name":"$f1","fieldName":"cfloat"}],"intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]} + druid.query.type groupBy + Select Operator + expressions: extract (type: timestamp), $f1 (type: double) + outputColumnNames: _col0, _col1 + ListSink + +PREHOOK: query: SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5 +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +1969-12-31 15:59:00 -4532.569952011108 +1969-12-31 16:00:00 -35057.67698967457 +PREHOOK: query: explain SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5 +PREHOOK: type: QUERY +POSTHOOK: query: explain SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: druid_table + properties: + druid.fieldNames vc,$f1 + druid.fieldTypes date,double + druid.query.json {"queryType":"groupBy","dataSource":"default.druid_table","granularity":"all","dimensions":[{"type":"default","dimension":"vc","outputName":"vc","outputType":"LONG"}],"virtualColumns":[{"type":"expression","name":"vc","expression":"timestamp_floor(\"__time\",'P1D','','US/Pacific')","outputType":"LONG"}],"limitSpec":{"type":"default","limit":5,"columns":[{"dimension":"vc","direction":"ascending","dimensionOrder":"lexicographic"}]},"aggregations":[{"type":"doubleSum","name":"$f1","fieldName":"cfloat"}],"intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]} + druid.query.type groupBy + Select Operator + expressions: vc (type: date), $f1 (type: double) + outputColumnNames: _col0, _col1 + ListSink + +PREHOOK: query: SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5 +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: SELECT CAST(`__time` AS DATE) AS `x_date`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS DATE) ORDER BY `x_date` LIMIT 5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +1969-12-31 -39590.24694168568 +PREHOOK: query: SELECT CAST(`__time` AS DATE) AS `x_date` FROM druid_table ORDER BY `x_date` LIMIT 5 +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: SELECT CAST(`__time` AS DATE) AS 
`x_date` FROM druid_table ORDER BY `x_date` LIMIT 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1969-12-31
+1969-12-31
+1969-12-31
+1969-12-31
+1969-12-31
 PREHOOK: query: DROP TABLE druid_table
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@druid_table
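// ---------------------------------------------------------------------------
// Sketch (not part of this patch): how the expected output above lines up with the
// SerDe changes. For CAST(`__time` AS TIMESTAMP) the groupBy returns the `extract`
// dimension as a string already formatted as "yyyy-MM-dd HH:mm:ss" (see the
// timeFormat extraction fn in druid.query.json), which the TIMESTAMP branch feeds
// to Timestamp.valueOf(...). For CAST(`__time` AS DATE) the timestamp_floor virtual
// column comes back as epoch millis, which the DATE branch wraps in a date value.
// A minimal sketch using plain java.sql types instead of Hive writables; the millis
// constant below is illustrative only.
// ---------------------------------------------------------------------------
class CastExpectedResultsSketch {
  public static void main(String[] args) {
    // TIMESTAMP: string produced by the timeFormat extraction function.
    java.sql.Timestamp ts = java.sql.Timestamp.valueOf("1969-12-31 15:59:00");
    // DATE: epoch millis produced by timestamp_floor("__time",'P1D',...); value is illustrative.
    java.sql.Date date = new java.sql.Date(-86400000L);
    System.out.println(ts);
    System.out.println(date);
  }
}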