diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index ea84326..ab936fa 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -205,27 +205,32 @@ public ObjectInspector apply(PrimitiveTypeInfo type) {
     Query query;
     try {
       query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, Query.class);
+
+      switch (query.getType()) {
+        case Query.TIMESERIES:
+          inferSchema((TimeseriesQuery) query, columnNames, columnTypes);
+          break;
+        case Query.TOPN:
+          inferSchema((TopNQuery) query, columnNames, columnTypes);
+          break;
+        case Query.SELECT:
+          String address = HiveConf.getVar(configuration,
+                  HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
+          if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
+            throw new SerDeException("Druid broker address not specified in configuration");
+          }
+          inferSchema((SelectQuery) query, columnNames, columnTypes, address);
+          break;
+        case Query.GROUP_BY:
+          inferSchema((GroupByQuery) query, columnNames, columnTypes);
+          break;
+        default:
+          throw new SerDeException("Not supported Druid query");
+      }
     } catch (Exception e) {
       throw new SerDeException(e);
     }
 
-    switch (query.getType()) {
-      case Query.TIMESERIES:
-        inferSchema((TimeseriesQuery) query, columnNames, columnTypes);
-        break;
-      case Query.TOPN:
-        inferSchema((TopNQuery) query, columnNames, columnTypes);
-        break;
-      case Query.SELECT:
-        inferSchema((SelectQuery) query, columnNames, columnTypes);
-        break;
-      case Query.GROUP_BY:
-        inferSchema((GroupByQuery) query, columnNames, columnTypes);
-        break;
-      default:
-        throw new SerDeException("Not supported Druid query");
-    }
-
     columns = new String[columnNames.size()];
     types = new PrimitiveTypeInfo[columnNames.size()];
     for (int i = 0; i < columnTypes.size(); ++i) {
@@ -327,8 +332,7 @@ private void inferSchema(TopNQuery query, List<String> columnNames,
 
   /* Select query */
   private void inferSchema(SelectQuery query, List<String> columnNames,
-          List<PrimitiveTypeInfo> columnTypes
-  ) {
+          List<PrimitiveTypeInfo> columnTypes, String address) throws SerDeException {
     // Timestamp column
     columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
     columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -337,10 +341,26 @@ private void inferSchema(SelectQuery query, List<String> columnNames,
       columnNames.add(ds.getOutputName());
       columnTypes.add(TypeInfoFactory.stringTypeInfo);
     }
-    // Metric columns
+    // We need to emit a metadata query to know the type of the metric columns
+    SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder();
+    builder.dataSource(query.getDataSource());
+    builder.merge(true);
+    builder.analysisTypes();
+    SegmentMetadataQuery metadataQuery = builder.build();
+    // Execute query in Druid
+    SegmentAnalysis schemaInfo;
+    try {
+      schemaInfo = submitMetadataRequest(address, metadataQuery);
+    } catch (IOException e) {
+      throw new SerDeException(e);
+    }
+    if (schemaInfo == null) {
+      throw new SerDeException("Connected to Druid but could not retrieve datasource information");
+    }
     for (String metric : query.getMetrics()) {
       columnNames.add(metric);
-      columnTypes.add(TypeInfoFactory.floatTypeInfo);
+      columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(
+          schemaInfo.getColumns().get(metric).getType()));
     }
   }
 
diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
index 8dfa4d7..a67afdb 100644
--- druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
+++ druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
@@ -502,7 +502,7 @@ public void testDruidDeserializer()
           IllegalAccessException, IOException, InterruptedException,
           NoSuchMethodException, InvocationTargetException {
     // Create, initialize, and test the SerDe
-    DruidSerDe serDe = new DruidSerDe();
+    QTestDruidSerDe serDe = new QTestDruidSerDe();
     Configuration conf = new Configuration();
     Properties tbl;
     // Timeseries query
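Note (reviewer sketch): the patch replaces the hard-coded FLOAT metric type with DruidSerDeUtils.convertDruidToHiveType applied to the type names returned by the segment metadata query, but that helper is not part of this diff. Below is a minimal, hypothetical sketch of the mapping it is assumed to perform, based only on the TypeInfoFactory constants already used in DruidSerDe; the real implementation may differ.

```java
// Hypothetical sketch only; not the actual DruidSerDeUtils.convertDruidToHiveType.
import java.util.Locale;

import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public final class DruidTypeMappingSketch {

  private DruidTypeMappingSketch() {
  }

  // Maps the column type names reported by a Druid segment metadata query
  // (e.g. "LONG", "FLOAT", "STRING") to Hive primitive type infos.
  public static PrimitiveTypeInfo convertDruidToHiveType(String druidType) {
    if (druidType == null) {
      // Fall back to the pre-patch behavior, which assumed FLOAT metrics.
      return TypeInfoFactory.floatTypeInfo;
    }
    switch (druidType.toUpperCase(Locale.ENGLISH)) {
      case "LONG":
        return TypeInfoFactory.longTypeInfo;
      case "FLOAT":
        return TypeInfoFactory.floatTypeInfo;
      case "STRING":
        return TypeInfoFactory.stringTypeInfo;
      default:
        // Unknown metric types are treated as strings in this sketch.
        return TypeInfoFactory.stringTypeInfo;
    }
  }
}
```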