diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
index f0bdb9e..fddabf7 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
@@ -24,8 +24,10 @@
 import org.apache.calcite.adapter.druid.DruidTable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.joda.time.format.ISODateTimeFormat;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 
@@ -45,6 +47,10 @@
   private int[] indexes = new int[0];
 
+  // Grouping dimensions can have different types if we are grouping using an
+  // extraction function
+  private PrimitiveTypeInfo[] dimensionTypes;
+
   // Row objects returned by GroupByQuery have different access paths depending on
   // whether the result for the metric is a Float or a Long, thus we keep track
   // using these converters
 
@@ -53,6 +59,7 @@
   @Override
   public void initialize(InputSplit split, Configuration conf) throws IOException {
     super.initialize(split, conf);
+    initDimensionTypes();
     initExtractors();
   }
 
@@ -69,6 +76,13 @@ protected GroupByQuery createQuery(String content) throws IOException {
     );
   }
 
+  private void initDimensionTypes() throws IOException {
+    dimensionTypes = new PrimitiveTypeInfo[query.getDimensions().size()];
+    for (int i = 0; i < query.getDimensions().size(); i++) {
+      dimensionTypes[i] = DruidSerDeUtils.extractTypeFromDimension(query.getDimensions().get(i));
+    }
+  }
+
   private void initExtractors() throws IOException {
     extractors = new Extract[query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs()
         .size()];
@@ -137,7 +151,20 @@ public DruidWritable getCurrentValue() throws IOException, InterruptedException
         value.getValue().put(ds.getOutputName(), null);
       } else {
         int pos = dims.size() - indexes[i] - 1;
-        value.getValue().put(ds.getOutputName(), dims.get(pos));
+        Object val;
+        switch (dimensionTypes[i].getPrimitiveCategory()) {
+          case TIMESTAMP:
+            // FLOOR extraction function
+            val = ISODateTimeFormat.dateTimeParser().parseMillis((String) dims.get(pos));
+            break;
+          case INT:
+            // EXTRACT extraction function
+            val = Integer.valueOf((String) dims.get(pos));
+            break;
+          default:
+            val = dims.get(pos);
+        }
+        value.getValue().put(ds.getOutputName(), val);
       }
     }
     int counter = 0;
@@ -176,7 +203,20 @@ public boolean next(NullWritable key, DruidWritable value) {
         value.getValue().put(ds.getOutputName(), null);
       } else {
         int pos = dims.size() - indexes[i] - 1;
-        value.getValue().put(ds.getOutputName(), dims.get(pos));
+        Object val;
+        switch (dimensionTypes[i].getPrimitiveCategory()) {
+          case TIMESTAMP:
+            // FLOOR extraction function
+            val = ISODateTimeFormat.dateTimeParser().parseMillis((String) dims.get(pos));
+            break;
+          case INT:
+            // EXTRACT extraction function
+            val = Integer.valueOf((String) dims.get(pos));
+            break;
+          default:
+            val = dims.get(pos);
+        }
+        value.getValue().put(ds.getOutputName(), val);
       }
     }
     int counter = 0;
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 656c0f1..979283c 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -68,6 +68,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
 import org.joda.time.Period;
+import org.joda.time.format.ISODateTimeFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,6 +76,7 @@
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.common.parsers.ParseException;
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.HttpClientConfig;
 import com.metamx.http.client.HttpClientInit;
@@ -85,6 +87,8 @@
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.dimension.ExtractionDimensionSpec;
+import io.druid.query.extraction.TimeFormatExtractionFn;
 import io.druid.query.groupby.GroupByQuery;
 import io.druid.query.metadata.metadata.ColumnAnalysis;
 import io.druid.query.metadata.metadata.SegmentAnalysis;
@@ -371,7 +375,7 @@ private void inferSchema(GroupByQuery query, List<String> columnNames,
     // Dimension columns
     for (DimensionSpec ds : query.getDimensions()) {
       columnNames.add(ds.getOutputName());
-      columnTypes.add(TypeInfoFactory.stringTypeInfo);
+      columnTypes.add(DruidSerDeUtils.extractTypeFromDimension(ds));
     }
     // Aggregator columns
     for (AggregatorFactory af : query.getAggregatorSpecs()) {
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
index 64a19f6..c8a63ab 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
@@ -23,6 +23,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.dimension.ExtractionDimensionSpec;
+import io.druid.query.extraction.TimeFormatExtractionFn;
+
 /**
  * Utils class for Druid SerDe.
  */
@@ -30,10 +34,10 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(DruidSerDeUtils.class);
 
-  protected static final String FLOAT_TYPE = "FLOAT";
+  protected static final String ISO_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+
+  protected static final String FLOAT_TYPE = "FLOAT";
   protected static final String LONG_TYPE = "LONG";
-  protected static final String STRING_TYPE = "STRING";
 
   /* This method converts from the String representation of Druid type
@@ -82,4 +86,22 @@ public static String convertDruidToHiveTypeString(String typeName) {
     }
   }
 
+  /* Extract type from dimension spec. It returns TIMESTAMP if it is a FLOOR,
+   * INTEGER if it is an EXTRACT, or STRING otherwise. */
+  public static PrimitiveTypeInfo extractTypeFromDimension(DimensionSpec ds) {
+    if (ds instanceof ExtractionDimensionSpec) {
+      ExtractionDimensionSpec eds = (ExtractionDimensionSpec) ds;
+      TimeFormatExtractionFn tfe = (TimeFormatExtractionFn) eds.getExtractionFn();
+      if (tfe.getFormat() == null || tfe.getFormat().equals(ISO_TIME_FORMAT)) {
+        // Timestamp (null or default used by FLOOR)
+        return TypeInfoFactory.timestampTypeInfo;
+      } else {
+        // EXTRACT from timestamp
+        return TypeInfoFactory.intTypeInfo;
+      }
+    }
+    // Default
+    return TypeInfoFactory.stringTypeInfo;
+  }
+
 }
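A note for reviewers on the conversion logic that both getCurrentValue() and next() now share: Druid serializes every grouping dimension as a string, so the record reader re-types each value according to the Hive type inferred by DruidSerDeUtils.extractTypeFromDimension(). Below is a minimal, self-contained sketch of the two conversions; the sample values, expected outputs, and the class name ExtractionValueConversionSketch are illustrative assumptions, not part of the patch.

import org.joda.time.format.ISODateTimeFormat;

public class ExtractionValueConversionSketch {

  public static void main(String[] args) {
    // A FLOOR extraction function (e.g. FLOOR(__time TO DAY)) returns an
    // ISO-8601 string; Joda's parseMillis yields the epoch milliseconds
    // that back a Hive TIMESTAMP value.
    long millis = ISODateTimeFormat.dateTimeParser()
        .parseMillis("2014-01-01T00:00:00.000Z");
    System.out.println(millis); // 1388534400000

    // An EXTRACT extraction function (e.g. EXTRACT(YEAR FROM __time))
    // returns a plain numeric string, which maps to a Hive INT.
    int year = Integer.valueOf("2014");
    System.out.println(year); // 2014
  }
}

Since the switch block is duplicated verbatim in getCurrentValue() and next(NullWritable, DruidWritable), a follow-up could factor it into a shared helper. Likewise, the unchecked cast to TimeFormatExtractionFn in extractTypeFromDimension() assumes the planner only produces time-format extraction functions at this point.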