diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
index 00a4b72a34..f8cce72609 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
@@ -33,6 +33,9 @@
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.DateTimeFormatterBuilder;
+import org.joda.time.format.DateTimeParser;
 import org.joda.time.format.ISODateTimeFormat;
 
 import java.io.IOException;
@@ -41,6 +44,7 @@
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.druid.serde.DruidSerDeUtils.ISO_TIME_FORMAT;
+import static org.apache.hadoop.hive.druid.serde.DruidSerDeUtils.TIMESTAMP_FORMAT;
 
 /**
  * Record reader for results for Druid GroupByQuery.
@@ -49,6 +53,7 @@
         extends DruidQueryRecordReader<GroupByQuery, Row> {
   private final static TypeReference<Row> TYPE_REFERENCE = new TypeReference<Row>() {
   };
+  private static DateTimeFormatter dateTimeParser = createAutoParser();
 
   private MapBasedRow currentRow;
   private Map<String, Object> currentEvent;
@@ -86,7 +91,8 @@ private void initDimensionTypes() throws IOException {
       if (extractionDimensionSpec.getExtractionFn() instanceof TimeFormatExtractionFn) {
         final TimeFormatExtractionFn timeFormatExtractionFn = (TimeFormatExtractionFn) extractionDimensionSpec
                 .getExtractionFn();
-        if (timeFormatExtractionFn == null || timeFormatExtractionFn.getFormat().equals(ISO_TIME_FORMAT)) {
+        if (timeFormatExtractionFn != null && (timeFormatExtractionFn.getFormat().equals(ISO_TIME_FORMAT)
+            || timeFormatExtractionFn.getFormat().equals(TIMESTAMP_FORMAT))) {
           timeExtractionFields.add(extractionDimensionSpec.getOutputName());
         } else {
           intFormattedTimeExtractionFields.add(extractionDimensionSpec.getOutputName());
@@ -107,7 +113,7 @@ public boolean nextKeyValue() {
       currentEvent = Maps.transformEntries(currentRow.getEvent(),
               (key, value1) -> {
                 if (timeExtractionFields.contains(key)) {
-                  return ISODateTimeFormat.dateTimeParser().parseMillis((String) value1);
+                  return dateTimeParser.parseMillis((String) value1);
                 }
                 if (intFormattedTimeExtractionFields.contains(key)) {
                   return Integer.valueOf((String) value1);
@@ -159,4 +165,28 @@ public float getProgress() throws IOException {
     return queryResultsIterator.hasNext() ? 0 : 1;
   }
 
+  private static DateTimeFormatter createAutoParser()
+  {
+    final DateTimeFormatter offsetElement = new DateTimeFormatterBuilder()
+        .appendTimeZoneOffset("Z", true, 2, 4)
+        .toFormatter();
+
+    DateTimeParser timeOrOffset = new DateTimeFormatterBuilder()
+        .append(
+            null,
+            new DateTimeParser[]{
+                new DateTimeFormatterBuilder().appendLiteral('T').toParser(),
+                new DateTimeFormatterBuilder().appendLiteral(' ').toParser()
+            }
+        )
+        .appendOptional(ISODateTimeFormat.timeElementParser().getParser())
+        .appendOptional(offsetElement.getParser())
+        .toParser();
+
+    return new DateTimeFormatterBuilder()
+        .append(ISODateTimeFormat.dateElementParser())
+        .appendOptional(timeOrOffset)
+        .toFormatter();
+  }
+
 }
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 5f7657975a..3e032261e8 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -97,6 +97,8 @@
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hive.druid.serde.DruidSerDeUtils.TIMESTAMP_FORMAT;
+
 /**
  * DruidSerDe that is used to deserialize objects from a Druid data source.
  */
@@ -415,7 +417,7 @@ public Object deserialize(Writable writable) throws SerDeException {
         output.add(new TimestampWritable(Timestamp.valueOf(ZonedDateTime
                 .ofInstant(Instant.ofEpochMilli(((Number) value).longValue()),
                     tsTZTypeInfo.timeZone()
-                ).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).toString())));
+                ).format(DateTimeFormatter.ofPattern(TIMESTAMP_FORMAT)).toString())));
         break;
       case TIMESTAMPLOCALTZ:
         output.add(
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
index 1c34e418cb..630e097c19 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
@@ -22,10 +22,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.druid.query.dimension.DimensionSpec;
-import io.druid.query.dimension.ExtractionDimensionSpec;
-import io.druid.query.extraction.TimeFormatExtractionFn;
-
 /**
  * Utils class for Druid SerDe.
  */
@@ -34,6 +30,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(DruidSerDeUtils.class);
 
   protected static final String ISO_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+  protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss";
 
   protected static final String FLOAT_TYPE = "FLOAT";
   protected static final String DOUBLE_TYPE = "DOUBLE";
diff --git ql/src/test/queries/clientpositive/druidmini_extractTime.q ql/src/test/queries/clientpositive/druidmini_extractTime.q
index 2f7129edeb..d3ee1c0c8d 100644
--- ql/src/test/queries/clientpositive/druidmini_extractTime.q
+++ ql/src/test/queries/clientpositive/druidmini_extractTime.q
@@ -1,3 +1,5 @@
+--! qt:dataset:alltypesorc
+
 SET hive.vectorized.execution.enabled=false;
 CREATE TABLE druid_table
 STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
@@ -160,5 +162,10 @@ AND CAST(EXTRACT(YEAR from `__time`) as STRING) = '1969' LIMIT 1;
 
 SELECT EXTRACT(YEAR from `__time`), SUBSTRING(CAST(CAST(`__time` AS DATE) AS STRING), 1, 4) as year_str FROM druid_table WHERE EXTRACT(YEAR from `__time`) >= 1969
 AND CAST(EXTRACT(YEAR from `__time`) as STRING) = '1969' LIMIT 1;
 
+-- Cast to Timestamp
+
+explain SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5;
+
+SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5;
 DROP TABLE druid_table;
\ No newline at end of file
diff --git ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out
index cf8161f4cb..920fb79a8b 100644
--- ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out
+++ ql/src/test/results/clientpositive/druid/druidmini_extractTime.q.out
@@ -1015,6 +1015,40 @@ POSTHOOK: Input: default@druid_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
 1969	1969
 1969	1969
+PREHOOK: query: explain SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5
+PREHOOK: type: QUERY
+POSTHOOK: query: explain SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: druid_table
+          properties:
+            druid.fieldNames extract,$f1
+            druid.fieldTypes timestamp,double
+            druid.query.json {"queryType":"groupBy","dataSource":"default.druid_table","granularity":"all","dimensions":[{"type":"extraction","dimension":"__time","outputName":"extract","extractionFn":{"type":"timeFormat","format":"yyyy-MM-dd HH:mm:ss","timeZone":"US/Pacific","locale":"en"}}],"limitSpec":{"type":"default","limit":5,"columns":[{"dimension":"extract","direction":"ascending","dimensionOrder":"lexicographic"}]},"aggregations":[{"type":"doubleSum","name":"$f1","fieldName":"cfloat"}],"intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]}
+            druid.query.type groupBy
+          Select Operator
+            expressions: extract (type: timestamp), $f1 (type: double)
+            outputColumnNames: _col0, _col1
+            ListSink
+
+PREHOOK: query: SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT CAST(`__time` AS TIMESTAMP) AS `x_time`, SUM(cfloat) FROM druid_table GROUP BY CAST(`__time` AS TIMESTAMP) ORDER BY `x_time` LIMIT 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1969-12-31 15:59:00	-4532.569952011108
+1969-12-31 16:00:00	-35057.67698967457
 PREHOOK: query: DROP TABLE druid_table
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@druid_table
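
Reviewer note, not part of the patch: below is a minimal standalone sketch of
the lenient parser that createAutoParser() builds, to show why the change is
needed. It accepts both the ISO 'T'-separated form and the space-separated
TIMESTAMP_FORMAT ("yyyy-MM-dd HH:mm:ss") now emitted by the timeFormat
extraction function, whereas the previous ISODateTimeFormat.dateTimeParser()
rejects the latter. The class name AutoParserDemo and the sample inputs are
illustrative only; when no offset is present, Joda parses in the JVM's
default time zone.

    import org.joda.time.format.DateTimeFormatter;
    import org.joda.time.format.DateTimeFormatterBuilder;
    import org.joda.time.format.DateTimeParser;
    import org.joda.time.format.ISODateTimeFormat;

    public class AutoParserDemo {

      // Same construction as the private createAutoParser() in the patch:
      // an ISO date element, optionally followed by a 'T'- or space-separated
      // time element and an optional numeric offset ("Z" when zero).
      static DateTimeFormatter createAutoParser() {
        final DateTimeFormatter offsetElement = new DateTimeFormatterBuilder()
            .appendTimeZoneOffset("Z", true, 2, 4)
            .toFormatter();
        DateTimeParser timeOrOffset = new DateTimeFormatterBuilder()
            .append(null, new DateTimeParser[]{
                new DateTimeFormatterBuilder().appendLiteral('T').toParser(),
                new DateTimeFormatterBuilder().appendLiteral(' ').toParser()})
            .appendOptional(ISODateTimeFormat.timeElementParser().getParser())
            .appendOptional(offsetElement.getParser())
            .toParser();
        return new DateTimeFormatterBuilder()
            .append(ISODateTimeFormat.dateElementParser())
            .appendOptional(timeOrOffset)
            .toFormatter();
      }

      public static void main(String[] args) {
        DateTimeFormatter parser = createAutoParser();
        // ISO form: the old ISODateTimeFormat.dateTimeParser() handles this too.
        System.out.println(parser.parseMillis("1969-12-31T15:59:00.000Z"));
        // Space-separated TIMESTAMP_FORMAT: ISODateTimeFormat.dateTimeParser()
        // throws IllegalArgumentException on this input; with no offset given,
        // the JVM default time zone applies.
        System.out.println(parser.parseMillis("1969-12-31 15:59:00"));
      }
    }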