From 7c0e4cfcf3b0e85f9a9628e1bd02c28eb0f1f099 Mon Sep 17 00:00:00 2001
From: Nishant
Date: Thu, 4 Oct 2018 00:30:12 +0530
Subject: [PATCH] [HIVE-20684] Make compute stats work for Druid tables

---
 .../apache/hadoop/hive/druid/DruidStorageHandlerUtils.java |  3 ++-
 .../hadoop/hive/druid/io/DruidQueryBasedInputFormat.java   | 10 ++++++----
 ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java  | 14 ++++++++++++--
 .../org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java  | 14 +++++++++-----
 .../hive/ql/udf/generic/GenericUDAFComputeStats.java       |  1 +
 ql/src/test/queries/clientpositive/druidmini_test_insert.q |  2 ++
 .../clientpositive/druid/druidmini_test_insert.q.out       | 10 ++++++++++
 .../primitive/PrimitiveObjectInspectorUtils.java           |  4 ++++
 8 files changed, 46 insertions(+), 12 deletions(-)

diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index b9eb367f0f..e67de89429 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -672,13 +672,14 @@ public static Path makeSegmentDescriptorOutputPath(DataSegment pushedSegment,
     );
   }

-  public static String createScanAllQuery(String dataSourceName) throws JsonProcessingException {
+  public static String createScanAllQuery(String dataSourceName, List<String> columns) throws JsonProcessingException {
     final ScanQuery.ScanQueryBuilder scanQueryBuilder = ScanQuery.newScanQueryBuilder();
     final List<Interval> intervals = Arrays.asList(DEFAULT_INTERVAL);
     ScanQuery scanQuery = scanQueryBuilder
         .dataSource(dataSourceName)
         .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
         .intervals(new MultipleIntervalSegmentSpec(intervals))
+        .columns(columns)
         .build();
     return JSON_MAPPER.writeValueAsString(scanQuery);
   }
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index f5009a2776..9266faeef4 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -114,6 +115,7 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType)
       throw new IOException("Druid broker address not specified in configuration");
     }
     String druidQuery = StringEscapeUtils.unescapeJava(conf.get(Constants.DRUID_QUERY_JSON));
+    String druidQueryType;
     if (StringUtils.isEmpty(druidQuery)) {
       // Empty, maybe because CBO did not run; we fall back to
@@ -125,9 +127,9 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType)
       if (dataSource == null || dataSource.isEmpty()) {
         throw new IOException("Druid data source cannot be empty or null");
       }
-
-      druidQuery = DruidStorageHandlerUtils.createScanAllQuery(dataSource);
+      druidQuery = DruidStorageHandlerUtils.createScanAllQuery(dataSource, Utilities.getColumnNames(conf));
       druidQueryType = Query.SCAN;
+      conf.set(Constants.DRUID_QUERY_TYPE, druidQueryType);
     } else {
       druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE);
       if (druidQueryType == null) {
@@ -286,7 +288,7 @@ private static String deserializeSerialize(String druidQuery)
     final DruidQueryRecordReader reader;
     final String druidQueryType = job.get(Constants.DRUID_QUERY_TYPE);
     if (druidQueryType == null) {
-      reader = new DruidSelectQueryRecordReader(); // By default
+      reader = new DruidScanQueryRecordReader(); // By default we use scan query as fallback.
       reader.initialize((HiveDruidSplit) split, job);
       return reader;
     }
@@ -307,7 +309,7 @@ private static String deserializeSerialize(String druidQuery)
     // The reason is that Druid results format is different for each type.
     final String druidQueryType = context.getConfiguration().get(Constants.DRUID_QUERY_TYPE);
     if (druidQueryType == null) {
-      return new DruidSelectQueryRecordReader(); // By default
+      return new DruidScanQueryRecordReader(); // By default, we use druid scan query as fallback.
     }
     final DruidQueryRecordReader reader = getDruidQueryReader(druidQueryType);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 74fb1ba7b5..aee73aafcd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2128,9 +2128,19 @@ public static String formatBinaryString(byte[] array, int start, int length) {
   public static List<String> getColumnNames(Properties props) {
     List<String> names = new ArrayList<String>();
     String colNames = props.getProperty(serdeConstants.LIST_COLUMNS);
+    return splitColNames(names, colNames);
+  }
+
+  public static List<String> getColumnNames(Configuration conf) {
+    List<String> names = new ArrayList<String>();
+    String colNames = conf.get(serdeConstants.LIST_COLUMNS);
+    return splitColNames(names, colNames);
+  }
+
+  private static List<String> splitColNames(List<String> names, String colNames) {
     String[] cols = colNames.trim().split(",");
-    for (String col : cols) {
-      if (StringUtils.isNotBlank(col)) {
+    for(String col : cols) {
+      if(StringUtils.isNotBlank(col)) {
         names.add(col);
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index e81ed50880..31bc38ea5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -11126,11 +11126,15 @@ private Operator genTablePlan(String alias, QB qb) throws SemanticException {
     Iterator<VirtualColumn> vcs = VirtualColumn.getRegistry(conf).iterator();
     // use a list for easy cumtomize
     List<VirtualColumn> vcList = new ArrayList<VirtualColumn>();
-    while (vcs.hasNext()) {
-      VirtualColumn vc = vcs.next();
-      rwsch.put(alias, vc.getName().toLowerCase(), new ColumnInfo(vc.getName(),
-          vc.getTypeInfo(), alias, true, vc.getIsHidden()));
-      vcList.add(vc);
+    if(!tab.isNonNative()) {
+      // Virtual columns are only for native tables
+      while (vcs.hasNext()) {
+        VirtualColumn vc = vcs.next();
+        rwsch.put(alias, vc.getName().toLowerCase(), new ColumnInfo(vc.getName(),
+            vc.getTypeInfo(), alias, true, vc.getIsHidden()
+        ));
+        vcList.add(vc);
+      }
     }

     // Create the root of the operator tree
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java
index dd365dddcb..642f42b5b1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java
@@ -84,6 +84,7 @@ public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
     case INT:
     case LONG:
     case TIMESTAMP:
+    case TIMESTAMPLOCALTZ:
       return new GenericUDAFLongStatsEvaluator();
     case FLOAT:
     case DOUBLE:
diff --git a/ql/src/test/queries/clientpositive/druidmini_test_insert.q b/ql/src/test/queries/clientpositive/druidmini_test_insert.q
index 09e4a192b4..e97fe5be4b 100644
--- a/ql/src/test/queries/clientpositive/druidmini_test_insert.q
+++ b/ql/src/test/queries/clientpositive/druidmini_test_insert.q
@@ -34,6 +34,8 @@ SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
   cboolean2
 FROM alltypesorc where ctimestamp1 IS NOT NULL;

+-- ANALYZE COLUMN STATS FOR DRUID TABLE
+analyze table druid_alltypesorc compute statistics for columns;

 SELECT COUNT(*) FROM druid_alltypesorc;

diff --git a/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out b/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out
index a4a5594e5d..e568b14ea1 100644
--- a/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out
@@ -90,6 +90,16 @@ SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@alltypesorc
 POSTHOOK: Output: default@druid_alltypesorc
+PREHOOK: query: analyze table druid_alltypesorc compute statistics for columns
+PREHOOK: type: ANALYZE_TABLE
+PREHOOK: Input: default@druid_alltypesorc
+PREHOOK: Output: default@druid_alltypesorc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: analyze table druid_alltypesorc compute statistics for columns
+POSTHOOK: type: ANALYZE_TABLE
+POSTHOOK: Input: default@druid_alltypesorc
+POSTHOOK: Output: default@druid_alltypesorc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
 PREHOOK: query: SELECT COUNT(*) FROM druid_alltypesorc
 PREHOOK: type: QUERY
 PREHOOK: Input: default@druid_alltypesorc
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorUtils.java
index 0dbecb75fd..56243159db 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorUtils.java
@@ -805,6 +805,10 @@ public static long getLong(Object o, PrimitiveObjectInspector oi) {
       result = ((TimestampObjectInspector) oi).getPrimitiveWritableObject(o)
           .getSeconds();
       break;
+    case TIMESTAMPLOCALTZ:
+      result = ((TimestampLocalTZObjectInspector) oi).getPrimitiveWritableObject(o)
+          .getSeconds();
+      break;
     case DECIMAL:
       {
         HiveDecimal dec = ((HiveDecimalObjectInspector) oi)
-- 
2.15.2 (Apple Git-101.1)
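
Reviewer note (illustration only, not part of the patch): the short Java sketch below
mirrors the column-list handling introduced above, that is, the new
Utilities.getColumnNames(Configuration) / splitColNames helpers whose result is passed to
DruidStorageHandlerUtils.createScanAllQuery(dataSource, columns) so the fallback Druid
scan query only requests the table's declared columns. The class ScanAllColumnsSketch,
its main method, and the hard-coded column string are invented for this example and do
not exist in Hive.

import java.util.ArrayList;
import java.util.List;

public class ScanAllColumnsSketch {

  // Roughly equivalent to the new splitColNames helper: split the comma-separated
  // serde column list and keep only the non-blank names.
  static List<String> splitColNames(String colNames) {
    List<String> names = new ArrayList<>();
    for (String col : colNames.trim().split(",")) {
      if (!col.trim().isEmpty()) {
        names.add(col);
      }
    }
    return names;
  }

  public static void main(String[] args) {
    // In the patched code this value comes from the job Configuration
    // (serdeConstants.LIST_COLUMNS); it is hard-coded here for the example.
    String listColumns = "__time,cstring1,cdouble,cboolean2";
    List<String> columns = splitColNames(listColumns);
    // The patch hands a list like this to createScanAllQuery(dataSource, columns).
    System.out.println(columns); // [__time, cstring1, cdouble, cboolean2]
  }
}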