diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index 7169140..616b080 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -17,6 +17,18 @@ */ package org.apache.hadoop.hive.druid; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.base.Throwables; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.core.NoopEmitter; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.response.InputStreamResponseHandler; import io.druid.common.utils.JodaUtils; import io.druid.jackson.DefaultObjectMapper; import io.druid.math.expr.ExprMacroTable; @@ -112,6 +124,7 @@ import java.util.Set; import java.util.TimeZone; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -201,7 +214,7 @@ public int columnCacheSizeBytes() { } /** - * Method that creates a request for Druid JSON query (using SMILE). + * Method that creates a request for Druid query using SMILE format. * * @param address * @param query @@ -210,7 +223,7 @@ public int columnCacheSizeBytes() { * * @throws IOException */ - public static Request createRequest(String address, BaseQuery query) + public static Request createSmileRequest(String address, io.druid.query.Query query) throws IOException { return new Request(HttpMethod.POST, new URL(String.format("%s/druid/v2/", "http://" + address))) .setContent(SMILE_MAPPER.writeValueAsBytes(query)) diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java index 3711595..209d60d 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java @@ -93,6 +93,20 @@ protected static final Logger LOG = LoggerFactory.getLogger(DruidQueryBasedInputFormat.class); + public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType) { + switch (druidQueryType) { + case Query.TIMESERIES: + return new DruidTimeseriesQueryRecordReader(); + case Query.TOPN: + return new DruidTopNQueryRecordReader(); + case Query.GROUP_BY: + return new DruidGroupByQueryRecordReader(); + case Query.SELECT: + return new DruidSelectQueryRecordReader(); + } + return null; + } + @Override public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { @@ -192,6 +206,7 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio final String request = String.format( "http://%s/druid/v2/datasources/%s/candidates?intervals=%s", address, query.getDataSource().getNames().get(0), URLEncoder.encode(intervals, "UTF-8")); + LOG.debug("sending request {} to query for segments", request); final InputStream response; try { response = 
DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), new Request(HttpMethod.GET, new URL(request))); @@ -221,8 +236,12 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio // Create partial Select query final SegmentDescriptor newSD = new SegmentDescriptor( locatedSD.getInterval(), locatedSD.getVersion(), locatedSD.getPartitionNumber()); - final SelectQuery partialQuery = query.withQuerySegmentSpec( - new MultipleSpecificSegmentSpec(Lists.newArrayList(newSD))); + //@TODO This is fetching all the rows at once from broker or multiple historical nodes + // Move to use scan query to avoid GC back pressure on the nodes + // https://issues.apache.org/jira/browse/HIVE-17627 + final SelectQuery partialQuery = query + .withQuerySegmentSpec(new MultipleSpecificSegmentSpec(Lists.newArrayList(newSD))) + .withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE)); splits[i] = new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath, hosts); } @@ -256,7 +275,7 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio InputStream response; try { response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), - DruidStorageHandlerUtils.createRequest(address, metadataQuery) + DruidStorageHandlerUtils.createSmileRequest(address, metadataQuery) ); } catch (Exception e) { throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); @@ -309,7 +328,7 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio TimeBoundaryQuery timeQuery = timeBuilder.build(); try { response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), - DruidStorageHandlerUtils.createRequest(address, timeQuery) + DruidStorageHandlerUtils.createSmileRequest(address, timeQuery) ); } catch (Exception e) { throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); @@ -414,21 +433,10 @@ private static String deserializeSerialize(String druidQuery) reader.initialize((HiveDruidSplit) split, job); return reader; } - switch (druidQueryType) { - case Query.TIMESERIES: - reader = new DruidTimeseriesQueryRecordReader(); - break; - case Query.TOPN: - reader = new DruidTopNQueryRecordReader(); - break; - case Query.GROUP_BY: - reader = new DruidGroupByQueryRecordReader(); - break; - case Query.SELECT: - reader = new DruidSelectQueryRecordReader(); - break; - default: - throw new IOException("Druid query type not recognized"); + + reader = getDruidQueryReader(druidQueryType); + if (reader == null) { + throw new IOException("Druid query type " + druidQueryType + " not recognized"); } reader.initialize((HiveDruidSplit) split, job); return reader; @@ -444,22 +452,10 @@ private static String deserializeSerialize(String druidQuery) if (druidQueryType == null) { return new DruidSelectQueryRecordReader(); // By default } - final DruidQueryRecordReader reader; - switch (druidQueryType) { - case Query.TIMESERIES: - reader = new DruidTimeseriesQueryRecordReader(); - break; - case Query.TOPN: - reader = new DruidTopNQueryRecordReader(); - break; - case Query.GROUP_BY: - reader = new DruidGroupByQueryRecordReader(); - break; - case Query.SELECT: - reader = new DruidSelectQueryRecordReader(); - break; - default: - throw new IOException("Druid query type not recognized"); + final DruidQueryRecordReader reader = + getDruidQueryReader(druidQueryType); + if (reader == null) { + throw new IOException("Druid query type " + druidQueryType + " 
not recognized"); } return reader; } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java index b5b254a..359ed36 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java @@ -18,113 +18,105 @@ package org.apache.hadoop.hive.druid.serde; import java.io.IOException; -import java.io.InputStream; import java.util.List; - +import java.util.Map; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.metamx.http.client.HttpClient; +import io.druid.data.input.MapBasedRow; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.dimension.ExtractionDimensionSpec; +import io.druid.query.extraction.TimeFormatExtractionFn; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; -import org.joda.time.format.ISODateTimeFormat; import com.fasterxml.jackson.core.type.TypeReference; import io.druid.data.input.Row; -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.PostAggregator; -import io.druid.query.dimension.DimensionSpec; import io.druid.query.groupby.GroupByQuery; +import org.joda.time.format.ISODateTimeFormat; + +import static org.apache.hadoop.hive.druid.serde.DruidSerDeUtils.ISO_TIME_FORMAT; /** * Record reader for results for Druid GroupByQuery. 
*/ public class DruidGroupByQueryRecordReader extends DruidQueryRecordReader { + private final static TypeReference TYPE_REFERENCE = new TypeReference() { + }; - private Row current; - - private int[] indexes = new int[0]; + private MapBasedRow currentRow; + private Map currentEvent; - // Grouping dimensions can have different types if we are grouping using an - // extraction function - private PrimitiveTypeInfo[] dimensionTypes; - - // Row objects returned by GroupByQuery have different access paths depending on - // whether the result for the metric is a Float or a Long, thus we keep track - // using these converters - private Extract[] extractors; + private List timeExtractionFields = Lists.newArrayList(); + private List intFormattedTimeExtractionFields = Lists.newArrayList(); @Override public void initialize(InputSplit split, Configuration conf) throws IOException { super.initialize(split, conf); initDimensionTypes(); - initExtractors(); } @Override - protected GroupByQuery createQuery(String content) throws IOException { - return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, GroupByQuery.class); + public void initialize(InputSplit split, Configuration conf, ObjectMapper mapper, + ObjectMapper smileMapper, HttpClient httpClient + ) throws IOException { + super.initialize(split, conf, mapper, smileMapper, httpClient); + initDimensionTypes(); } @Override - protected List createResultsList(InputStream content) throws IOException { - return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content, - new TypeReference>() { - } - ); + protected JavaType getResultTypeDef() { + return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE); } private void initDimensionTypes() throws IOException { - dimensionTypes = new PrimitiveTypeInfo[query.getDimensions().size()]; - for (int i = 0; i < query.getDimensions().size(); i++) { - dimensionTypes[i] = DruidSerDeUtils.extractTypeFromDimension(query.getDimensions().get(i)); - } - } - - private void initExtractors() throws IOException { - extractors = new Extract[query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs() - .size()]; - int counter = 0; - for (int i = 0; i < query.getAggregatorSpecs().size(); i++, counter++) { - AggregatorFactory af = query.getAggregatorSpecs().get(i); - switch (af.getTypeName().toUpperCase()) { - case DruidSerDeUtils.FLOAT_TYPE: - extractors[counter] = Extract.FLOAT; - break; - case DruidSerDeUtils.LONG_TYPE: - extractors[counter] = Extract.LONG; - break; - default: - throw new IOException("Type not supported"); + //@TODO move this out of here to org.apache.hadoop.hive.druid.serde.DruidSerDe + List dimensionSpecList = ((GroupByQuery) query).getDimensions(); + List extractionDimensionSpecList = dimensionSpecList.stream() + .filter(dimensionSpecs -> dimensionSpecs instanceof ExtractionDimensionSpec) + .collect(Collectors.toList()); + extractionDimensionSpecList.stream().forEach(dimensionSpec -> { + ExtractionDimensionSpec extractionDimensionSpec = (ExtractionDimensionSpec) dimensionSpec; + if (extractionDimensionSpec.getExtractionFn() instanceof TimeFormatExtractionFn) { + final TimeFormatExtractionFn timeFormatExtractionFn = (TimeFormatExtractionFn) extractionDimensionSpec + .getExtractionFn(); + if (timeFormatExtractionFn == null || timeFormatExtractionFn.getFormat().equals(ISO_TIME_FORMAT)) { + timeExtractionFields.add(extractionDimensionSpec.getOutputName()); + } else { + intFormattedTimeExtractionFields.add(extractionDimensionSpec.getOutputName()); + } } - } - for (int i 
= 0; i < query.getPostAggregatorSpecs().size(); i++, counter++) { - extractors[counter] = Extract.FLOAT; - } + }); } @Override public boolean nextKeyValue() { - // Refresh indexes - for (int i = indexes.length - 1; i >= 0; i--) { - if (indexes[i] > 0) { - indexes[i]--; - for (int j = i + 1; j < indexes.length; j++) { - indexes[j] = current.getDimension( - query.getDimensions().get(j).getOutputName()).size() - 1; - } - return true; - } - } // Results - if (results.hasNext()) { - current = results.next(); - indexes = new int[query.getDimensions().size()]; - for (int i = 0; i < query.getDimensions().size(); i++) { - DimensionSpec ds = query.getDimensions().get(i); - indexes[i] = current.getDimension(ds.getOutputName()).size() - 1; - } + + if (queryResultsIterator.hasNext()) { + final Row row = queryResultsIterator.next(); + // currently druid supports only MapBasedRow as Jackson SerDe so it should safe to cast without check + currentRow = (MapBasedRow) row; + //@TODO move this out of here to org.apache.hadoop.hive.druid.serde.DruidSerDe + currentEvent = Maps.transformEntries(currentRow.getEvent(), + (key, value1) -> { + if (timeExtractionFields.contains(key)) { + return ISODateTimeFormat.dateTimeParser().parseMillis((String) value1); + } + if (intFormattedTimeExtractionFields.contains(key)) { + return Integer.valueOf((String) value1); + } + return value1; + } + ); return true; } return false; @@ -140,49 +132,9 @@ public DruidWritable getCurrentValue() throws IOException, InterruptedException // Create new value DruidWritable value = new DruidWritable(); // 1) The timestamp column - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, currentRow.getTimestamp().getMillis()); // 2) The dimension columns - for (int i = 0; i < query.getDimensions().size(); i++) { - DimensionSpec ds = query.getDimensions().get(i); - List dims = current.getDimension(ds.getOutputName()); - if (dims.size() == 0) { - // NULL value for dimension - value.getValue().put(ds.getOutputName(), null); - } else { - int pos = dims.size() - indexes[i] - 1; - Object val; - switch (dimensionTypes[i].getPrimitiveCategory()) { - case TIMESTAMP: - // FLOOR extraction function - val = ISODateTimeFormat.dateTimeParser().parseMillis((String) dims.get(pos)); - break; - case INT: - // EXTRACT extraction function - val = Integer.valueOf((String) dims.get(pos)); - break; - default: - val = dims.get(pos); - } - value.getValue().put(ds.getOutputName(), val); - } - } - int counter = 0; - // 3) The aggregation columns - for (AggregatorFactory af : query.getAggregatorSpecs()) { - switch (extractors[counter++]) { - case FLOAT: - value.getValue().put(af.getName(), current.getFloatMetric(af.getName())); - break; - case LONG: - value.getValue().put(af.getName(), current.getLongMetric(af.getName())); - break; - } - } - // 4) The post-aggregation columns - for (PostAggregator pa : query.getPostAggregatorSpecs()) { - assert extractors[counter++] == Extract.FLOAT; - value.getValue().put(pa.getName(), current.getFloatMetric(pa.getName())); - } + value.getValue().putAll(currentEvent); return value; } @@ -192,49 +144,9 @@ public boolean next(NullWritable key, DruidWritable value) { // Update value value.getValue().clear(); // 1) The timestamp column - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + 
value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, currentRow.getTimestamp().getMillis()); // 2) The dimension columns - for (int i = 0; i < query.getDimensions().size(); i++) { - DimensionSpec ds = query.getDimensions().get(i); - List dims = current.getDimension(ds.getOutputName()); - if (dims.size() == 0) { - // NULL value for dimension - value.getValue().put(ds.getOutputName(), null); - } else { - int pos = dims.size() - indexes[i] - 1; - Object val; - switch (dimensionTypes[i].getPrimitiveCategory()) { - case TIMESTAMP: - // FLOOR extraction function - val = ISODateTimeFormat.dateTimeParser().parseMillis((String) dims.get(pos)); - break; - case INT: - // EXTRACT extraction function - val = Integer.valueOf((String) dims.get(pos)); - break; - default: - val = dims.get(pos); - } - value.getValue().put(ds.getOutputName(), val); - } - } - int counter = 0; - // 3) The aggregation columns - for (AggregatorFactory af : query.getAggregatorSpecs()) { - switch (extractors[counter++]) { - case FLOAT: - value.getValue().put(af.getName(), current.getFloatMetric(af.getName())); - break; - case LONG: - value.getValue().put(af.getName(), current.getLongMetric(af.getName())); - break; - } - } - // 4) The post-aggregation columns - for (PostAggregator pa : query.getPostAggregatorSpecs()) { - assert extractors[counter++] == Extract.FLOAT; - value.getValue().put(pa.getName(), current.getFloatMetric(pa.getName())); - } + value.getValue().putAll(currentEvent); return true; } return false; @@ -242,12 +154,7 @@ public boolean next(NullWritable key, DruidWritable value) { @Override public float getProgress() throws IOException { - return results.hasNext() ? 0 : 1; - } - - private enum Extract { - FLOAT, - LONG + return queryResultsIterator.hasNext() ? 
0 : 1; } } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java index 103591d..de06533 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java @@ -17,14 +17,23 @@ */ package org.apache.hadoop.hive.druid.serde; -import com.google.common.collect.Iterators; -import com.metamx.common.lifecycle.Lifecycle; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.core.ObjectCodec; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.metamx.http.client.HttpClient; -import com.metamx.http.client.HttpClientConfig; -import com.metamx.http.client.HttpClientInit; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.InputStreamResponseHandler; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.RE; +import io.druid.java.util.common.guava.CloseQuietly; import io.druid.query.BaseQuery; +import io.druid.query.Query; +import io.druid.query.QueryInterruptedException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.druid.DruidStorageHandler; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; import org.apache.hadoop.hive.druid.io.HiveDruidSplit; @@ -32,14 +41,16 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.joda.time.Period; +import org.apache.parquet.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.util.Iterator; -import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; /** * Base record reader for given a Druid query. This class contains the logic to @@ -56,57 +67,65 @@ private static final Logger LOG = LoggerFactory.getLogger(DruidQueryRecordReader.class); + private HttpClient httpClient; + private ObjectMapper mapper; + // Smile mapper is used to read query results that are serialized as binary instead of json + private ObjectMapper smileMapper; + /** * Query that Druid executes. */ - protected T query; + protected Query query; /** - * Query results. + * Query results as a streaming iterator. */ - protected Iterator results = Iterators.emptyIterator(); + protected JsonParserIterator queryResultsIterator = null; + + /** + * Result type definition used to read the rows, this is query dependent. 
+ */ + protected JavaType resultsType = null; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException { initialize(split, context.getConfiguration()); } - public void initialize(InputSplit split, Configuration conf) throws IOException { + public void initialize(InputSplit split, Configuration conf, ObjectMapper mapper, + ObjectMapper smileMapper, HttpClient httpClient + ) throws IOException { HiveDruidSplit hiveDruidSplit = (HiveDruidSplit) split; - + Preconditions.checkNotNull(hiveDruidSplit, "input split is null ???"); + this.mapper = Preconditions.checkNotNull(mapper, "object Mapper can not be null"); + // Smile mapper is used to read query results that are serilized as binary instead of json + this.smileMapper = Preconditions.checkNotNull(smileMapper, "Smile Mapper can not be null"); // Create query - query = createQuery(hiveDruidSplit.getDruidQuery()); - + this.query = this.mapper.readValue(Preconditions.checkNotNull(hiveDruidSplit.getDruidQuery()), Query.class); + Preconditions.checkNotNull(query); + this.resultsType = getResultTypeDef(); + this.httpClient = Preconditions.checkNotNull(httpClient, "need Http Client"); // Execute query - if (LOG.isInfoEnabled()) { - LOG.info("Retrieving from druid using query:\n " + query); - } - - InputStream response; - try { - response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), - DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getLocations()[0], query)); - } catch (Exception e) { - throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + LOG.debug("Retrieving data from druid using query:\n " + query); + final String address = hiveDruidSplit.getLocations()[0]; + if (Strings.isNullOrEmpty(address)) { + throw new IOException("can not fetch results form empty or null host value"); } - - // Retrieve results - List resultsList; - try { - resultsList = createResultsList(response); - } catch (IOException e) { - response.close(); - throw e; - } - if (resultsList == null || resultsList.isEmpty()) { - return; - } - results = resultsList.iterator(); + Request request = DruidStorageHandlerUtils.createSmileRequest(address, query); + Future inputStreamFuture = this.httpClient + .go(request, new InputStreamResponseHandler()); + queryResultsIterator = new JsonParserIterator(this.smileMapper, resultsType, inputStreamFuture, + request.getUrl().toString(), query + ); } - protected abstract T createQuery(String content) throws IOException; + public void initialize(InputSplit split, Configuration conf) throws IOException { + initialize(split, conf, DruidStorageHandlerUtils.JSON_MAPPER, + DruidStorageHandlerUtils.SMILE_MAPPER, DruidStorageHandler.getHttpClient() + ); + } - protected abstract List createResultsList(InputStream content) throws IOException; + protected abstract JavaType getResultTypeDef(); @Override public NullWritable createKey() { @@ -141,7 +160,123 @@ public long getPos() { @Override public void close() { - // Nothing to do + CloseQuietly.close(queryResultsIterator); + } + + /** + * This is a helper wrapper class used to create an iterator of druid rows out of InputStream. 
+ * The type of the rows is defined by org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader.JsonParserIterator#typeRef + * + * @param druid Row type returned as result + */ + protected class JsonParserIterator> implements Iterator, Closeable + { + private JsonParser jp; + private ObjectCodec objectCodec; + private final ObjectMapper mapper; + private final JavaType typeRef; + private final Future future; + private final Query query; + private final String url; + + /** + * @param mapper mapper used to deserialize the stream of data (we use smile factory) + * @param typeRef Type definition of the results objects + * @param future Future holding the input stream (the input stream is not owned but it will be closed when org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader.JsonParserIterator#close() is called or reach the end of the steam) + * @param url URL used to fetch the data, used mostly as message with exception stack to identify the faulty stream, thus this can be empty string. + * @param query Query used to fetch the data, used mostly as message with exception stack, thus can be empty string. + */ + public JsonParserIterator(ObjectMapper mapper, + JavaType typeRef, + Future future, + String url, + Query query + ) + { + this.typeRef = typeRef; + this.future = future; + this.url = url; + this.query = query; + this.mapper = mapper; + jp = null; + } + + @Override + public boolean hasNext() + { + init(); + + if (jp.isClosed()) { + return false; + } + if (jp.getCurrentToken() == JsonToken.END_ARRAY) { + CloseQuietly.close(jp); + return false; + } + + return true; + } + + @Override + public R next() + { + init(); + + try { + final R retVal = objectCodec.readValue(jp, typeRef); + jp.nextToken(); + return retVal; + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + + private void init() + { + if (jp == null) { + try { + InputStream is = future.get(); + if (is == null) { + throw new IOException(String.format("query[%s] url[%s] timed out", query, url)); + } else { + jp = mapper.getFactory().createParser(is).configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, true); + } + final JsonToken nextToken = jp.nextToken(); + if (nextToken == JsonToken.START_OBJECT) { + QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class); + throw new QueryInterruptedException(cause); + } else if (nextToken != JsonToken.START_ARRAY) { + throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url); + } else { + jp.nextToken(); + objectCodec = jp.getCodec(); + } + } + catch (IOException | InterruptedException | ExecutionException e) { + throw new RE( + e, + "Failure getting results for query[%s] url[%s] because of [%s]", + query, + url, + e.getMessage() + ); + } + } + } + + @Override + public void close() throws IOException + { + CloseQuietly.close(jp); + } } + } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java index 82eec5d..c0744b5 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java @@ -22,6 +22,8 @@ import java.util.Iterator; import java.util.List; +import com.fasterxml.jackson.databind.JavaType; +import 
io.druid.query.select.SelectQueryQueryToolChest; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; import org.apache.hadoop.io.NullWritable; @@ -39,22 +41,18 @@ public class DruidSelectQueryRecordReader extends DruidQueryRecordReader> { + private static final TypeReference> TYPE_REFERENCE = + new TypeReference>() + { + }; + private Result current; private Iterator values = Iterators.emptyIterator(); @Override - protected SelectQuery createQuery(String content) throws IOException { - return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, SelectQuery.class); - } - - @Override - protected List> createResultsList(InputStream content) - throws IOException { - return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content, - new TypeReference>>() { - } - ); + protected JavaType getResultTypeDef() { + return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE); } @Override @@ -62,8 +60,8 @@ public boolean nextKeyValue() throws IOException { if (values.hasNext()) { return true; } - if (results.hasNext()) { - current = results.next(); + if (queryResultsIterator.hasNext()) { + current = queryResultsIterator.next(); values = current.getValue().getEvents().iterator(); return nextKeyValue(); } @@ -100,7 +98,7 @@ public boolean next(NullWritable key, DruidWritable value) throws IOException { @Override public float getProgress() { - return results.hasNext() || values.hasNext() ? 0 : 1; + return queryResultsIterator.hasNext() || values.hasNext() ? 0 : 1; } } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java index 8750285..e6e01d1 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java @@ -132,21 +132,11 @@ public void initialize(Configuration configuration, Properties properties) throw properties.getProperty(serdeConstants.LIST_COLUMNS)); } columnTypes.addAll(Lists.transform(Utilities.getColumnTypes(properties), - new Function() { - @Override - public PrimitiveTypeInfo apply(String type) { - return TypeInfoFactory.getPrimitiveTypeInfo(type); - } - } + type -> TypeInfoFactory.getPrimitiveTypeInfo(type) )); inspectors.addAll(Lists.transform(columnTypes, - new Function() { - @Override - public ObjectInspector apply(PrimitiveTypeInfo type) { - return PrimitiveObjectInspectorFactory - .getPrimitiveWritableObjectInspector(type); - } - } + (Function) type -> PrimitiveObjectInspectorFactory + .getPrimitiveWritableObjectInspector(type) )); columns = columnNames.toArray(new String[columnNames.size()]); types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]); @@ -273,7 +263,7 @@ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQ InputStream response; try { response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), - DruidStorageHandlerUtils.createRequest(address, query) + DruidStorageHandlerUtils.createSmileRequest(address, query) ); } catch (Exception e) { throw new SerDeException(StringUtils.stringifyException(e)); diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java index c8a63ab..86c325b 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java @@ 
-17,7 +17,6 @@ */ package org.apache.hadoop.hive.druid.serde; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.slf4j.Logger; @@ -37,6 +36,7 @@ protected static final String ISO_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; protected static final String FLOAT_TYPE = "FLOAT"; + protected static final String DOUBLE_TYPE = "DOUBLE"; protected static final String LONG_TYPE = "LONG"; protected static final String STRING_TYPE = "STRING"; @@ -47,6 +47,8 @@ public static PrimitiveTypeInfo convertDruidToHiveType(String typeName) { switch (typeName) { case FLOAT_TYPE: return TypeInfoFactory.floatTypeInfo; + case DOUBLE_TYPE: + return TypeInfoFactory.doubleTypeInfo; case LONG_TYPE: return TypeInfoFactory.longTypeInfo; case STRING_TYPE: @@ -63,29 +65,6 @@ public static PrimitiveTypeInfo convertDruidToHiveType(String typeName) { } } - /* This method converts from the String representation of Druid type - * to the String representation of the corresponding Hive type */ - public static String convertDruidToHiveTypeString(String typeName) { - typeName = typeName.toUpperCase(); - switch (typeName) { - case FLOAT_TYPE: - return serdeConstants.FLOAT_TYPE_NAME; - case LONG_TYPE: - return serdeConstants.BIGINT_TYPE_NAME; - case STRING_TYPE: - return serdeConstants.STRING_TYPE_NAME; - default: - // This is a guard for special Druid types e.g. hyperUnique - // (http://druid.io/docs/0.9.1.1/querying/aggregations.html#hyperunique-aggregator). - // Currently, we do not support doing anything special with them in Hive. - // However, those columns are there, and they can be actually read as normal - // dimensions e.g. with a select query. Thus, we print the warning and just read them - // as String. - LOG.warn("Transformation to STRING for unknown type " + typeName); - return serdeConstants.STRING_TYPE_NAME; - } - } - /* Extract type from dimension spec. It returns TIMESTAMP if it is a FLOOR, * INTEGER if it is a EXTRACT, or STRING otherwise. 
*/ public static PrimitiveTypeInfo extractTypeFromDimension(DimensionSpec ds) { diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java index a1c8488..971af82 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java @@ -21,6 +21,7 @@ import java.io.InputStream; import java.util.List; +import com.fasterxml.jackson.databind.JavaType; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; import org.apache.hadoop.io.NullWritable; @@ -36,26 +37,19 @@ public class DruidTimeseriesQueryRecordReader extends DruidQueryRecordReader> { + private static final TypeReference TYPE_REFERENCE = new TypeReference>() { + }; private Result current; @Override - protected TimeseriesQuery createQuery(String content) throws IOException { - return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, TimeseriesQuery.class); - } - - @Override - protected List> createResultsList(InputStream content) - throws IOException { - return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content, - new TypeReference>>() { - } - ); + protected JavaType getResultTypeDef() { + return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE); } @Override public boolean nextKeyValue() { - if (results.hasNext()) { - current = results.next(); + if (queryResultsIterator.hasNext()) { + current = queryResultsIterator.next(); return true; } return false; @@ -89,7 +83,7 @@ public boolean next(NullWritable key, DruidWritable value) { @Override public float getProgress() throws IOException { - return results.hasNext() ? 0 : 1; + return queryResultsIterator.hasNext() ? 0 : 1; } } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java index afdf670..6e1fffe 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *
<p>
+ * http://www.apache.org/licenses/LICENSE-2.0 + * <p>
* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; +import com.fasterxml.jackson.databind.JavaType; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; import org.apache.hadoop.io.NullWritable; @@ -39,22 +40,17 @@ public class DruidTopNQueryRecordReader extends DruidQueryRecordReader> { + private static final TypeReference> TYPE_REFERENCE = + new TypeReference>() { + }; + private Result current; private Iterator values = Iterators.emptyIterator(); @Override - protected TopNQuery createQuery(String content) throws IOException { - return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, TopNQuery.class); - } - - @Override - protected List> createResultsList(InputStream content) - throws IOException { - return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content, - new TypeReference>>() { - } - ); + protected JavaType getResultTypeDef() { + return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE); } @Override @@ -62,8 +58,8 @@ public boolean nextKeyValue() { if (values.hasNext()) { return true; } - if (results.hasNext()) { - current = results.next(); + if (queryResultsIterator.hasNext()) { + current = queryResultsIterator.next(); values = current.getValue().getValue().iterator(); return nextKeyValue(); } @@ -79,7 +75,9 @@ public NullWritable getCurrentKey() throws IOException, InterruptedException { public DruidWritable getCurrentValue() throws IOException, InterruptedException { // Create new value DruidWritable value = new DruidWritable(); - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, + current.getTimestamp().getMillis() + ); if (values.hasNext()) { value.getValue().putAll(values.next().getBaseObject()); return value; @@ -92,7 +90,9 @@ public boolean next(NullWritable key, DruidWritable value) { if (nextKeyValue()) { // Update value value.getValue().clear(); - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, + current.getTimestamp().getMillis() + ); if (values.hasNext()) { value.getValue().putAll(values.next().getBaseObject()); } @@ -103,7 +103,7 @@ public boolean next(NullWritable key, DruidWritable value) { @Override public float getProgress() { - return results.hasNext() || values.hasNext() ? 0 : 1; + return queryResultsIterator.hasNext() || values.hasNext() ? 0 : 1; } } diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java deleted file mode 100644 index 137309c..0000000 --- druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java +++ /dev/null @@ -1,918 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.druid; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; -import java.util.Properties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.type.HiveChar; -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.common.type.HiveVarchar; -import org.apache.hadoop.hive.conf.Constants; -import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidSerDe; -import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidWritable; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.serde2.io.ByteWritable; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.io.HiveCharWritable; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; -import org.apache.hadoop.hive.serde2.io.ShortWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.junit.Test; - -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; - -import io.druid.data.input.Row; -import io.druid.query.Query; -import io.druid.query.Result; -import io.druid.query.groupby.GroupByQuery; -import io.druid.query.select.SelectQuery; -import io.druid.query.select.SelectResultValue; -import 
io.druid.query.timeseries.TimeseriesQuery; -import io.druid.query.timeseries.TimeseriesResultValue; -import io.druid.query.topn.TopNQuery; -import io.druid.query.topn.TopNResultValue; - -/** - * Basic tests for Druid SerDe. The examples are taken from Druid 0.9.1.1 - * documentation. - */ -public class TestDruidSerDe { - - // Timeseries query - private static final String TIMESERIES_QUERY = - "{ \"queryType\": \"timeseries\", " - + " \"dataSource\": \"sample_datasource\", " - + " \"granularity\": \"day\", " - + " \"descending\": \"true\", " - + " \"filter\": { " - + " \"type\": \"and\", " - + " \"fields\": [ " - + " { \"type\": \"selector\", \"dimension\": \"sample_dimension1\", \"value\": \"sample_value1\" }, " - + " { \"type\": \"or\", " - + " \"fields\": [ " - + " { \"type\": \"selector\", \"dimension\": \"sample_dimension2\", \"value\": \"sample_value2\" }, " - + " { \"type\": \"selector\", \"dimension\": \"sample_dimension3\", \"value\": \"sample_value3\" } " - + " ] " - + " } " - + " ] " - + " }, " - + " \"aggregations\": [ " - + " { \"type\": \"longSum\", \"name\": \"sample_name1\", \"fieldName\": \"sample_fieldName1\" }, " - + " { \"type\": \"doubleSum\", \"name\": \"sample_name2\", \"fieldName\": \"sample_fieldName2\" } " - + " ], " - + " \"postAggregations\": [ " - + " { \"type\": \"arithmetic\", " - + " \"name\": \"sample_divide\", " - + " \"fn\": \"/\", " - + " \"fields\": [ " - + " { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name1\", \"fieldName\": \"sample_name1\" }, " - + " { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name2\", \"fieldName\": \"sample_name2\" } " - + " ] " - + " } " - + " ], " - + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]}"; - - // Timeseries query results - private static final String TIMESERIES_QUERY_RESULTS = - "[ " - + "{ " - + " \"timestamp\": \"2012-01-01T00:00:00.000Z\", " - + " \"result\": { \"sample_name1\": 0, \"sample_name2\": 1.0, \"sample_divide\": 2.2222 } " - + "}, " - + "{ " - + " \"timestamp\": \"2012-01-02T00:00:00.000Z\", " - + " \"result\": { \"sample_name1\": 2, \"sample_name2\": 3.32, \"sample_divide\": 4 } " - + "}]"; - - // Timeseries query results as records - private static final Object[][] TIMESERIES_QUERY_RESULTS_RECORDS = new Object[][] { - new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new LongWritable(0), - new FloatWritable(1.0F), new FloatWritable(2.2222F) }, - new Object[] { new TimestampWritable(new Timestamp(1325462400000L)), new LongWritable(2), - new FloatWritable(3.32F), new FloatWritable(4F) } - }; - - // Timeseries query results as records (types defined by metastore) - private static final String TIMESERIES_COLUMN_NAMES = "__time,sample_name1,sample_name2,sample_divide"; - private static final String TIMESERIES_COLUMN_TYPES = "timestamp,smallint,double,float"; - private static final Object[][] TIMESERIES_QUERY_RESULTS_RECORDS_2 = new Object[][] { - new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new ShortWritable((short) 0), - new DoubleWritable(1.0d), new FloatWritable(2.2222F) }, - new Object[] { new TimestampWritable(new Timestamp(1325462400000L)), new ShortWritable((short) 2), - new DoubleWritable(3.32d), new FloatWritable(4F) } - }; - - // TopN query - private static final String TOPN_QUERY = - "{ \"queryType\": \"topN\", " - + " \"dataSource\": \"sample_data\", " - + " \"dimension\": \"sample_dim\", " - + " \"threshold\": 5, " - + " \"metric\": \"count\", " - + " \"granularity\": \"all\", " - + " \"filter\": { " - 
+ " \"type\": \"and\", " - + " \"fields\": [ " - + " { " - + " \"type\": \"selector\", " - + " \"dimension\": \"dim1\", " - + " \"value\": \"some_value\" " - + " }, " - + " { " - + " \"type\": \"selector\", " - + " \"dimension\": \"dim2\", " - + " \"value\": \"some_other_val\" " - + " } " - + " ] " - + " }, " - + " \"aggregations\": [ " - + " { " - + " \"type\": \"longSum\", " - + " \"name\": \"count\", " - + " \"fieldName\": \"count\" " - + " }, " - + " { " - + " \"type\": \"doubleSum\", " - + " \"name\": \"some_metric\", " - + " \"fieldName\": \"some_metric\" " - + " } " - + " ], " - + " \"postAggregations\": [ " - + " { " - + " \"type\": \"arithmetic\", " - + " \"name\": \"sample_divide\", " - + " \"fn\": \"/\", " - + " \"fields\": [ " - + " { " - + " \"type\": \"fieldAccess\", " - + " \"name\": \"some_metric\", " - + " \"fieldName\": \"some_metric\" " - + " }, " - + " { " - + " \"type\": \"fieldAccess\", " - + " \"name\": \"count\", " - + " \"fieldName\": \"count\" " - + " } " - + " ] " - + " } " - + " ], " - + " \"intervals\": [ " - + " \"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000\" " - + " ]}"; - - // TopN query results - private static final String TOPN_QUERY_RESULTS = - "[ " - + " { " - + " \"timestamp\": \"2013-08-31T00:00:00.000Z\", " - + " \"result\": [ " - + " { " - + " \"sample_dim\": \"dim1_val\", " - + " \"count\": 111, " - + " \"some_metric\": 10669, " - + " \"sample_divide\": 96.11711711711712 " - + " }, " - + " { " - + " \"sample_dim\": \"another_dim1_val\", " - + " \"count\": 88, " - + " \"some_metric\": 28344, " - + " \"sample_divide\": 322.09090909090907 " - + " }, " - + " { " - + " \"sample_dim\": \"dim1_val3\", " - + " \"count\": 70, " - + " \"some_metric\": 871, " - + " \"sample_divide\": 12.442857142857143 " - + " }, " - + " { " - + " \"sample_dim\": \"dim1_val4\", " - + " \"count\": 62, " - + " \"some_metric\": 815, " - + " \"sample_divide\": 13.14516129032258 " - + " }, " - + " { " - + " \"sample_dim\": \"dim1_val5\", " - + " \"count\": 60, " - + " \"some_metric\": 2787, " - + " \"sample_divide\": 46.45 " - + " } " - + " ] " - + " }]"; - - // TopN query results as records - private static final Object[][] TOPN_QUERY_RESULTS_RECORDS = new Object[][] { - new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val"), - new LongWritable(111), new FloatWritable(10669F), - new FloatWritable(96.11711711711712F) }, - new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), - new Text("another_dim1_val"), new LongWritable(88), new FloatWritable(28344F), - new FloatWritable(322.09090909090907F) }, - new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), - new Text("dim1_val3"), new LongWritable(70), new FloatWritable(871F), - new FloatWritable(12.442857142857143F) }, - new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), - new Text("dim1_val4"), new LongWritable(62), new FloatWritable(815F), - new FloatWritable(13.14516129032258F) }, - new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), - new Text("dim1_val5"), new LongWritable(60), new FloatWritable(2787F), - new FloatWritable(46.45F) } - }; - - // TopN query results as records (types defined by metastore) - private static final String TOPN_COLUMN_NAMES = "__time,sample_dim,count,some_metric,sample_divide"; - private static final String TOPN_COLUMN_TYPES = "timestamp,string,bigint,double,float"; - private static final Object[][] TOPN_QUERY_RESULTS_RECORDS_2 = new Object[][] { - new Object[] { new TimestampWritable(new 
Timestamp(1377907200000L)), - new Text("dim1_val"), new LongWritable(111), new DoubleWritable(10669d), - new FloatWritable(96.11711711711712F) }, - new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), - new Text("another_dim1_val"), new LongWritable(88), new DoubleWritable(28344d), - new FloatWritable(322.09090909090907F) }, - new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), - new Text("dim1_val3"), new LongWritable(70), new DoubleWritable(871d), - new FloatWritable(12.442857142857143F) }, - new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), - new Text("dim1_val4"), new LongWritable(62), new DoubleWritable(815d), - new FloatWritable(13.14516129032258F) }, - new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), - new Text("dim1_val5"), new LongWritable(60), new DoubleWritable(2787d), - new FloatWritable(46.45F) } - }; - - // GroupBy query - private static final String GROUP_BY_QUERY = - "{ " - + " \"queryType\": \"groupBy\", " - + " \"dataSource\": \"sample_datasource\", " - + " \"granularity\": \"day\", " - + " \"dimensions\": [\"country\", \"device\"], " - + " \"limitSpec\": {" - + " \"type\": \"default\"," - + " \"limit\": 5000," - + " \"columns\": [\"country\", \"data_transfer\"] }, " - + " \"filter\": { " - + " \"type\": \"and\", " - + " \"fields\": [ " - + " { \"type\": \"selector\", \"dimension\": \"carrier\", \"value\": \"AT&T\" }, " - + " { \"type\": \"or\", " - + " \"fields\": [ " - + " { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Apple\" }, " - + " { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Samsung\" } " - + " ] " - + " } " - + " ] " - + " }, " - + " \"aggregations\": [ " - + " { \"type\": \"longSum\", \"name\": \"total_usage\", \"fieldName\": \"user_count\" }, " - + " { \"type\": \"doubleSum\", \"name\": \"data_transfer\", \"fieldName\": \"data_transfer\" } " - + " ], " - + " \"postAggregations\": [ " - + " { \"type\": \"arithmetic\", " - + " \"name\": \"avg_usage\", " - + " \"fn\": \"/\", " - + " \"fields\": [ " - + " { \"type\": \"fieldAccess\", \"fieldName\": \"data_transfer\" }, " - + " { \"type\": \"fieldAccess\", \"fieldName\": \"total_usage\" } " - + " ] " - + " } " - + " ], " - + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ], " - + " \"having\": { " - + " \"type\": \"greaterThan\", " - + " \"aggregation\": \"total_usage\", " - + " \"value\": 100 " - + " }}"; - - // GroupBy query results - private static final String GROUP_BY_QUERY_RESULTS = - "[ " - + " { " - + " \"version\" : \"v1\", " - + " \"timestamp\" : \"2012-01-01T00:00:00.000Z\", " - + " \"event\" : { " - + " \"country\" : \"India\", " - + " \"device\" : \"phone\", " - + " \"total_usage\" : 88, " - + " \"data_transfer\" : 29.91233453, " - + " \"avg_usage\" : 60.32 " - + " } " - + " }, " - + " { " - + " \"version\" : \"v1\", " - + " \"timestamp\" : \"2012-01-01T00:00:12.000Z\", " - + " \"event\" : { " - + " \"country\" : \"Spain\", " - + " \"device\" : \"pc\", " - + " \"total_usage\" : 16, " - + " \"data_transfer\" : 172.93494959, " - + " \"avg_usage\" : 6.333333 " - + " } " - + " }]"; - - // GroupBy query results as records - private static final Object[][] GROUP_BY_QUERY_RESULTS_RECORDS = new Object[][] { - new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new Text("India"), - new Text("phone"), new LongWritable(88), new FloatWritable(29.91233453F), - new FloatWritable(60.32F) }, - new Object[] { new TimestampWritable(new Timestamp(1325376012000L)), new 
Text("Spain"), - new Text("pc"), new LongWritable(16), new FloatWritable(172.93494959F), - new FloatWritable(6.333333F) } - }; - - // GroupBy query results as records (types defined by metastore) - private static final String GROUP_BY_COLUMN_NAMES = "__time,country,device,total_usage,data_transfer,avg_usage"; - private static final String GROUP_BY_COLUMN_TYPES = "timestamp,string,string,int,double,float"; - private static final Object[][] GROUP_BY_QUERY_RESULTS_RECORDS_2 = new Object[][] { - new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new Text("India"), - new Text("phone"), new IntWritable(88), new DoubleWritable(29.91233453F), - new FloatWritable(60.32F) }, - new Object[] { new TimestampWritable(new Timestamp(1325376012000L)), new Text("Spain"), - new Text("pc"), new IntWritable(16), new DoubleWritable(172.93494959F), - new FloatWritable(6.333333F) } - }; - - // Select query - private static final String SELECT_QUERY = - "{ \"queryType\": \"select\", " - + " \"dataSource\": \"wikipedia\", \"descending\": \"false\", " - + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\",\"newpage\",\"user\"], " - + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"], " - + " \"granularity\": \"all\", " - + " \"intervals\": [ \"2013-01-01/2013-01-02\" ], " - + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5} }"; - - // Select query results - private static final String SELECT_QUERY_RESULTS = - "[{ " - + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " - + " \"result\" : { " - + " \"pagingIdentifiers\" : { " - + " \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\" : 4 }, " - + " \"events\" : [ { " - + " \"segmentId\" : \"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", " - + " \"offset\" : 0, " - + " \"event\" : { " - + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " - + " \"robot\" : \"1\", " - + " \"namespace\" : \"article\", " - + " \"anonymous\" : \"0\", " - + " \"unpatrolled\" : \"0\", " - + " \"page\" : \"11._korpus_(NOVJ)\", " - + " \"language\" : \"sl\", " - + " \"newpage\" : \"0\", " - + " \"user\" : \"EmausBot\", " - + " \"count\" : 1.0, " - + " \"added\" : 39.0, " - + " \"delta\" : 39.0, " - + " \"variation\" : 39.0, " - + " \"deleted\" : 0.0 " - + " } " - + " }, { " - + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", " - + " \"offset\" : 1, " - + " \"event\" : { " - + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " - + " \"robot\" : \"0\", " - + " \"namespace\" : \"article\", " - + " \"anonymous\" : \"0\", " - + " \"unpatrolled\" : \"0\", " - + " \"page\" : \"112_U.S._580\", " - + " \"language\" : \"en\", " - + " \"newpage\" : \"1\", " - + " \"user\" : \"MZMcBride\", " - + " \"count\" : 1.0, " - + " \"added\" : 70.0, " - + " \"delta\" : 70.0, " - + " \"variation\" : 70.0, " - + " \"deleted\" : 0.0 " - + " } " - + " }, { " - + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", " - + " \"offset\" : 2, " - + " \"event\" : { " - + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", " - + " \"robot\" : \"0\", " - + " \"namespace\" : \"article\", " - + " \"anonymous\" : \"0\", " - + " \"unpatrolled\" : \"0\", " - + " \"page\" : \"113_U.S._243\", " - + " \"language\" : \"en\", " - + " \"newpage\" : \"1\", " - + " \"user\" : \"MZMcBride\", " - + " \"count\" : 1.0, " - + " \"added\" : 77.0, 
" - + " \"delta\" : 77.0, " - + " \"variation\" : 77.0, " - + " \"deleted\" : 0.0 " - + " } " - + " }, { " - + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", " - + " \"offset\" : 3, " - + " \"event\" : { " - + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", " - + " \"robot\" : \"0\", " - + " \"namespace\" : \"article\", " - + " \"anonymous\" : \"0\", " - + " \"unpatrolled\" : \"0\", " - + " \"page\" : \"113_U.S._73\", " - + " \"language\" : \"en\", " - + " \"newpage\" : \"1\", " - + " \"user\" : \"MZMcBride\", " - + " \"count\" : 1.0, " - + " \"added\" : 70.0, " - + " \"delta\" : 70.0, " - + " \"variation\" : 70.0, " - + " \"deleted\" : 0.0 " - + " } " - + " }, { " - + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", " - + " \"offset\" : 4, " - + " \"event\" : { " - + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", " - + " \"robot\" : \"0\", " - + " \"namespace\" : \"article\", " - + " \"anonymous\" : \"0\", " - + " \"unpatrolled\" : \"0\", " - + " \"page\" : \"113_U.S._756\", " - + " \"language\" : \"en\", " - + " \"newpage\" : \"1\", " - + " \"user\" : \"MZMcBride\", " - + " \"count\" : 1.0, " - + " \"added\" : 68.0, " - + " \"delta\" : 68.0, " - + " \"variation\" : 68.0, " - + " \"deleted\" : 0.0 " - + " } " - + " } ] }} ]"; - - // Select query results as records - private static final Object[][] SELECT_QUERY_RESULTS_RECORDS = new Object[][] { - new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("1"), - new Text("article"), new Text("0"), new Text("0"), - new Text("11._korpus_(NOVJ)"), new Text("sl"), new Text("0"), - new Text("EmausBot"), - new FloatWritable(1.0F), new FloatWritable(39.0F), new FloatWritable(39.0F), - new FloatWritable(39.0F), new FloatWritable(0.0F) }, - new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), - new Text("article"), new Text("0"), new Text("0"), - new Text("112_U.S._580"), new Text("en"), new Text("1"), new Text("MZMcBride"), - new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), - new FloatWritable(70.0F), new FloatWritable(0.0F) }, - new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), - new Text("article"), new Text("0"), new Text("0"), - new Text("113_U.S._243"), new Text("en"), new Text("1"), new Text("MZMcBride"), - new FloatWritable(1.0F), new FloatWritable(77.0F), new FloatWritable(77.0F), - new FloatWritable(77.0F), new FloatWritable(0.0F) }, - new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), - new Text("article"), new Text("0"), new Text("0"), - new Text("113_U.S._73"), new Text("en"), new Text("1"), new Text("MZMcBride"), - new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), - new FloatWritable(70.0F), new FloatWritable(0.0F) }, - new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), - new Text("article"), new Text("0"), new Text("0"), - new Text("113_U.S._756"), new Text("en"), new Text("1"), new Text("MZMcBride"), - new FloatWritable(1.0F), new FloatWritable(68.0F), new FloatWritable(68.0F), - new FloatWritable(68.0F), new FloatWritable(0.0F) } - }; - - // Select query results as records (types defined by metastore) - private static final String SELECT_COLUMN_NAMES = "__time,robot,namespace,anonymous,unpatrolled,page,language,newpage,user,count,added,delta,variation,deleted"; - private static final String 
SELECT_COLUMN_TYPES = "timestamp,string,string,string,string,string,string,string,string,double,double,float,float,float"; - private static final Object[][] SELECT_QUERY_RESULTS_RECORDS_2 = new Object[][] { - new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("1"), - new Text("article"), new Text("0"), new Text("0"), - new Text("11._korpus_(NOVJ)"), new Text("sl"), new Text("0"), - new Text("EmausBot"), - new DoubleWritable(1.0d), new DoubleWritable(39.0d), new FloatWritable(39.0F), - new FloatWritable(39.0F), new FloatWritable(0.0F) }, - new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), - new Text("article"), new Text("0"), new Text("0"), - new Text("112_U.S._580"), new Text("en"), new Text("1"), new Text("MZMcBride"), - new DoubleWritable(1.0d), new DoubleWritable(70.0d), new FloatWritable(70.0F), - new FloatWritable(70.0F), new FloatWritable(0.0F) }, - new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), - new Text("article"), new Text("0"), new Text("0"), - new Text("113_U.S._243"), new Text("en"), new Text("1"), new Text("MZMcBride"), - new DoubleWritable(1.0d), new DoubleWritable(77.0d), new FloatWritable(77.0F), - new FloatWritable(77.0F), new FloatWritable(0.0F) }, - new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), - new Text("article"), new Text("0"), new Text("0"), - new Text("113_U.S._73"), new Text("en"), new Text("1"), new Text("MZMcBride"), - new DoubleWritable(1.0d), new DoubleWritable(70.0d), new FloatWritable(70.0F), - new FloatWritable(70.0F), new FloatWritable(0.0F) }, - new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), - new Text("article"), new Text("0"), new Text("0"), - new Text("113_U.S._756"), new Text("en"), new Text("1"), new Text("MZMcBride"), - new DoubleWritable(1.0d), new DoubleWritable(68.0d), new FloatWritable(68.0F), - new FloatWritable(68.0F), new FloatWritable(0.0F) } - }; - - /** - * Test the default behavior of the objects and object inspectors. 
- * @throws IOException - * @throws IllegalAccessException - * @throws IllegalArgumentException - * @throws SecurityException - * @throws NoSuchFieldException - * @throws JsonMappingException - * @throws JsonParseException - * @throws InvocationTargetException - * @throws NoSuchMethodException - */ - @Test - public void testDruidDeserializer() - throws SerDeException, JsonParseException, JsonMappingException, - NoSuchFieldException, SecurityException, IllegalArgumentException, - IllegalAccessException, IOException, InterruptedException, - NoSuchMethodException, InvocationTargetException { - // Create, initialize, and test the SerDe - QTestDruidSerDe serDe = new QTestDruidSerDe(); - Configuration conf = new Configuration(); - Properties tbl; - // Timeseries query - tbl = createPropertiesQuery("sample_datasource", Query.TIMESERIES, TIMESERIES_QUERY); - SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - deserializeQueryResults(serDe, Query.TIMESERIES, TIMESERIES_QUERY, - TIMESERIES_QUERY_RESULTS, TIMESERIES_QUERY_RESULTS_RECORDS - ); - // Timeseries query (simulating column types from metastore) - tbl.setProperty(serdeConstants.LIST_COLUMNS, TIMESERIES_COLUMN_NAMES); - tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, TIMESERIES_COLUMN_TYPES); - SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - deserializeQueryResults(serDe, Query.TIMESERIES, TIMESERIES_QUERY, - TIMESERIES_QUERY_RESULTS, TIMESERIES_QUERY_RESULTS_RECORDS_2 - ); - // TopN query - tbl = createPropertiesQuery("sample_data", Query.TOPN, TOPN_QUERY); - SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - deserializeQueryResults(serDe, Query.TOPN, TOPN_QUERY, - TOPN_QUERY_RESULTS, TOPN_QUERY_RESULTS_RECORDS - ); - // TopN query (simulating column types from metastore) - tbl.setProperty(serdeConstants.LIST_COLUMNS, TOPN_COLUMN_NAMES); - tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, TOPN_COLUMN_TYPES); - SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - deserializeQueryResults(serDe, Query.TOPN, TOPN_QUERY, - TOPN_QUERY_RESULTS, TOPN_QUERY_RESULTS_RECORDS_2 - ); - // GroupBy query - tbl = createPropertiesQuery("sample_datasource", Query.GROUP_BY, GROUP_BY_QUERY); - SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - deserializeQueryResults(serDe, Query.GROUP_BY, GROUP_BY_QUERY, - GROUP_BY_QUERY_RESULTS, GROUP_BY_QUERY_RESULTS_RECORDS - ); - // GroupBy query (simulating column types from metastore) - tbl.setProperty(serdeConstants.LIST_COLUMNS, GROUP_BY_COLUMN_NAMES); - tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, GROUP_BY_COLUMN_TYPES); - SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - deserializeQueryResults(serDe, Query.GROUP_BY, GROUP_BY_QUERY, - GROUP_BY_QUERY_RESULTS, GROUP_BY_QUERY_RESULTS_RECORDS_2 - ); - // Select query - tbl = createPropertiesQuery("wikipedia", Query.SELECT, SELECT_QUERY); - SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY, - SELECT_QUERY_RESULTS, SELECT_QUERY_RESULTS_RECORDS - ); - // Select query (simulating column types from metastore) - tbl.setProperty(serdeConstants.LIST_COLUMNS, SELECT_COLUMN_NAMES); - tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, SELECT_COLUMN_TYPES); - SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY, - SELECT_QUERY_RESULTS, SELECT_QUERY_RESULTS_RECORDS_2 - ); - } - - private static Properties createPropertiesQuery(String dataSource, String queryType, - String jsonQuery - ) { - Properties tbl = new Properties(); - - // Set the 
configuration parameters - tbl.setProperty(Constants.DRUID_DATA_SOURCE, dataSource); - tbl.setProperty(Constants.DRUID_QUERY_JSON, jsonQuery); - tbl.setProperty(Constants.DRUID_QUERY_TYPE, queryType); - return tbl; - } - - @SuppressWarnings("unchecked") - private static void deserializeQueryResults(DruidSerDe serDe, String queryType, String jsonQuery, - String resultString, Object[][] records - ) throws SerDeException, JsonParseException, - JsonMappingException, IOException, NoSuchFieldException, SecurityException, - IllegalArgumentException, IllegalAccessException, InterruptedException, - NoSuchMethodException, InvocationTargetException { - - // Initialize - Query query = null; - DruidQueryRecordReader reader = null; - List resultsList = null; - ObjectMapper mapper = DruidStorageHandlerUtils.JSON_MAPPER; - switch (queryType) { - case Query.TIMESERIES: - query = mapper.readValue(jsonQuery, TimeseriesQuery.class); - reader = new DruidTimeseriesQueryRecordReader(); - resultsList = mapper.readValue(resultString, - new TypeReference>>() { - } - ); - break; - case Query.TOPN: - query = mapper.readValue(jsonQuery, TopNQuery.class); - reader = new DruidTopNQueryRecordReader(); - resultsList = mapper.readValue(resultString, - new TypeReference>>() { - } - ); - break; - case Query.GROUP_BY: - query = mapper.readValue(jsonQuery, GroupByQuery.class); - reader = new DruidGroupByQueryRecordReader(); - resultsList = mapper.readValue(resultString, - new TypeReference>() { - } - ); - break; - case Query.SELECT: - query = mapper.readValue(jsonQuery, SelectQuery.class); - reader = new DruidSelectQueryRecordReader(); - resultsList = mapper.readValue(resultString, - new TypeReference>>() { - } - ); - break; - } - - // Set query and fields access - Field field1 = DruidQueryRecordReader.class.getDeclaredField("query"); - field1.setAccessible(true); - field1.set(reader, query); - if (reader instanceof DruidGroupByQueryRecordReader) { - Method method1 = DruidGroupByQueryRecordReader.class.getDeclaredMethod("initDimensionTypes"); - method1.setAccessible(true); - method1.invoke(reader); - Method method2 = DruidGroupByQueryRecordReader.class.getDeclaredMethod("initExtractors"); - method2.setAccessible(true); - method2.invoke(reader); - } - Field field2 = DruidQueryRecordReader.class.getDeclaredField("results"); - field2.setAccessible(true); - - // Get the row structure - StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector(); - List fieldRefs = oi.getAllStructFieldRefs(); - - // Check mapred - Iterator results = resultsList.iterator(); - field2.set(reader, results); - DruidWritable writable = new DruidWritable(); - int pos = 0; - while (reader.next(NullWritable.get(), writable)) { - List row = (List) serDe.deserialize(writable); - Object[] expectedFieldsData = records[pos]; - assertEquals(expectedFieldsData.length, fieldRefs.size()); - for (int i = 0; i < fieldRefs.size(); i++) { - assertEquals("Field " + i + " type", expectedFieldsData[i].getClass(), row.get(i).getClass()); - Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i)); - assertEquals("Field " + i, expectedFieldsData[i], fieldData); - } - pos++; - } - assertEquals(pos, records.length); - - // Check mapreduce - results = resultsList.iterator(); - field2.set(reader, results); - pos = 0; - while (reader.nextKeyValue()) { - List row = (List) serDe.deserialize(reader.getCurrentValue()); - Object[] expectedFieldsData = records[pos]; - assertEquals(expectedFieldsData.length, fieldRefs.size()); - for (int i = 0; i < 
fieldRefs.size(); i++) { - assertEquals("Field " + i + " type", expectedFieldsData[i].getClass(), row.get(i).getClass()); - Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i)); - assertEquals("Field " + i, expectedFieldsData[i], fieldData); - } - pos++; - } - assertEquals(pos, records.length); - } - - - private static final String COLUMN_NAMES = "__time,c0,c1,c2,c3,c4,c5,c6,c7,c8,c9"; - private static final String COLUMN_TYPES = "timestamp,string,char(6),varchar(8),double,float,decimal(38,18),bigint,int,smallint,tinyint"; - private static final Object[] ROW_OBJECT = new Object[] { - new TimestampWritable(new Timestamp(1377907200000L)), - new Text("dim1_val"), - new HiveCharWritable(new HiveChar("dim2_v", 6)), - new HiveVarcharWritable(new HiveVarchar("dim3_val", 8)), - new DoubleWritable(10669.3D), - new FloatWritable(10669.45F), - new HiveDecimalWritable(HiveDecimal.create(1064.34D)), - new LongWritable(1113939), - new IntWritable(1112123), - new ShortWritable((short) 12), - new ByteWritable((byte) 0), - new TimestampWritable(new Timestamp(1377907200000L)) // granularity - }; - private static final DruidWritable DRUID_WRITABLE = new DruidWritable( - ImmutableMap.builder() - .put("__time", 1377907200000L) - .put("c0", "dim1_val") - .put("c1", "dim2_v") - .put("c2", "dim3_val") - .put("c3", 10669.3D) - .put("c4", 10669.45F) - .put("c5", 1064.34D) - .put("c6", 1113939L) - .put("c7", 1112123) - .put("c8", (short) 12) - .put("c9", (byte) 0) - .put("__time_granularity", 1377907200000L) - .build()); - - /** - * Test the default behavior of the objects and object inspectors. - * @throws IOException - * @throws IllegalAccessException - * @throws IllegalArgumentException - * @throws SecurityException - * @throws NoSuchFieldException - * @throws JsonMappingException - * @throws JsonParseException - * @throws InvocationTargetException - * @throws NoSuchMethodException - */ - @Test - public void testDruidObjectSerializer() - throws SerDeException, JsonParseException, JsonMappingException, - NoSuchFieldException, SecurityException, IllegalArgumentException, - IllegalAccessException, IOException, InterruptedException, - NoSuchMethodException, InvocationTargetException { - // Create, initialize, and test the SerDe - DruidSerDe serDe = new DruidSerDe(); - Configuration conf = new Configuration(); - Properties tbl; - // Mixed source (all types) - tbl = createPropertiesSource(COLUMN_NAMES, COLUMN_TYPES); - SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - serializeObject(tbl, serDe, ROW_OBJECT, DRUID_WRITABLE); - } - - private static Properties createPropertiesSource(String columnNames, String columnTypes) { - Properties tbl = new Properties(); - - // Set the configuration parameters - tbl.setProperty(serdeConstants.LIST_COLUMNS, columnNames); - tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, columnTypes); - return tbl; - } - - private static void serializeObject(Properties properties, DruidSerDe serDe, - Object[] rowObject, DruidWritable druidWritable) throws SerDeException { - // Build OI with timestamp granularity column - final List columnNames = new ArrayList<>(); - final List columnTypes = new ArrayList<>(); - List inspectors = new ArrayList<>(); - columnNames.addAll(Utilities.getColumnNames(properties)); - columnNames.add(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME); - columnTypes.addAll(Lists.transform(Utilities.getColumnTypes(properties), - new Function() { - @Override - public PrimitiveTypeInfo apply(String type) { - return TypeInfoFactory.getPrimitiveTypeInfo(type); - } - } - 
)); - columnTypes.add(TypeInfoFactory.getPrimitiveTypeInfo("timestamp")); - inspectors.addAll(Lists.transform(columnTypes, - new Function() { - @Override - public ObjectInspector apply(PrimitiveTypeInfo type) { - return PrimitiveObjectInspectorFactory - .getPrimitiveWritableObjectInspector(type); - } - } - )); - ObjectInspector inspector = ObjectInspectorFactory - .getStandardStructObjectInspector(columnNames, inspectors); - // Serialize - DruidWritable writable = (DruidWritable) serDe.serialize(rowObject, inspector); - // Check result - assertEquals(druidWritable.getValue().size(), writable.getValue().size()); - for (Entry e: druidWritable.getValue().entrySet()) { - assertEquals(e.getValue(), writable.getValue().get(e.getKey())); - } - } - - private static final Object[] ROW_OBJECT_2 = new Object[] { - new TimestampWritable(new Timestamp(1377907200000L)), - new Text("dim1_val"), - new HiveCharWritable(new HiveChar("dim2_v", 6)), - new HiveVarcharWritable(new HiveVarchar("dim3_val", 8)), - new DoubleWritable(10669.3D), - new FloatWritable(10669.45F), - new HiveDecimalWritable(HiveDecimal.create(1064.34D)), - new LongWritable(1113939), - new IntWritable(1112123), - new ShortWritable((short) 12), - new ByteWritable((byte) 0) - }; - private static final DruidWritable DRUID_WRITABLE_2 = new DruidWritable( - ImmutableMap.builder() - .put("__time", 1377907200000L) - .put("c0", "dim1_val") - .put("c1", "dim2_v") - .put("c2", "dim3_val") - .put("c3", 10669.3D) - .put("c4", 10669.45F) - .put("c5", 1064.34D) - .put("c6", 1113939L) - .put("c7", 1112123) - .put("c8", (short) 12) - .put("c9", (byte) 0) - .build()); - - @Test - public void testDruidObjectDeserializer() - throws SerDeException, JsonParseException, JsonMappingException, - NoSuchFieldException, SecurityException, IllegalArgumentException, - IllegalAccessException, IOException, InterruptedException, - NoSuchMethodException, InvocationTargetException { - // Create, initialize, and test the SerDe - DruidSerDe serDe = new DruidSerDe(); - Configuration conf = new Configuration(); - Properties tbl; - // Mixed source (all types) - tbl = createPropertiesSource(COLUMN_NAMES, COLUMN_TYPES); - SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - deserializeObject(tbl, serDe, ROW_OBJECT_2, DRUID_WRITABLE_2); - } - - @SuppressWarnings("unchecked") - private static void deserializeObject(Properties properties, DruidSerDe serDe, - Object[] rowObject, DruidWritable druidWritable) throws SerDeException { - // Deserialize - List object = (List) serDe.deserialize(druidWritable); - // Check result - assertEquals(rowObject.length, object.size()); - for (int i = 0; i < rowObject.length; i++) { - assertEquals(rowObject[i].getClass(), object.get(i).getClass()); - assertEquals(rowObject[i], object.get(i)); - } - } -} diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java new file mode 100644 index 0000000..db07b2d --- /dev/null +++ druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java @@ -0,0 +1,991 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid.serde; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.Properties; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.util.concurrent.SettableFuture; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.response.HttpResponseHandler; +import io.druid.data.input.Row; +import io.druid.query.Result; +import io.druid.query.select.SelectResultValue; +import io.druid.query.timeseries.TimeseriesResultValue; +import io.druid.query.topn.TopNResultValue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; +import org.apache.hadoop.hive.conf.Constants; +import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.druid.QTestDruidSerDe; +import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat; +import org.apache.hadoop.hive.druid.io.HiveDruidSplit; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.HiveCharWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; + +import org.junit.Before; +import org.junit.Test; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableMap; 
+import com.google.common.collect.Lists; + +import io.druid.query.Query; + +/** + * Basic tests for Druid SerDe. The examples are taken from Druid 0.9.1.1 + * documentation. + */ +public class TestDruidSerDe { + // Timeseries query + private static final String TIMESERIES_QUERY = + "{ \"queryType\": \"timeseries\", " + + " \"dataSource\": \"sample_datasource\", " + + " \"granularity\": \"day\", " + + " \"descending\": \"true\", " + + " \"filter\": { " + + " \"type\": \"and\", " + + " \"fields\": [ " + + " { \"type\": \"selector\", \"dimension\": \"sample_dimension1\", \"value\": \"sample_value1\" }, " + + " { \"type\": \"or\", " + + " \"fields\": [ " + + " { \"type\": \"selector\", \"dimension\": \"sample_dimension2\", \"value\": \"sample_value2\" }, " + + " { \"type\": \"selector\", \"dimension\": \"sample_dimension3\", \"value\": \"sample_value3\" } " + + " ] " + + " } " + + " ] " + + " }, " + + " \"aggregations\": [ " + + " { \"type\": \"longSum\", \"name\": \"sample_name1\", \"fieldName\": \"sample_fieldName1\" }, " + + " { \"type\": \"doubleSum\", \"name\": \"sample_name2\", \"fieldName\": \"sample_fieldName2\" } " + + " ], " + + " \"postAggregations\": [ " + + " { \"type\": \"arithmetic\", " + + " \"name\": \"sample_divide\", " + + " \"fn\": \"/\", " + + " \"fields\": [ " + + " { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name1\", \"fieldName\": \"sample_name1\" }, " + + " { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name2\", \"fieldName\": \"sample_name2\" } " + + " ] " + + " } " + + " ], " + + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]}"; + + // Timeseries query results + private static final String TIMESERIES_QUERY_RESULTS = + "[ " + + "{ " + + " \"timestamp\": \"2012-01-01T00:00:00.000Z\", " + + " \"result\": { \"sample_name1\": 0, \"sample_name2\": 1.0, \"sample_divide\": 2.2222 } " + + "}, " + + "{ " + + " \"timestamp\": \"2012-01-02T00:00:00.000Z\", " + + " \"result\": { \"sample_name1\": 2, \"sample_name2\": 3.32, \"sample_divide\": 4 } " + + "}]"; + + private byte[] tsQueryResults; + private byte[] topNQueryResults; + private byte[] groupByQueryResults; + private byte[] groupByTimeExtractQueryResults; + private byte[] selectQueryResults; + private byte[] groupByMonthExtractQueryResults; + + + // Timeseries query results as records + private static final Object[][] TIMESERIES_QUERY_RESULTS_RECORDS = new Object[][] { + new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new LongWritable(0), + new DoubleWritable(1.0), new FloatWritable(2.2222F) }, + new Object[] { new TimestampWritable(new Timestamp(1325462400000L)), new LongWritable(2), + new DoubleWritable(3.32), new FloatWritable(4F) } + }; + + // Timeseries query results as records (types defined by metastore) + private static final String TIMESERIES_COLUMN_NAMES = "__time,sample_name1,sample_name2,sample_divide"; + private static final String TIMESERIES_COLUMN_TYPES = "timestamp,smallint,double,float"; + private static final Object[][] TIMESERIES_QUERY_RESULTS_RECORDS_2 = new Object[][] { + new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new ShortWritable((short) 0), + new DoubleWritable(1.0d), new FloatWritable(2.2222F) }, + new Object[] { new TimestampWritable(new Timestamp(1325462400000L)), new ShortWritable((short) 2), + new DoubleWritable(3.32d), new FloatWritable(4F) } + }; + + // TopN query + private static final String TOPN_QUERY = + "{ \"queryType\": \"topN\", " + + " \"dataSource\": \"sample_data\", " + + " \"dimension\": 
\"sample_dim\", " + + " \"threshold\": 5, " + + " \"metric\": \"count\", " + + " \"granularity\": \"all\", " + + " \"filter\": { " + + " \"type\": \"and\", " + + " \"fields\": [ " + + " { " + + " \"type\": \"selector\", " + + " \"dimension\": \"dim1\", " + + " \"value\": \"some_value\" " + + " }, " + + " { " + + " \"type\": \"selector\", " + + " \"dimension\": \"dim2\", " + + " \"value\": \"some_other_val\" " + + " } " + + " ] " + + " }, " + + " \"aggregations\": [ " + + " { " + + " \"type\": \"longSum\", " + + " \"name\": \"count\", " + + " \"fieldName\": \"count\" " + + " }, " + + " { " + + " \"type\": \"doubleSum\", " + + " \"name\": \"some_metric\", " + + " \"fieldName\": \"some_metric\" " + + " } " + + " ], " + + " \"postAggregations\": [ " + + " { " + + " \"type\": \"arithmetic\", " + + " \"name\": \"sample_divide\", " + + " \"fn\": \"/\", " + + " \"fields\": [ " + + " { " + + " \"type\": \"fieldAccess\", " + + " \"name\": \"some_metric\", " + + " \"fieldName\": \"some_metric\" " + + " }, " + + " { " + + " \"type\": \"fieldAccess\", " + + " \"name\": \"count\", " + + " \"fieldName\": \"count\" " + + " } " + + " ] " + + " } " + + " ], " + + " \"intervals\": [ " + + " \"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000\" " + + " ]}"; + + // TopN query results + private static final String TOPN_QUERY_RESULTS = + "[ " + + " { " + + " \"timestamp\": \"2013-08-31T00:00:00.000Z\", " + + " \"result\": [ " + + " { " + + " \"sample_dim\": \"dim1_val\", " + + " \"count\": 111, " + + " \"some_metric\": 10669, " + + " \"sample_divide\": 96.11711711711712 " + + " }, " + + " { " + + " \"sample_dim\": \"another_dim1_val\", " + + " \"count\": 88, " + + " \"some_metric\": 28344, " + + " \"sample_divide\": 322.09090909090907 " + + " }, " + + " { " + + " \"sample_dim\": \"dim1_val3\", " + + " \"count\": 70, " + + " \"some_metric\": 871, " + + " \"sample_divide\": 12.442857142857143 " + + " }, " + + " { " + + " \"sample_dim\": \"dim1_val4\", " + + " \"count\": 62, " + + " \"some_metric\": 815, " + + " \"sample_divide\": 13.14516129032258 " + + " }, " + + " { " + + " \"sample_dim\": \"dim1_val5\", " + + " \"count\": 60, " + + " \"some_metric\": 2787, " + + " \"sample_divide\": 46.45 " + + " } " + + " ] " + + " }]"; + + // TopN query results as records + private static final Object[][] TOPN_QUERY_RESULTS_RECORDS = new Object[][] { + new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val"), + new LongWritable(111), new DoubleWritable(10669.0), + new FloatWritable(96.11711711711712F) }, + new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), + new Text("another_dim1_val"), new LongWritable(88), new DoubleWritable(28344.0), + new FloatWritable(322.09090909090907F) }, + new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), + new Text("dim1_val3"), new LongWritable(70), new DoubleWritable(871D), + new FloatWritable(12.442857142857143F) }, + new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), + new Text("dim1_val4"), new LongWritable(62), new DoubleWritable(815.0), + new FloatWritable(13.14516129032258F) }, + new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), + new Text("dim1_val5"), new LongWritable(60), new DoubleWritable(2787.0), + new FloatWritable(46.45F) } + }; + + // TopN query results as records (types defined by metastore) + private static final String TOPN_COLUMN_NAMES = "__time,sample_dim,count,some_metric,sample_divide"; + private static final String TOPN_COLUMN_TYPES = 
"timestamp,string,bigint,double,float"; + private static final Object[][] TOPN_QUERY_RESULTS_RECORDS_2 = new Object[][] { + new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), + new Text("dim1_val"), new LongWritable(111), new DoubleWritable(10669d), + new FloatWritable(96.11711711711712F) }, + new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), + new Text("another_dim1_val"), new LongWritable(88), new DoubleWritable(28344d), + new FloatWritable(322.09090909090907F) }, + new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), + new Text("dim1_val3"), new LongWritable(70), new DoubleWritable(871d), + new FloatWritable(12.442857142857143F) }, + new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), + new Text("dim1_val4"), new LongWritable(62), new DoubleWritable(815d), + new FloatWritable(13.14516129032258F) }, + new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), + new Text("dim1_val5"), new LongWritable(60), new DoubleWritable(2787d), + new FloatWritable(46.45F) } + }; + + // GroupBy query + private static final String GROUP_BY_QUERY = + "{ " + + " \"queryType\": \"groupBy\", " + + " \"dataSource\": \"sample_datasource\", " + + " \"granularity\": \"day\", " + + " \"dimensions\": [\"country\", \"device\"], " + + " \"limitSpec\": {" + + " \"type\": \"default\"," + + " \"limit\": 5000," + + " \"columns\": [\"country\", \"data_transfer\"] }, " + + " \"filter\": { " + + " \"type\": \"and\", " + + " \"fields\": [ " + + " { \"type\": \"selector\", \"dimension\": \"carrier\", \"value\": \"AT&T\" }, " + + " { \"type\": \"or\", " + + " \"fields\": [ " + + " { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Apple\" }, " + + " { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Samsung\" } " + + " ] " + + " } " + + " ] " + + " }, " + + " \"aggregations\": [ " + + " { \"type\": \"longSum\", \"name\": \"total_usage\", \"fieldName\": \"user_count\" }, " + + " { \"type\": \"doubleSum\", \"name\": \"data_transfer\", \"fieldName\": \"data_transfer\" } " + + " ], " + + " \"postAggregations\": [ " + + " { \"type\": \"arithmetic\", " + + " \"name\": \"avg_usage\", " + + " \"fn\": \"/\", " + + " \"fields\": [ " + + " { \"type\": \"fieldAccess\", \"fieldName\": \"data_transfer\" }, " + + " { \"type\": \"fieldAccess\", \"fieldName\": \"total_usage\" } " + + " ] " + + " } " + + " ], " + + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ], " + + " \"having\": { " + + " \"type\": \"greaterThan\", " + + " \"aggregation\": \"total_usage\", " + + " \"value\": 100 " + + " }}"; + + // GroupBy query results + private static final String GROUP_BY_QUERY_RESULTS = + "[ " + + " { " + + " \"version\" : \"v1\", " + + " \"timestamp\" : \"2012-01-01T00:00:00.000Z\", " + + " \"event\" : { " + + " \"country\" : \"India\", " + + " \"device\" : \"phone\", " + + " \"total_usage\" : 88, " + + " \"data_transfer\" : 29.91233453, " + + " \"avg_usage\" : 60.32 " + + " } " + + " }, " + + " { " + + " \"version\" : \"v1\", " + + " \"timestamp\" : \"2012-01-01T00:00:12.000Z\", " + + " \"event\" : { " + + " \"country\" : \"Spain\", " + + " \"device\" : \"pc\", " + + " \"total_usage\" : 16, " + + " \"data_transfer\" : 172.93494959, " + + " \"avg_usage\" : 6.333333 " + + " } " + + " }]"; + + private static final String GB_TIME_EXTRACTIONS = "{\"queryType\":\"groupBy\",\"dataSource\":\"sample_datasource\"," + + "\"granularity\":\"all\",\"dimensions\":" + + 
"[{\"type\":\"extraction\",\"dimension\":\"__time\",\"outputName\":\"extract\",\"extractionFn\":" + + "{\"type\":\"timeFormat\",\"format\":\"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'\",\"timeZone\":\"UTC\"}}]," + + "\"limitSpec\":{\"type\":\"default\"}," + + "\"aggregations\":[{\"type\":\"count\",\"name\":\"$f1\"}]," + + "\"intervals\":[\"1900-01-01T00:00:00.000/3000-01-01T00:00:00.000\"]}"; + + private static final String GB_TIME_EXTRACTIONS_RESULTS = "[ " + + " { " + + " \"version\" : \"v1\", " + + " \"timestamp\" : \"2012-01-01T00:00:00.000Z\", " + + " \"event\" : { " + + " \"extract\" : \"2012-01-01T00:00:00.000Z\", " + + " \"$f1\" : 200" + + " } " + + " }, " + + " { " + + " \"version\" : \"v1\", " + + " \"timestamp\" : \"2012-01-01T00:00:12.000Z\", " + + " \"event\" : { " + + " \"extract\" : \"2012-01-01T00:00:12.000Z\", " + + " \"$f1\" : 400" + + " } " + + " }]"; + + private static final String GB_MONTH_EXTRACTIONS_RESULTS = "[ " + + " { " + + " \"version\" : \"v1\", " + + " \"timestamp\" : \"2012-01-01T00:00:00.000Z\", " + + " \"event\" : { " + + " \"extract_month\" : \"01\", " + + " \"$f1\" : 200" + + " } " + + " }, " + + " { " + + " \"version\" : \"v1\", " + + " \"timestamp\" : \"2012-01-01T00:00:12.000Z\", " + + " \"event\" : { " + + " \"extract_month\" : \"01\", " + + " \"$f1\" : 400" + + " } " + + " }]"; + + + private final static String GB_MONTH_EXTRACTIONS = "{\"queryType\":\"groupBy\",\"dataSource\":\"sample_datasource\"," + + "\"granularity\":\"all\"," + + "\"dimensions\":[{\"type\":\"extraction\",\"dimension\":\"__time\",\"outputName\":\"extract_month\"," + + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"M\",\"timeZone\":\"UTC\",\"locale\":\"en-US\"}}]," + + "\"limitSpec\":{\"type\":\"default\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"$f1\"}]," + + "\"intervals\":[\"1900-01-01T00:00:00.000/3000-01-01T00:00:00.000\"]}"; + + // GroupBy query results as records + private static final Object[][] GROUP_BY_QUERY_EXTRACTION_RESULTS_RECORDS = new Object[][] { + new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), + new TimestampWritable(new Timestamp(1325376000000L)), + new LongWritable(200) }, + new Object[] { new TimestampWritable(new Timestamp(1325376012000L)), new TimestampWritable(new Timestamp(1325376012000L)), + new LongWritable(400) } + }; + + private static final Object[][] GROUP_BY_QUERY_RESULTS_RECORDS = new Object[][] { + new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new Text("India"), + new Text("phone"), new LongWritable(88), new DoubleWritable(29.91233453), + new FloatWritable(60.32F) }, + new Object[] { new TimestampWritable(new Timestamp(1325376012000L)), new Text("Spain"), + new Text("pc"), new LongWritable(16), new DoubleWritable(172.93494959), + new FloatWritable(6.333333F) } + }; + + private static final Object[][] GB_MONTH_EXTRACTION_RESULTS_RECORDS = new Object[][] { + new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), + new IntWritable(1), + new LongWritable(200) }, + new Object[] { new TimestampWritable(new Timestamp(1325376012000L)), new IntWritable(1), + new LongWritable(400) } + }; + + // GroupBy query results as records (types defined by metastore) + private static final String GROUP_BY_COLUMN_NAMES = "__time,country,device,total_usage,data_transfer,avg_usage"; + private static final String GROUP_BY_COLUMN_TYPES = "timestamp,string,string,int,double,float"; + private static final Object[][] GROUP_BY_QUERY_RESULTS_RECORDS_2 = new Object[][] { + new Object[] { new TimestampWritable(new 
Timestamp(1325376000000L)), new Text("India"), + new Text("phone"), new IntWritable(88), new DoubleWritable(29.91233453), + new FloatWritable(60.32F) }, + new Object[] { new TimestampWritable(new Timestamp(1325376012000L)), new Text("Spain"), + new Text("pc"), new IntWritable(16), new DoubleWritable(172.93494959), + new FloatWritable(6.333333F) } + }; + + // Select query + private static final String SELECT_QUERY = + "{ \"queryType\": \"select\", " + + " \"dataSource\": \"wikipedia\", \"descending\": \"false\", " + + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\",\"newpage\",\"user\"], " + + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"], " + + " \"granularity\": \"all\", " + + " \"intervals\": [ \"2013-01-01/2013-01-02\" ], " + + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5} }"; + + // Select query results + private static final String SELECT_QUERY_RESULTS = + "[{ " + + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " + + " \"result\" : { " + + " \"pagingIdentifiers\" : { " + + " \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\" : 4 }, " + + " \"events\" : [ { " + + " \"segmentId\" : \"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", " + + " \"offset\" : 0, " + + " \"event\" : { " + + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " + + " \"robot\" : \"1\", " + + " \"namespace\" : \"article\", " + + " \"anonymous\" : \"0\", " + + " \"unpatrolled\" : \"0\", " + + " \"page\" : \"11._korpus_(NOVJ)\", " + + " \"language\" : \"sl\", " + + " \"newpage\" : \"0\", " + + " \"user\" : \"EmausBot\", " + + " \"count\" : 1.0, " + + " \"added\" : 39.0, " + + " \"delta\" : 39.0, " + + " \"variation\" : 39.0, " + + " \"deleted\" : 0.0 " + + " } " + + " }, { " + + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", " + + " \"offset\" : 1, " + + " \"event\" : { " + + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " + + " \"robot\" : \"0\", " + + " \"namespace\" : \"article\", " + + " \"anonymous\" : \"0\", " + + " \"unpatrolled\" : \"0\", " + + " \"page\" : \"112_U.S._580\", " + + " \"language\" : \"en\", " + + " \"newpage\" : \"1\", " + + " \"user\" : \"MZMcBride\", " + + " \"count\" : 1.0, " + + " \"added\" : 70.0, " + + " \"delta\" : 70.0, " + + " \"variation\" : 70.0, " + + " \"deleted\" : 0.0 " + + " } " + + " }, { " + + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", " + + " \"offset\" : 2, " + + " \"event\" : { " + + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", " + + " \"robot\" : \"0\", " + + " \"namespace\" : \"article\", " + + " \"anonymous\" : \"0\", " + + " \"unpatrolled\" : \"0\", " + + " \"page\" : \"113_U.S._243\", " + + " \"language\" : \"en\", " + + " \"newpage\" : \"1\", " + + " \"user\" : \"MZMcBride\", " + + " \"count\" : 1.0, " + + " \"added\" : 77.0, " + + " \"delta\" : 77.0, " + + " \"variation\" : 77.0, " + + " \"deleted\" : 0.0 " + + " } " + + " }, { " + + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", " + + " \"offset\" : 3, " + + " \"event\" : { " + + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", " + + " \"robot\" : \"0\", " + + " \"namespace\" : \"article\", " + + " \"anonymous\" : \"0\", " + + " \"unpatrolled\" : \"0\", " + + " \"page\" : \"113_U.S._73\", " + + " \"language\" : \"en\", " + + " \"newpage\" : 
\"1\", " + + " \"user\" : \"MZMcBride\", " + + " \"count\" : 1.0, " + + " \"added\" : 70.0, " + + " \"delta\" : 70.0, " + + " \"variation\" : 70.0, " + + " \"deleted\" : 0.0 " + + " } " + + " }, { " + + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", " + + " \"offset\" : 4, " + + " \"event\" : { " + + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", " + + " \"robot\" : \"0\", " + + " \"namespace\" : \"article\", " + + " \"anonymous\" : \"0\", " + + " \"unpatrolled\" : \"0\", " + + " \"page\" : \"113_U.S._756\", " + + " \"language\" : \"en\", " + + " \"newpage\" : \"1\", " + + " \"user\" : \"MZMcBride\", " + + " \"count\" : 1.0, " + + " \"added\" : 68.0, " + + " \"delta\" : 68.0, " + + " \"variation\" : 68.0, " + + " \"deleted\" : 0.0 " + + " } " + + " } ] }} ]"; + + // Select query results as records + private static final Object[][] SELECT_QUERY_RESULTS_RECORDS = new Object[][] { + new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("1"), + new Text("article"), new Text("0"), new Text("0"), + new Text("11._korpus_(NOVJ)"), new Text("sl"), new Text("0"), + new Text("EmausBot"), + new FloatWritable(1.0F), new FloatWritable(39.0F), new FloatWritable(39.0F), + new FloatWritable(39.0F), new FloatWritable(0.0F) }, + new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), + new Text("article"), new Text("0"), new Text("0"), + new Text("112_U.S._580"), new Text("en"), new Text("1"), new Text("MZMcBride"), + new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), + new FloatWritable(70.0F), new FloatWritable(0.0F) }, + new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), + new Text("article"), new Text("0"), new Text("0"), + new Text("113_U.S._243"), new Text("en"), new Text("1"), new Text("MZMcBride"), + new FloatWritable(1.0F), new FloatWritable(77.0F), new FloatWritable(77.0F), + new FloatWritable(77.0F), new FloatWritable(0.0F) }, + new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), + new Text("article"), new Text("0"), new Text("0"), + new Text("113_U.S._73"), new Text("en"), new Text("1"), new Text("MZMcBride"), + new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), + new FloatWritable(70.0F), new FloatWritable(0.0F) }, + new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), + new Text("article"), new Text("0"), new Text("0"), + new Text("113_U.S._756"), new Text("en"), new Text("1"), new Text("MZMcBride"), + new FloatWritable(1.0F), new FloatWritable(68.0F), new FloatWritable(68.0F), + new FloatWritable(68.0F), new FloatWritable(0.0F) } + }; + + // Select query results as records (types defined by metastore) + private static final String SELECT_COLUMN_NAMES = "__time,robot,namespace,anonymous,unpatrolled,page,language,newpage,user,count,added,delta,variation,deleted"; + private static final String SELECT_COLUMN_TYPES = "timestamp,string,string,string,string,string,string,string,string,double,double,float,float,float"; + private static final Object[][] SELECT_QUERY_RESULTS_RECORDS_2 = new Object[][] { + new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("1"), + new Text("article"), new Text("0"), new Text("0"), + new Text("11._korpus_(NOVJ)"), new Text("sl"), new Text("0"), + new Text("EmausBot"), + new DoubleWritable(1.0d), new DoubleWritable(39.0d), new FloatWritable(39.0F), + new FloatWritable(39.0F), new 
FloatWritable(0.0F) }, + new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), + new Text("article"), new Text("0"), new Text("0"), + new Text("112_U.S._580"), new Text("en"), new Text("1"), new Text("MZMcBride"), + new DoubleWritable(1.0d), new DoubleWritable(70.0d), new FloatWritable(70.0F), + new FloatWritable(70.0F), new FloatWritable(0.0F) }, + new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), + new Text("article"), new Text("0"), new Text("0"), + new Text("113_U.S._243"), new Text("en"), new Text("1"), new Text("MZMcBride"), + new DoubleWritable(1.0d), new DoubleWritable(77.0d), new FloatWritable(77.0F), + new FloatWritable(77.0F), new FloatWritable(0.0F) }, + new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), + new Text("article"), new Text("0"), new Text("0"), + new Text("113_U.S._73"), new Text("en"), new Text("1"), new Text("MZMcBride"), + new DoubleWritable(1.0d), new DoubleWritable(70.0d), new FloatWritable(70.0F), + new FloatWritable(70.0F), new FloatWritable(0.0F) }, + new Object[] { new TimestampWritable(new Timestamp(1356998412000L)), new Text("0"), + new Text("article"), new Text("0"), new Text("0"), + new Text("113_U.S._756"), new Text("en"), new Text("1"), new Text("MZMcBride"), + new DoubleWritable(1.0d), new DoubleWritable(68.0d), new FloatWritable(68.0F), + new FloatWritable(68.0F), new FloatWritable(0.0F) } + }; + + @Before + public void setup() throws IOException { + tsQueryResults = DruidStorageHandlerUtils.SMILE_MAPPER.writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue(TIMESERIES_QUERY_RESULTS, new TypeReference>>() { + })); + + topNQueryResults = DruidStorageHandlerUtils.SMILE_MAPPER + .writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue(TOPN_QUERY_RESULTS, new TypeReference>>() { + })); + groupByQueryResults = DruidStorageHandlerUtils.SMILE_MAPPER + .writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue(GROUP_BY_QUERY_RESULTS, + new TypeReference>() { + })); + groupByTimeExtractQueryResults = DruidStorageHandlerUtils.SMILE_MAPPER + .writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue(GB_TIME_EXTRACTIONS_RESULTS, new TypeReference>() { + })); + groupByMonthExtractQueryResults = DruidStorageHandlerUtils.SMILE_MAPPER + .writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue(GB_MONTH_EXTRACTIONS_RESULTS, new TypeReference>() { + })); + selectQueryResults = DruidStorageHandlerUtils.SMILE_MAPPER + .writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue(SELECT_QUERY_RESULTS, new TypeReference>>() { + })); + } + + /** + * Test the default behavior of the objects and object inspectors. 
+ * @throws IOException + * @throws IllegalAccessException + * @throws IllegalArgumentException + * @throws SecurityException + * @throws NoSuchFieldException + * @throws JsonMappingException + * @throws JsonParseException + * @throws InvocationTargetException + * @throws NoSuchMethodException + */ + @Test + public void testDruidDeserializer() throws SerDeException, NoSuchFieldException, + SecurityException, IllegalArgumentException, IllegalAccessException, + IOException, InterruptedException, NoSuchMethodException, InvocationTargetException { + // Create, initialize, and test the SerDe + QTestDruidSerDe serDe = new QTestDruidSerDe(); + Configuration conf = new Configuration(); + Properties tbl; + // Timeseries query + tbl = createPropertiesQuery("sample_datasource", Query.TIMESERIES, TIMESERIES_QUERY); + SerDeUtils.initializeSerDe(serDe, conf, tbl, null); + deserializeQueryResults(serDe, Query.TIMESERIES, TIMESERIES_QUERY, + tsQueryResults, TIMESERIES_QUERY_RESULTS_RECORDS + ); + // Timeseries query (simulating column types from metastore) + tbl.setProperty(serdeConstants.LIST_COLUMNS, TIMESERIES_COLUMN_NAMES); + tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, TIMESERIES_COLUMN_TYPES); + SerDeUtils.initializeSerDe(serDe, conf, tbl, null); + deserializeQueryResults(serDe, Query.TIMESERIES, TIMESERIES_QUERY, + tsQueryResults, TIMESERIES_QUERY_RESULTS_RECORDS_2 + ); + // TopN query + tbl = createPropertiesQuery("sample_data", Query.TOPN, TOPN_QUERY); + SerDeUtils.initializeSerDe(serDe, conf, tbl, null); + deserializeQueryResults(serDe, Query.TOPN, TOPN_QUERY, + topNQueryResults, TOPN_QUERY_RESULTS_RECORDS + ); + // TopN query (simulating column types from metastore) + tbl.setProperty(serdeConstants.LIST_COLUMNS, TOPN_COLUMN_NAMES); + tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, TOPN_COLUMN_TYPES); + SerDeUtils.initializeSerDe(serDe, conf, tbl, null); + deserializeQueryResults(serDe, Query.TOPN, TOPN_QUERY, + topNQueryResults, TOPN_QUERY_RESULTS_RECORDS_2 + ); + // GroupBy query + tbl = createPropertiesQuery("sample_datasource", Query.GROUP_BY, GROUP_BY_QUERY); + SerDeUtils.initializeSerDe(serDe, conf, tbl, null); + deserializeQueryResults(serDe, Query.GROUP_BY, GROUP_BY_QUERY, + groupByQueryResults, GROUP_BY_QUERY_RESULTS_RECORDS + ); + // GroupBy query (simulating column types from metastore) + tbl.setProperty(serdeConstants.LIST_COLUMNS, GROUP_BY_COLUMN_NAMES); + tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, GROUP_BY_COLUMN_TYPES); + SerDeUtils.initializeSerDe(serDe, conf, tbl, null); + deserializeQueryResults(serDe, Query.GROUP_BY, GROUP_BY_QUERY, + groupByQueryResults, GROUP_BY_QUERY_RESULTS_RECORDS_2 + ); + tbl = createPropertiesQuery("sample_datasource", Query.GROUP_BY, GB_TIME_EXTRACTIONS); + SerDeUtils.initializeSerDe(serDe, conf, tbl, null); + deserializeQueryResults(serDe, Query.GROUP_BY, GB_TIME_EXTRACTIONS, + groupByTimeExtractQueryResults, GROUP_BY_QUERY_EXTRACTION_RESULTS_RECORDS + ); + + tbl = createPropertiesQuery("sample_datasource", Query.GROUP_BY, GB_MONTH_EXTRACTIONS); + SerDeUtils.initializeSerDe(serDe, conf, tbl, null); + deserializeQueryResults(serDe, Query.GROUP_BY, GB_MONTH_EXTRACTIONS, + groupByMonthExtractQueryResults, GB_MONTH_EXTRACTION_RESULTS_RECORDS + ); + // Select query + tbl = createPropertiesQuery("wikipedia", Query.SELECT, SELECT_QUERY); + SerDeUtils.initializeSerDe(serDe, conf, tbl, null); + deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY, + selectQueryResults, SELECT_QUERY_RESULTS_RECORDS + ); + // Select query (simulating column 
types from metastore) + tbl.setProperty(serdeConstants.LIST_COLUMNS, SELECT_COLUMN_NAMES); + tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, SELECT_COLUMN_TYPES); + SerDeUtils.initializeSerDe(serDe, conf, tbl, null); + deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY, + selectQueryResults, SELECT_QUERY_RESULTS_RECORDS_2 + ); + } + + private static Properties createPropertiesQuery(String dataSource, String queryType, + String jsonQuery + ) { + Properties tbl = new Properties(); + + // Set the configuration parameters + tbl.setProperty(Constants.DRUID_DATA_SOURCE, dataSource); + tbl.setProperty(Constants.DRUID_QUERY_JSON, jsonQuery); + tbl.setProperty(Constants.DRUID_QUERY_TYPE, queryType); + return tbl; + } + + @SuppressWarnings("unchecked") + private void deserializeQueryResults(DruidSerDe serDe, String queryType, String jsonQuery, + byte[] resultString, Object[][] records + ) throws SerDeException, IOException, NoSuchFieldException, SecurityException, + IllegalArgumentException, IllegalAccessException, InterruptedException, + NoSuchMethodException, InvocationTargetException { + + // Initialize + HttpClient httpClient = mock(HttpClient.class); + SettableFuture futureResult = SettableFuture.create(); + futureResult.set(new ByteArrayInputStream(resultString)); + when(httpClient.go(anyObject(), any(HttpResponseHandler.class))).thenReturn(futureResult); + DruidQueryRecordReader reader = DruidQueryBasedInputFormat.getDruidQueryReader(queryType); + + final HiveDruidSplit split = new HiveDruidSplit(jsonQuery, + new Path("empty"), + new String[] { "testing_host" } + ); + + reader.initialize(split, new Configuration(), DruidStorageHandlerUtils.JSON_MAPPER, + DruidStorageHandlerUtils.SMILE_MAPPER, httpClient + ); + StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector(); + List fieldRefs = oi.getAllStructFieldRefs(); + + // Check mapred + DruidWritable writable = new DruidWritable(); + int pos = 0; + while (reader.next(NullWritable.get(), writable)) { + List row = (List) serDe.deserialize(writable); + Object[] expectedFieldsData = records[pos]; + assertEquals(expectedFieldsData.length, fieldRefs.size()); + for (int i = 0; i < fieldRefs.size(); i++) { + assertEquals("Field " + i + " type", expectedFieldsData[i].getClass(), + row.get(i).getClass() + ); + Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i)); + assertEquals("Field " + i, expectedFieldsData[i], fieldData); + } + pos++; + } + assertEquals(pos, records.length); + + + // Check mapreduce path + futureResult = SettableFuture.create(); + futureResult.set(new ByteArrayInputStream(resultString)); + when(httpClient.go(anyObject(), any(HttpResponseHandler.class))).thenReturn(futureResult); + reader = DruidQueryBasedInputFormat.getDruidQueryReader(queryType); + reader.initialize(split, new Configuration(), DruidStorageHandlerUtils.JSON_MAPPER, + DruidStorageHandlerUtils.SMILE_MAPPER, httpClient + ); + + pos = 0; + while (reader.nextKeyValue()) { + List row = (List) serDe.deserialize(reader.getCurrentValue()); + Object[] expectedFieldsData = records[pos]; + assertEquals(expectedFieldsData.length, fieldRefs.size()); + for (int i = 0; i < fieldRefs.size(); i++) { + assertEquals("Field " + i + " type", expectedFieldsData[i].getClass(), row.get(i).getClass()); + Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i)); + assertEquals("Field " + i, expectedFieldsData[i], fieldData); + } + pos++; + } + assertEquals(pos, records.length); + } + + + private static final String COLUMN_NAMES = 
"__time,c0,c1,c2,c3,c4,c5,c6,c7,c8,c9"; + private static final String COLUMN_TYPES = "timestamp,string,char(6),varchar(8),double,float,decimal(38,18),bigint,int,smallint,tinyint"; + private static final Object[] ROW_OBJECT = new Object[] { + new TimestampWritable(new Timestamp(1377907200000L)), + new Text("dim1_val"), + new HiveCharWritable(new HiveChar("dim2_v", 6)), + new HiveVarcharWritable(new HiveVarchar("dim3_val", 8)), + new DoubleWritable(10669.3D), + new FloatWritable(10669.45F), + new HiveDecimalWritable(HiveDecimal.create(1064.34D)), + new LongWritable(1113939), + new IntWritable(1112123), + new ShortWritable((short) 12), + new ByteWritable((byte) 0), + new TimestampWritable(new Timestamp(1377907200000L)) // granularity + }; + private static final DruidWritable DRUID_WRITABLE = new DruidWritable( + ImmutableMap.builder() + .put("__time", 1377907200000L) + .put("c0", "dim1_val") + .put("c1", "dim2_v") + .put("c2", "dim3_val") + .put("c3", 10669.3D) + .put("c4", 10669.45F) + .put("c5", 1064.34D) + .put("c6", 1113939L) + .put("c7", 1112123) + .put("c8", (short) 12) + .put("c9", (byte) 0) + .put("__time_granularity", 1377907200000L) + .build()); + + /** + * Test the default behavior of the objects and object inspectors. + * @throws IOException + * @throws IllegalAccessException + * @throws IllegalArgumentException + * @throws SecurityException + * @throws NoSuchFieldException + * @throws JsonMappingException + * @throws JsonParseException + * @throws InvocationTargetException + * @throws NoSuchMethodException + */ + @Test + public void testDruidObjectSerializer() + throws SerDeException, JsonParseException, JsonMappingException, + NoSuchFieldException, SecurityException, IllegalArgumentException, + IllegalAccessException, IOException, InterruptedException, + NoSuchMethodException, InvocationTargetException { + // Create, initialize, and test the SerDe + DruidSerDe serDe = new DruidSerDe(); + Configuration conf = new Configuration(); + Properties tbl; + // Mixed source (all types) + tbl = createPropertiesSource(COLUMN_NAMES, COLUMN_TYPES); + SerDeUtils.initializeSerDe(serDe, conf, tbl, null); + serializeObject(tbl, serDe, ROW_OBJECT, DRUID_WRITABLE); + } + + private static Properties createPropertiesSource(String columnNames, String columnTypes) { + Properties tbl = new Properties(); + + // Set the configuration parameters + tbl.setProperty(serdeConstants.LIST_COLUMNS, columnNames); + tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, columnTypes); + return tbl; + } + + private static void serializeObject(Properties properties, DruidSerDe serDe, + Object[] rowObject, DruidWritable druidWritable) throws SerDeException { + // Build OI with timestamp granularity column + final List columnNames = new ArrayList<>(); + final List columnTypes = new ArrayList<>(); + List inspectors = new ArrayList<>(); + columnNames.addAll(Utilities.getColumnNames(properties)); + columnNames.add(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME); + columnTypes.addAll(Lists.transform(Utilities.getColumnTypes(properties), + new Function() { + @Override + public PrimitiveTypeInfo apply(String type) { + return TypeInfoFactory.getPrimitiveTypeInfo(type); + } + } + )); + columnTypes.add(TypeInfoFactory.getPrimitiveTypeInfo("timestamp")); + inspectors.addAll(Lists.transform(columnTypes, + new Function() { + @Override + public ObjectInspector apply(PrimitiveTypeInfo type) { + return PrimitiveObjectInspectorFactory + .getPrimitiveWritableObjectInspector(type); + } + } + )); + ObjectInspector inspector = 
ObjectInspectorFactory
+        .getStandardStructObjectInspector(columnNames, inspectors);
+    // Serialize
+    DruidWritable writable = (DruidWritable) serDe.serialize(rowObject, inspector);
+    // Check result
+    assertEquals(druidWritable.getValue().size(), writable.getValue().size());
+    for (Entry<String, Object> e : druidWritable.getValue().entrySet()) {
+      assertEquals(e.getValue(), writable.getValue().get(e.getKey()));
+    }
+  }
+
+  private static final Object[] ROW_OBJECT_2 = new Object[] {
+          new TimestampWritable(new Timestamp(1377907200000L)),
+          new Text("dim1_val"),
+          new HiveCharWritable(new HiveChar("dim2_v", 6)),
+          new HiveVarcharWritable(new HiveVarchar("dim3_val", 8)),
+          new DoubleWritable(10669.3D),
+          new FloatWritable(10669.45F),
+          new HiveDecimalWritable(HiveDecimal.create(1064.34D)),
+          new LongWritable(1113939),
+          new IntWritable(1112123),
+          new ShortWritable((short) 12),
+          new ByteWritable((byte) 0)
+  };
+  private static final DruidWritable DRUID_WRITABLE_2 = new DruidWritable(
+          ImmutableMap.<String, Object>builder()
+                  .put("__time", 1377907200000L)
+                  .put("c0", "dim1_val")
+                  .put("c1", "dim2_v")
+                  .put("c2", "dim3_val")
+                  .put("c3", 10669.3D)
+                  .put("c4", 10669.45F)
+                  .put("c5", 1064.34D)
+                  .put("c6", 1113939L)
+                  .put("c7", 1112123)
+                  .put("c8", (short) 12)
+                  .put("c9", (byte) 0)
+                  .build());
+
+  @Test
+  public void testDruidObjectDeserializer()
+          throws SerDeException, JsonParseException, JsonMappingException,
+          NoSuchFieldException, SecurityException, IllegalArgumentException,
+          IllegalAccessException, IOException, InterruptedException,
+          NoSuchMethodException, InvocationTargetException {
+    // Create, initialize, and test the SerDe
+    DruidSerDe serDe = new DruidSerDe();
+    Configuration conf = new Configuration();
+    Properties tbl;
+    // Mixed source (all types)
+    tbl = createPropertiesSource(COLUMN_NAMES, COLUMN_TYPES);
+    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+    deserializeObject(tbl, serDe, ROW_OBJECT_2, DRUID_WRITABLE_2);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static void deserializeObject(Properties properties, DruidSerDe serDe,
+          Object[] rowObject, DruidWritable druidWritable) throws SerDeException {
+    // Deserialize
+    List<Object> object = (List<Object>) serDe.deserialize(druidWritable);
+    // Check result
+    assertEquals(rowObject.length, object.size());
+    for (int i = 0; i < rowObject.length; i++) {
+      assertEquals(rowObject[i].getClass(), object.get(i).getClass());
+      assertEquals(rowObject[i], object.get(i));
+    }
+  }
+}
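
Note on the new test flow: the rewritten TestDruidSerDe no longer injects result iterators into the readers via reflection; instead it mocks the metamx HttpClient so each DruidQueryRecordReader fetches SMILE-encoded results end-to-end, as setup() and deserializeQueryResults() show above. The following standalone sketch mirrors that round trip under stated assumptions: the class name SmileRoundTripSketch and the trimmed result string are illustrative only, while the mappers and Druid result types are the ones the test already imports.

// Sketch only: assumes the druid-handler test classpath; not part of the patch.
import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import io.druid.query.Result;
import io.druid.query.timeseries.TimeseriesResultValue;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;

public class SmileRoundTripSketch {
  public static void main(String[] args) throws Exception {
    // A trimmed copy of the documented timeseries result used by the test.
    String json = "[{\"timestamp\":\"2012-01-01T00:00:00.000Z\","
        + "\"result\":{\"sample_name1\":0,\"sample_name2\":1.0,\"sample_divide\":2.2222}}]";
    // Parse the JSON form, then re-encode it as SMILE: these are the bytes the
    // mocked HttpClient hands back to the record reader in deserializeQueryResults().
    List<Result<TimeseriesResultValue>> results = DruidStorageHandlerUtils.JSON_MAPPER
        .readValue(json, new TypeReference<List<Result<TimeseriesResultValue>>>() {});
    byte[] smile = DruidStorageHandlerUtils.SMILE_MAPPER.writeValueAsBytes(results);
    // Decoding the SMILE bytes yields the same result objects the reader iterates over.
    List<Result<TimeseriesResultValue>> decoded = DruidStorageHandlerUtils.SMILE_MAPPER
        .readValue(smile, new TypeReference<List<Result<TimeseriesResultValue>>>() {});
    System.out.println(decoded.size() + " result row(s) decoded");
  }
}

Keeping the fixture bytes in SMILE appears to mirror the runtime path exercised by the patch, where query responses are requested and decoded in SMILE format rather than plain JSON.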