From 74e3f7e95d94dabeb9f06fb850e36ab3110f6c10 Mon Sep 17 00:00:00 2001
From: Nishant
Date: Mon, 6 Aug 2018 22:34:40 +0530
Subject: [PATCH] [HIVE-20278] Avoid unnecessary copy between List -> Map -> List for Druid Scan results

---
 .../druid/serde/DruidGroupByQueryRecordReader.java |  2 +-
 .../hive/druid/serde/DruidQueryRecordReader.java   |  2 +-
 .../druid/serde/DruidScanQueryRecordReader.java    | 19 +++--
 .../druid/serde/DruidSelectQueryRecordReader.java  |  2 +-
 .../apache/hadoop/hive/druid/serde/DruidSerDe.java | 32 ++++----
 .../serde/DruidTimeseriesQueryRecordReader.java    |  2 +-
 .../druid/serde/DruidTopNQueryRecordReader.java    |  2 +-
 .../hadoop/hive/druid/serde/DruidWritable.java     | 86 ++++++++++++++++++----
 .../hadoop/hive/druid/serde/TestDruidSerDe.java    | 43 ++++++++++-
 9 files changed, 143 insertions(+), 47 deletions(-)

diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
index 82c6653dac..48850d01df 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
@@ -75,7 +75,7 @@
   @Override
   public DruidWritable getCurrentValue() throws IOException, InterruptedException {
     // Create new value
-    DruidWritable value = new DruidWritable();
+    DruidWritable value = new DruidWritable(false);
     // 1) The timestamp column
     value.getValue().put(DruidStorageHandlerUtils.EVENT_TIMESTAMP_COLUMN,
         currentRow.getTimestamp() == null ? null : currentRow.getTimestamp().getMillis()
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 0abd807eeb..215f4a2a03 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -134,7 +134,7 @@ public NullWritable createKey() {

   @Override
   public DruidWritable createValue() {
-    return new DruidWritable();
+    return new DruidWritable(false);
   }

   @Override
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java
index 64c640f45a..6c6514fea0 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java
@@ -68,14 +68,17 @@ public NullWritable getCurrentKey() throws IOException, InterruptedException {
     return NullWritable.get();
   }

+  @Override
+  public DruidWritable createValue()
+  {
+    return new DruidWritable(true);
+  }
+
   @Override
   public DruidWritable getCurrentValue() throws IOException, InterruptedException {
     // Create new value
-    DruidWritable value = new DruidWritable();
-    List e = compactedValues.next();
-    for (int i = 0; i < current.getColumns().size(); i++) {
-      value.getValue().put(current.getColumns().get(i), e.get(i));
-    }
+    DruidWritable value = new DruidWritable(true);
+    value.setCompactedValue(compactedValues.next());
     return value;
   }

@@ -83,11 +86,7 @@ public DruidWritable getCurrentValue() throws IOException, InterruptedException
   public boolean next(NullWritable key, DruidWritable value) throws IOException {
     if (nextKeyValue()) {
       // Update value
-      value.getValue().clear();
-      List e = compactedValues.next();
-      for (int i = 0; i < current.getColumns().size(); i++) {
-        value.getValue().put(current.getColumns().get(i), e.get(i));
-      }
+      value.setCompactedValue(compactedValues.next());
       return true;
     }
     return false;
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
index 8e4d9048c7..1ccca5f117 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
@@ -73,7 +73,7 @@ public NullWritable getCurrentKey() throws IOException, InterruptedException {
   @Override
   public DruidWritable getCurrentValue() throws IOException, InterruptedException {
     // Create new value
-    DruidWritable value = new DruidWritable();
+    DruidWritable value = new DruidWritable(false);
     EventHolder e = values.next();
     value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, e.getTimestamp().getMillis());
     value.getValue().putAll(e.getEvent());
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 47924a63fb..946a0753ee 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -19,21 +19,6 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Function;
-import java.io.IOException;
-import java.io.InputStream;
-import java.time.Instant;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import io.druid.query.Druids;
@@ -51,7 +36,6 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -99,6 +83,20 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
 import static org.joda.time.format.ISODateTimeFormat.dateOptionalTimeParser;

 /**
@@ -387,7 +385,7 @@ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQ
     final DruidWritable input = (DruidWritable) writable;
     final List output = Lists.newArrayListWithExpectedSize(columns.length);
     for (int i = 0; i < columns.length; i++) {
-      final Object value = input.getValue().get(columns[i]);
+      final Object value = input.isCompacted() ? input.getCompactedValue().get(i) : input.getValue().get(columns[i]);
       if (value == null) {
         output.add(null);
         continue;
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
index d72624800d..fcda3efd34 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
@@ -59,7 +59,7 @@ public NullWritable getCurrentKey() throws IOException, InterruptedException {
   @Override
   public DruidWritable getCurrentValue() throws IOException, InterruptedException {
     // Create new value
-    DruidWritable value = new DruidWritable();
+    DruidWritable value = new DruidWritable(false);
     value.getValue().put(DruidStorageHandlerUtils.EVENT_TIMESTAMP_COLUMN,
         current.getTimestamp() == null ? null : current.getTimestamp().getMillis()
     );
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
index d082e919ca..e24bfff495 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
@@ -75,7 +75,7 @@ public NullWritable getCurrentKey() throws IOException, InterruptedException {
   @Override
   public DruidWritable getCurrentValue() throws IOException, InterruptedException {
     // Create new value
-    DruidWritable value = new DruidWritable();
+    DruidWritable value = new DruidWritable(false);
     value.getValue().put("timestamp",
         current.getTimestamp().getMillis()
     );
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java
index 4895a88f7a..7390647c4b 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java
@@ -17,35 +17,83 @@
  */
 package org.apache.hadoop.hive.druid.serde;

+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.Writable;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.base.Objects;
+import java.util.Objects;

 /**
  * Writable for Druid results.
  */
 public class DruidWritable implements Writable {

+  /**
+   * Value map that stores the column name to value mapping.
+   * This is only used when the result is not compacted.
+   */
   private final Map value;

-  public DruidWritable() {
-    value = new HashMap<>();
+  /**
+   * List of values in compacted form; the serializer and deserializer need to agree on the order of values.
+   * This is only used when the result is compacted.
+   */
+  private transient List compactedValue;
+
+  private final boolean compacted;
+
+  public DruidWritable(boolean compacted)
+  {
+    this.compacted = compacted;
+    if (compacted) {
+      compactedValue = Lists.newArrayList();
+      value = null;
+    } else {
+      value = new HashMap<>();
+      compactedValue = null;
+    }
   }

   public DruidWritable(Map value) {
     this.value = value;
+    this.compactedValue = null;
+    compacted = false;
+  }
+
+  public DruidWritable(List value) {
+    this.compacted = true;
+    this.compactedValue = value;
+    this.value = null;
   }

   public Map getValue() {
+    if (compacted) {
+      throw new UnsupportedOperationException("compacted DruidWritable does not support getValue(), use getCompactedValue()");
+    }
     return value;
   }

+  public List getCompactedValue() {
+    if (!compacted) {
+      throw new UnsupportedOperationException("non compacted DruidWritable does not support getCompactedValue(), use getValue()");
+    }
+    return compactedValue;
+  }
+
+  public void setCompactedValue(List compactedValue)
+  {
+    this.compactedValue = compactedValue;
+  }
+
+  public boolean isCompacted() {
+    return compacted;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     throw new UnsupportedOperationException();
@@ -56,26 +104,36 @@ public void readFields(DataInput in) throws IOException {
     throw new UnsupportedOperationException();
   }

+
   @Override
-  public int hashCode() {
-    return Objects.hashCode(value);
+  public int hashCode()
+  {
+    return Objects.hash(value, compactedValue, compacted);
   }

   @Override
-  public boolean equals(Object o) {
+  public boolean equals(Object o)
+  {
     if (this == o) {
       return true;
     }
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
-    return Objects.equal(value, ((DruidWritable) o).value);
+    DruidWritable that = (DruidWritable) o;
+    return compacted == that.compacted &&
+        Objects.equals(value, that.value) &&
+        Objects.equals(compactedValue, that.compactedValue);
   }

+
   @Override
-  public String toString() {
-    return "DruidWritable{value=" + value + '}';
+  public String toString()
+  {
+    return "DruidWritable{" +
+        "value=" + value +
+        ", compactedValue=" + compactedValue +
+        ", compacted=" + compacted +
+        '}';
   }
-
 }
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java
index 060c654192..0fdd40b44d 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java
@@ -34,6 +34,8 @@
 import java.util.Map.Entry;
 import java.util.Properties;

+import io.druid.query.scan.ScanResultValue;
+import io.druid.query.select.EventHolder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveChar;
@@ -146,6 +148,7 @@
   private byte[] groupByTimeExtractQueryResults;
   private byte[] selectQueryResults;
   private byte[] groupByMonthExtractQueryResults;
+  private byte[] scanQueryResults;

   // Timeseries query results as records
@@ -584,6 +587,30 @@
           new FloatWritable(68.0F), new FloatWritable(0.0F) }
   };

+  // Scan query
+  private static final String SCAN_QUERY =
+      "{ \"queryType\": \"scan\", " +
+      " \"dataSource\": \"wikipedia\", \"descending\": \"false\", " +
+      " \"columns\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\",\"newpage\",\"user\",\"count\",\"added\",\"delta\",\"variation\",\"deleted\"], " +
+      " \"granularity\": \"all\", " +
+      " \"intervals\": [ \"2013-01-01/2013-01-02\" ]," +
+      " \"resultFormat\": \"compactedList\"," +
+      " \"limit\": 5" +
+      "}";
+
+  private static final String SCAN_QUERY_RESULTS = "[{" +
+      "\"segmentId\":\"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\"," +
+      "\"columns\":[\"__time\",\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\"," +
+      "\"newpage\",\"user\",\"count\",\"added\",\"delta\",\"variation\",\"deleted\"]," +
+      "\"events\":[" +
+      "[\"2013-01-01T00:00:00.000Z\", 1,\"article\",\"0\",\"0\",\"11._korpus_(NOVJ)\",\"sl\",\"0\",\"EmausBot\",1.0,39.0,39.0,39.0,0.0]," +
+      "[\"2013-01-01T00:00:00.000Z\", 0,\"article\",\"0\",\"0\",\"112_U.S._580\",\"en\",\"1\",\"MZMcBride\",1.0,70.0,70.0,70.0,0.0]," +
+      "[\"2013-01-01T00:00:12.000Z\", 0,\"article\",\"0\",\"0\",\"113_U.S._243\",\"en\",\"1\",\"MZMcBride\",1.0,77.0,77.0,77.0,0.0]," +
+      "[\"2013-01-01T00:00:12.000Z\", 0,\"article\",\"0\",\"0\",\"113_U.S._73\",\"en\",\"1\",\"MZMcBride\",1.0,70.0,70.0,70.0,0.0]," +
+      "[\"2013-01-01T00:00:12.000Z\", 0,\"article\",\"0\",\"0\",\"113_U.S._756\",\"en\",\"1\",\"MZMcBride\",1.0,68.0,68.0,68.0,0.0]" +
+      "]}]";
+
+
   @Before
   public void setup() throws IOException {
     tsQueryResults = DruidStorageHandlerUtils.SMILE_MAPPER.writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue(TIMESERIES_QUERY_RESULTS, new TypeReference>>() {
@@ -605,6 +632,10 @@ public void setup() throws IOException {
     selectQueryResults = DruidStorageHandlerUtils.SMILE_MAPPER
         .writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue(SELECT_QUERY_RESULTS, new TypeReference>>() {
         }));
+
+    scanQueryResults = DruidStorageHandlerUtils.SMILE_MAPPER
+        .writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue(SCAN_QUERY_RESULTS, new TypeReference>() {
+        }));
   }

   /**
@@ -677,6 +708,15 @@ public void testDruidDeserializer() throws SerDeException, NoSuchFieldException,
     deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY, selectQueryResults,
         SELECT_QUERY_RESULTS_RECORDS
     );
+
+    // Scan query -- results should be the same as for the select query
+    tbl = createPropertiesQuery("wikipedia", Query.SCAN, SCAN_QUERY, SELECT_COLUMN_NAMES,
+        SELECT_COLUMN_TYPES
+    );
+    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+    deserializeQueryResults(serDe, Query.SCAN, SCAN_QUERY, scanQueryResults,
+        SELECT_QUERY_RESULTS_RECORDS
+    );
   }

   private static Properties createPropertiesQuery(String dataSource, String queryType,
@@ -719,7 +759,7 @@ private void deserializeQueryResults(DruidSerDe serDe, String queryType, String
     List fieldRefs = oi.getAllStructFieldRefs();

     // Check mapred
-    DruidWritable writable = new DruidWritable();
+    DruidWritable writable = reader.createValue();
     int pos = 0;
     while (reader.next(NullWritable.get(), writable)) {
       List row = (List) serDe.deserialize(writable);
@@ -920,4 +960,5 @@ private static void deserializeObject(Properties properties, DruidSerDe serDe,
       assertEquals(rowObject[i], object.get(i));
     }
   }
+
 }
-- 
2.15.2 (Apple Git-101.1)