diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index d1b2a72..8b37840 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -17,24 +17,14 @@
  */
 package org.apache.hadoop.hive.druid.io;

-import com.fasterxml.jackson.core.type.TypeReference;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-import io.druid.query.Druids;
-import io.druid.query.Druids.SegmentMetadataQueryBuilder;
-import io.druid.query.Druids.SelectQueryBuilder;
-import io.druid.query.Druids.TimeBoundaryQueryBuilder;
-import io.druid.query.Query;
-import io.druid.query.Result;
-import io.druid.query.metadata.metadata.SegmentAnalysis;
-import io.druid.query.metadata.metadata.SegmentMetadataQuery;
-import io.druid.query.select.PagingSpec;
-import io.druid.query.select.SelectQuery;
-import io.druid.query.spec.MultipleIntervalSegmentSpec;
-import io.druid.query.timeboundary.TimeBoundaryQuery;
-import io.druid.query.timeboundary.TimeBoundaryResultValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.calcite.adapter.druid.DruidDateTimeUtils;
 import org.apache.calcite.adapter.druid.DruidTable;
 import org.apache.commons.lang3.StringEscapeUtils;
@@ -67,13 +57,28 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+
+import io.druid.query.BaseQuery;
+import io.druid.query.Druids;
+import io.druid.query.Druids.SegmentMetadataQueryBuilder;
+import io.druid.query.Druids.SelectQueryBuilder;
+import io.druid.query.Druids.TimeBoundaryQueryBuilder;
+import io.druid.query.Query;
+import io.druid.query.Result;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.select.PagingSpec;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.spec.MultipleIntervalSegmentSpec;
+import io.druid.query.timeboundary.TimeBoundaryQuery;
+import io.druid.query.timeboundary.TimeBoundaryResultValue;

 /**
  * Druid query based input format.
@@ -131,13 +136,19 @@
     JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
     Path[] paths = FileInputFormat.getInputPaths(jobContext);

+    // We need to deserialize and serialize query so intervals are written in the JSON
+    // Druid query with user timezone, as this is default Hive time semantics.
+    // Then, create splits with the Druid queries.
     switch (druidQueryType) {
       case Query.TIMESERIES:
       case Query.TOPN:
       case Query.GROUP_BY:
-        return new HiveDruidSplit[] { new HiveDruidSplit(address, druidQuery, paths[0]) };
+        return new HiveDruidSplit[] { new HiveDruidSplit(address,
+                deserializeSerialize(druidQuery), paths[0]) };
       case Query.SELECT:
-        return splitSelectQuery(conf, address, druidQuery, paths[0]);
+        SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
+                druidQuery, SelectQuery.class);
+        return splitSelectQuery(conf, address, selectQuery, paths[0]);
       default:
         throw new IOException("Druid query type not recognized");
     }
@@ -158,7 +169,7 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
   /* Method that splits Select query depending on the threshold so read can be
    * parallelized */
   private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
-          String druidQuery, Path dummyPath
+          SelectQuery query, Path dummyPath
   ) throws IOException {
     final int selectThreshold = (int) HiveConf.getIntVar(
             conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
@@ -166,12 +177,6 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
             .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
     final Period readTimeout = new Period(
             HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
-    SelectQuery query;
-    try {
-      query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }

     final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
     if (isFetch) {
@@ -353,6 +358,13 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
     return newIntervals;
   }

+  private static String deserializeSerialize(String druidQuery)
+          throws JsonParseException, JsonMappingException, IOException {
+    BaseQuery deserializedQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
+            druidQuery, BaseQuery.class);
+    return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(deserializedQuery);
+  }
+
   @Override
   public org.apache.hadoop.mapred.RecordReader getRecordReader(
       org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter
diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
index 4fde3eb..9b7a1da 100644
--- druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
+++ druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
@@ -22,11 +22,17 @@
 import java.util.Arrays;
 import java.util.List;

+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
+import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.joda.time.Interval;
 import org.joda.time.chrono.ISOChronology;
 import org.junit.Test;

+import io.druid.query.Query;
 import junit.framework.TestCase;

 public class TestHiveDruidQueryBasedInputFormat extends TestCase {
@@ -130,4 +136,160 @@ public void testCreateSplitsIntervals() throws Exception {
     assertEquals(expectedResultList, resultList);
   }

+  private static final String TIMESERIES_QUERY =
+      "{ \"queryType\": \"timeseries\", " +
+      " \"dataSource\": \"sample_datasource\", " +
+      " \"granularity\": \"day\", " +
+      " \"descending\": \"true\", " +
+      " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]}";
+  private static final String TIMESERIES_QUERY_SPLIT =
+      "[HiveDruidSplit{localhost:8082, " +
+      "{\"queryType\":\"timeseries\"," +
+      "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_datasource\"}," +
+      "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T00:00:00.000-08:00/2012-01-03T00:00:00.000-08:00\"]}," +
+      "\"descending\":true," +
+      "\"filter\":null," +
+      "\"granularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1969-12-31T16:00:00.000-08:00\"}," +
+      "\"aggregations\":[]," +
+      "\"postAggregations\":[]," +
+      "\"context\":null}}]";
+
+  private static final String TOPN_QUERY =
+      "{ \"queryType\": \"topN\", " +
+      " \"dataSource\": \"sample_data\", " +
+      " \"dimension\": \"sample_dim\", " +
+      " \"threshold\": 5, " +
+      " \"metric\": \"count\", " +
+      " \"aggregations\": [ " +
+      " { " +
+      " \"type\": \"longSum\", " +
+      " \"name\": \"count\", " +
+      " \"fieldName\": \"count\" " +
+      " }, " +
+      " { " +
+      " \"type\": \"doubleSum\", " +
+      " \"name\": \"some_metric\", " +
+      " \"fieldName\": \"some_metric\" " +
+      " } " +
+      " ], " +
+      " \"granularity\": \"all\", " +
+      " \"intervals\": [ " +
+      " \"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000\" " +
+      " ]}";
+  private static final String TOPN_QUERY_SPLIT =
+      "[HiveDruidSplit{localhost:8082, " +
+      "{\"queryType\":\"topN\"," +
+      "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_data\"}," +
+      "\"dimension\":{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"sample_dim\",\"outputName\":\"sample_dim\"}," +
+      "\"metric\":{\"type\":\"LegacyTopNMetricSpec\",\"metric\":\"count\"}," +
+      "\"threshold\":5," +
+      "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-08-31T00:00:00.000-07:00/2013-09-03T00:00:00.000-07:00\"]}," +
+      "\"filter\":null," +
+      "\"granularity\":{\"type\":\"all\"}," +
+      "\"aggregations\":[{\"type\":\"longSum\",\"name\":\"count\",\"fieldName\":\"count\"}," +
+      "{\"type\":\"doubleSum\",\"name\":\"some_metric\",\"fieldName\":\"some_metric\"}]," +
+      "\"postAggregations\":[]," +
+      "\"context\":null," +
+      "\"descending\":false}}]";
+
+  private static final String GROUP_BY_QUERY =
+      "{ \"queryType\": \"groupBy\", " +
+      " \"dataSource\": \"sample_datasource\", " +
+      " \"granularity\": \"day\", " +
+      " \"dimensions\": [\"country\", \"device\"], " +
+      " \"limitSpec\": {" +
+      " \"type\": \"default\"," +
+      " \"limit\": 5000," +
+      " \"columns\": [\"country\", \"data_transfer\"] }, " +
+      " \"aggregations\": [ " +
+      " { \"type\": \"longSum\", \"name\": \"total_usage\", \"fieldName\": \"user_count\" }, " +
+      " { \"type\": \"doubleSum\", \"name\": \"data_transfer\", \"fieldName\": \"data_transfer\" } " +
+      " ], " +
+      " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]" +
+      " }";
+  private static final String GROUP_BY_QUERY_SPLIT =
+      "[HiveDruidSplit{localhost:8082, " +
+      "{\"queryType\":\"groupBy\"," +
+      "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_datasource\"}," +
+      "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T00:00:00.000-08:00/2012-01-03T00:00:00.000-08:00\"]}," +
+      "\"filter\":null," +
+      "\"granularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1969-12-31T16:00:00.000-08:00\"}," +
+      "\"dimensions\":[{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"country\",\"outputName\":\"country\"}," +
+      "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"device\",\"outputName\":\"device\"}]," +
+      "\"aggregations\":[{\"type\":\"longSum\",\"name\":\"total_usage\",\"fieldName\":\"user_count\"}," +
+      "{\"type\":\"doubleSum\",\"name\":\"data_transfer\",\"fieldName\":\"data_transfer\"}]," +
+      "\"postAggregations\":[]," +
+      "\"having\":null," +
+      "\"limitSpec\":{\"type\":\"default\",\"columns\":[{\"dimension\":\"country\",\"direction\":\"ascending\",\"dimensionOrder\":{\"type\":\"lexicographic\"}}," +
+      "{\"dimension\":\"data_transfer\",\"direction\":\"ascending\",\"dimensionOrder\":{\"type\":\"lexicographic\"}}],\"limit\":5000}," +
+      "\"context\":null," +
+      "\"descending\":false}}]";
+
+  private static final String SELECT_QUERY =
+      "{ \"queryType\": \"select\", " +
+      " \"dataSource\": \"wikipedia\", \"descending\": \"false\", " +
+      " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\",\"newpage\",\"user\"], " +
+      " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"], " +
+      " \"granularity\": \"all\", " +
+      " \"intervals\": [ \"2013-01-01/2013-01-02\" ], " +
+      " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5}, " +
+      " \"context\":{\"druid.query.fetch\":true}}";
+  private static final String SELECT_QUERY_SPLIT =
+      "[HiveDruidSplit{localhost:8082, " +
+      "{\"queryType\":\"select\"," +
+      "\"dataSource\":{\"type\":\"table\",\"name\":\"wikipedia\"}," +
+      "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-01-01T00:00:00.000-08:00/2013-01-02T00:00:00.000-08:00\"]}," +
+      "\"descending\":false," +
+      "\"filter\":null," +
+      "\"granularity\":{\"type\":\"all\"}," +
+      "\"dimensions\":[{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"robot\",\"outputName\":\"robot\"}," +
+      "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"namespace\",\"outputName\":\"namespace\"}," +
+      "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"anonymous\",\"outputName\":\"anonymous\"}," +
+      "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"unpatrolled\",\"outputName\":\"unpatrolled\"}," +
+      "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"page\",\"outputName\":\"page\"}," +
+      "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"language\",\"outputName\":\"language\"}," +
+      "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"newpage\",\"outputName\":\"newpage\"}," +
+      "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"user\",\"outputName\":\"user\"}]," +
+      "\"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"]," +
+      "\"pagingSpec\":{\"pagingIdentifiers\":{},\"threshold\":5,\"fromNext\":false}," +
+      "\"context\":{\"druid.query.fetch\":true}}}]";
+
+  @Test
+  public void testTimeZone() throws Exception {
+    DruidQueryBasedInputFormat input = new DruidQueryBasedInputFormat();
+
+    Method method1 = DruidQueryBasedInputFormat.class.getDeclaredMethod(
+        "getInputSplits", Configuration.class);
+    method1.setAccessible(true);
+
+    // Create, initialize, and test
+    Configuration conf = createPropertiesQuery("sample_datasource", Query.TIMESERIES, TIMESERIES_QUERY);
+    HiveDruidSplit[] resultSplits = (HiveDruidSplit[]) method1.invoke(input, conf);
+    assertEquals(TIMESERIES_QUERY_SPLIT, Arrays.toString(resultSplits));
+
+    conf = createPropertiesQuery("sample_datasource", Query.TOPN, TOPN_QUERY);
+    resultSplits = (HiveDruidSplit[]) method1.invoke(input, conf);
+    assertEquals(TOPN_QUERY_SPLIT, Arrays.toString(resultSplits));
+
+    conf = createPropertiesQuery("sample_datasource", Query.GROUP_BY, GROUP_BY_QUERY);
+    resultSplits = (HiveDruidSplit[]) method1.invoke(input, conf);
+    assertEquals(GROUP_BY_QUERY_SPLIT, Arrays.toString(resultSplits));
+
+    conf = createPropertiesQuery("sample_datasource", Query.SELECT, SELECT_QUERY);
+    resultSplits = (HiveDruidSplit[]) method1.invoke(input, conf);
+    assertEquals(SELECT_QUERY_SPLIT, Arrays.toString(resultSplits));
+  }
+
+  private static Configuration createPropertiesQuery(String dataSource, String queryType,
+      String jsonQuery) {
+    Configuration conf = new Configuration();
+    // Set the configuration parameters
+    conf.set(FileInputFormat.INPUT_DIR, "/my/dir");
+    conf.set(HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS.varname, "localhost:8082");
+    conf.set(Constants.DRUID_DATA_SOURCE, dataSource);
+    conf.set(Constants.DRUID_QUERY_JSON, jsonQuery);
+    conf.set(Constants.DRUID_QUERY_TYPE, queryType);
+    return conf;
+  }
+
 }
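Reviewer note, not part of the patch: the expected *_QUERY_SPLIT strings above spell out interval offsets of a Pacific-time default zone (-08:00 for the January dates, -07:00 for the August/September ones). The short, self-contained Joda-Time sketch below illustrates the round-trip effect the new deserializeSerialize() path relies on; the class name IntervalRoundTripSketch and the hard-coded America/Los_Angeles zone are illustrative assumptions only, and the patch itself routes the complete query through DruidStorageHandlerUtils.JSON_MAPPER rather than calling Joda-Time directly.

import org.joda.time.DateTimeZone;
import org.joda.time.Interval;

// Sketch only (assumed class name and time zone): parsing an interval that carries no
// explicit offset attaches the JVM default zone, so serializing it back out writes the
// offset explicitly, which is what the expected split strings in the new test encode.
public class IntervalRoundTripSketch {
  public static void main(String[] args) {
    DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"));
    Interval interval = Interval.parse("2012-01-01T00:00:00.000/2012-01-03T00:00:00.000");
    // Prints 2012-01-01T00:00:00.000-08:00/2012-01-03T00:00:00.000-08:00,
    // matching the interval embedded in TIMESERIES_QUERY_SPLIT.
    System.out.println(interval);
  }
}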