diff --git druid-handler/pom.xml druid-handler/pom.xml
index 5b8a601131..5642ea818c 100644
--- druid-handler/pom.xml
+++ druid-handler/pom.xml
@@ -158,13 +158,13 @@
io.druid.extensions
mysql-metadata-storage
${druid.version}
-
+
mysql
mysql-connector-java
-
+
io.druid.extensions
postgresql-metadata-storage
@@ -346,7 +346,7 @@
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
- static/
+ static/
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java
index 797d597500..0705804066 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java
@@ -69,7 +69,7 @@ private DruidConstants() {
// value delimiter for druid columns
public static final String DRUID_PARSE_SPEC_DELIMITER = "druid.parseSpec.delimiter";
- // list demiliter for multi-valued columns
+ // list delimiter for multi-valued columns
public static final String DRUID_PARSE_SPEC_LIST_DELIMITER = "druid.parseSpec.listDelimiter";
// order of columns for delimiter and csv parse specs.
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java
index c07c24d755..5db6be8f4b 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java
@@ -35,11 +35,10 @@
import org.apache.hadoop.mapred.RecordReader;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Properties;
/**
- * A Wrapper class that takes a row-by-row Druid Record Reader and provides a Vectorized one.
+ * A wrapper class that consumes rows one at a time from the base Druid RecordReader and exposes them through a vectorized RecordReader.
 * @param <T> type of the Druid query.
*/
public class DruidVectorizedWrapper<T extends Comparable<T>> implements RecordReader<NullWritable, VectorizedRowBatch> {
@@ -47,6 +46,7 @@
private final DruidQueryRecordReader baseReader;
private final VectorizedRowBatchCtx rbCtx;
private final DruidSerDe serDe;
+ private final Object[] rowBoat;
/**
* Actual projected columns needed by the query, this can be empty in case of query like: select count(*) from src.
@@ -77,21 +77,21 @@ public DruidVectorizedWrapper(DruidQueryRecordReader reader, Configuration jobCo
}
druidWritable = baseReader.createValue();
+ rowBoat = new Object[projectedColumns.length];
}
@Override public boolean next(NullWritable nullWritable, VectorizedRowBatch vectorizedRowBatch) throws IOException {
vectorizedRowBatch.reset();
- ArrayList<Object> row;
int rowsCount = 0;
while (rowsCount < vectorizedRowBatch.getMaxSize() && baseReader.next(nullWritable, druidWritable)) {
if (projectedColumns.length > 0) {
try {
- row = serDe.deserializeAsPrimitive(druidWritable);
+ serDe.deserializeAsPrimitive(druidWritable, rowBoat);
} catch (SerDeException e) {
throw new IOException(e);
}
for (int i : projectedColumns) {
- vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, row.get(i));
+ vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, rowBoat[i]);
}
}
rowsCount++;
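
For reference, the vectorized read path after this change reduces to the loop below. This is a consolidated sketch of the hunk above, not standalone code: baseReader, serDe, vectorAssignRow, projectedColumns and vectorizedRowBatch are assumed to be set up as in the wrapper's constructor, exception handling is omitted, and the final size bookkeeping is an assumption (it is not shown in the hunk).

    // One Object[] "row boat" is allocated once and reused for every row, replacing the
    // per-row ArrayList that deserializeAsPrimitive(writable) used to return.
    Object[] rowBoat = new Object[projectedColumns.length];
    DruidWritable druidWritable = baseReader.createValue();
    int rowsCount = 0;
    while (rowsCount < vectorizedRowBatch.getMaxSize()
        && baseReader.next(NullWritable.get(), druidWritable)) {
      serDe.deserializeAsPrimitive(druidWritable, rowBoat);   // fills rowBoat in place
      for (int i : projectedColumns) {
        // assignRowColumn receives plain Java values (Long, Double, String, Timestamp, ...)
        vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, rowBoat[i]);
      }
      rowsCount++;
    }
    vectorizedRowBatch.size = rowsCount;  // assumed bookkeeping, not part of the hunk above
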
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java
index 00442f35af..7ea23913d2 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java
@@ -33,7 +33,7 @@
* This class is copied from druid source code
* in order to avoid adding additional dependencies on druid-indexing-service.
*/
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({
+@SuppressWarnings("ALL") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({
@JsonSubTypes.Type(name = "kafka", value = KafkaSupervisorTuningConfig.class) })
public class KafkaSupervisorTuningConfig extends KafkaTuningConfig {
private final Integer workerThreads;
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
index 2faa1623b6..9a19da264f 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
@@ -109,7 +109,7 @@
/**
* Always returns true, doesn't affect the version being built.
*/
- @Deprecated @JsonProperty public boolean getBuildV9Directly() {
+ @SuppressWarnings("SameReturnValue") @Deprecated @JsonProperty public boolean getBuildV9Directly() {
return true;
}
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java
index 9e09ddb45a..f35db6d01c 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java
@@ -31,11 +31,11 @@
* This class is copied from druid source code
* in order to avoid adding additional dependencies on druid-indexing-service.
*/
-public class TaskReportData {
+@SuppressWarnings("unused") public class TaskReportData {
/**
* Task type used by serializer but does not have any functionality as far i can tell.
*/
- public enum TaskType {
+ @SuppressWarnings("unused") public enum TaskType {
ACTIVE, PUBLISHING, UNKNOWN
}
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java
index db2721d6f8..4d9d1a9680 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java
@@ -50,6 +50,7 @@
}
if (queryResultsIterator.hasNext()) {
ScanResultValue current = queryResultsIterator.next();
+ //noinspection unchecked
compactedValues = ((List<List<Object>>) current.getEvents()).iterator();
return nextKeyValue();
}
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index c751932dd2..696a2469c4 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -42,14 +42,6 @@
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeSpec;
import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DateWritableV2;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
-import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampLocalTZWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -73,11 +65,6 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
@@ -209,7 +196,7 @@ private void initFromProperties(final Properties properties) throws SerDeExcepti
final List<ObjectInspector>
inspectors =
columnTypes.stream()
- .map(PrimitiveObjectInspectorFactory::getPrimitiveWritableObjectInspector)
+ .map(PrimitiveObjectInspectorFactory::getPrimitiveJavaObjectInspector)
.collect(Collectors.toList());
columns = columnNames.toArray(new String[0]);
types = columnTypes.toArray(new PrimitiveTypeInfo[0]);
@@ -254,7 +241,7 @@ private void initFromDruidQueryPlan(Properties properties, String druidQuery) {
for (int i = 0; i < columnTypes.size(); ++i) {
columns[i] = columnNames.get(i);
types[i] = columnTypes.get(i);
- inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
+ inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(types[i]));
}
inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
}
@@ -417,63 +404,8 @@ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQ
final Object value = input.isCompacted() ? input.getCompactedValue().get(i) : input.getValue().get(columns[i]);
if (value == null) {
output.add(null);
- continue;
- }
- switch (types[i].getPrimitiveCategory()) {
- case TIMESTAMP:
- output.add(new TimestampWritableV2(Timestamp.ofEpochMilli(deserializeToMillis(value))));
- break;
- case TIMESTAMPLOCALTZ:
- output.add(new TimestampLocalTZWritable(new TimestampTZ(ZonedDateTime.ofInstant(Instant.ofEpochMilli(
- deserializeToMillis(value)), ((TimestampLocalTZTypeInfo) types[i]).timeZone()))));
- break;
- case DATE:
- output.add(new DateWritableV2(Date.ofEpochMilli(deserializeToMillis(value))));
- break;
- case BYTE:
- output.add(new ByteWritable(((Number) value).byteValue()));
- break;
- case SHORT:
- output.add(new ShortWritable(((Number) value).shortValue()));
- break;
- case INT:
- if (value instanceof Number) {
- output.add(new IntWritable(((Number) value).intValue()));
- } else {
- // This is a corner case where we have an extract of time unit like day/month pushed as Extraction Fn
- //@TODO The best way to fix this is to add explicit output Druid types to Calcite Extraction Functions impls
- output.add(new IntWritable(Integer.valueOf((String) value)));
- }
-
- break;
- case LONG:
- output.add(new LongWritable(((Number) value).longValue()));
- break;
- case FLOAT:
- output.add(new FloatWritable(((Number) value).floatValue()));
- break;
- case DOUBLE:
- output.add(new DoubleWritable(((Number) value).doubleValue()));
- break;
- case CHAR:
- output.add(new HiveCharWritable(new HiveChar(value.toString(), ((CharTypeInfo) types[i]).getLength())));
- break;
- case VARCHAR:
- output.add(new HiveVarcharWritable(new HiveVarchar(value.toString(),
- ((VarcharTypeInfo) types[i]).getLength())));
- break;
- case STRING:
- output.add(new Text(value.toString()));
- break;
- case BOOLEAN:
- if (value instanceof Number) {
- output.add(new BooleanWritable(((Number) value).intValue() != 0));
- } else {
- output.add(new BooleanWritable(Boolean.valueOf(value.toString())));
- }
- break;
- default:
- throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory());
+ } else {
+ output.add(convertAsPrimitive(value, types[i]));
}
}
return output;
@@ -483,79 +415,65 @@ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQ
* Function to convert Druid Primitive values to Hive Primitives. Main usage of this is to pipe data to VectorRow.
* This has the exact same logic as {@link DruidSerDe#deserialize(Writable)}, any modification here should be done
* there as well.
- * Reason to have 2 function is performance, merging both will bring an extra test on the hot loop.
+ * The reason for keeping two functions is that the vectorized path does not expect Writables.
*
- * @param writable Druid Writable container.
- * @return ArrayList of Hive Primitives.
+ * @param writable Druid Writable.
+ * @param rowBoat reusable array used to carry the deserialized column values.
+ * @throws SerDeException in case of deserialization errors.
*/
- public ArrayList<Object> deserializeAsPrimitive(Writable writable) throws SerDeException {
+ public void deserializeAsPrimitive(Writable writable, final Object[] rowBoat) throws SerDeException {
final DruidWritable input = (DruidWritable) writable;
- final ArrayList<Object> output = Lists.newArrayListWithExpectedSize(columns.length);
-
for (int i = 0; i < columns.length; i++) {
final Object value = input.isCompacted() ? input.getCompactedValue().get(i) : input.getValue().get(columns[i]);
- if (value == null) {
- output.add(null);
- continue;
+ rowBoat[i] = null;
+ if (value != null) {
+ rowBoat[i] = convertAsPrimitive(value, types[i]);
}
- switch (types[i].getPrimitiveCategory()) {
- case TIMESTAMP:
- output.add(Timestamp.ofEpochMilli(deserializeToMillis(value)));
- break;
- case TIMESTAMPLOCALTZ:
- output.add(new TimestampTZ(ZonedDateTime.ofInstant(Instant.ofEpochMilli(deserializeToMillis(value)),
- ((TimestampLocalTZTypeInfo) types[i]).timeZone())));
- break;
- case DATE:
- output.add(Date.ofEpochMilli(deserializeToMillis(value)));
- break;
- case BYTE:
- output.add(((Number) value).byteValue());
- break;
- case SHORT:
- output.add(((Number) value).shortValue());
- break;
- case INT:
- if (value instanceof Number) {
- output.add(((Number) value).intValue());
- } else {
- // This is a corner case where we have an extract of time unit like day/month pushed as Extraction Fn
- //@TODO The best way to fix this is to add explicit output Druid types to Calcite Extraction Functions impls
- output.add(Integer.valueOf((String) value));
- }
+ }
+ }
- break;
- case LONG:
- output.add(((Number) value).longValue());
- break;
- case FLOAT:
- output.add(((Number) value).floatValue());
- break;
- case DOUBLE:
- output.add(((Number) value).doubleValue());
- break;
- case CHAR:
- output.add(new HiveChar(value.toString(), ((CharTypeInfo) types[i]).getLength()));
- break;
- case VARCHAR:
- output.add(new HiveVarchar(value.toString(), ((VarcharTypeInfo) types[i]).getLength()));
- break;
- case STRING:
- output.add(value.toString());
- break;
- case BOOLEAN:
- if (value instanceof Number) {
- output.add(((Number) value).intValue() != 0);
- } else {
- output.add(Boolean.valueOf(value.toString()));
- }
- break;
- default:
- throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory());
+ private static Object convertAsPrimitive(Object value, PrimitiveTypeInfo typeInfo) throws SerDeException {
+ switch (typeInfo.getPrimitiveCategory()) {
+ case TIMESTAMP:
+ return Timestamp.ofEpochMilli(deserializeToMillis(value));
+ case TIMESTAMPLOCALTZ:
+ return new TimestampTZ(ZonedDateTime.ofInstant(Instant.ofEpochMilli(deserializeToMillis(value)),
+ ((TimestampLocalTZTypeInfo) typeInfo).timeZone()));
+ case DATE:
+ return Date.ofEpochMilli(deserializeToMillis(value));
+ case BYTE:
+ return ((Number) value).byteValue();
+ case SHORT:
+ return ((Number) value).shortValue();
+ case INT:
+ if (value instanceof Number) {
+ return ((Number) value).intValue();
+ } else {
+ // This is a corner case where we have an extract of time unit like day/month pushed as Extraction Fn
+ //@TODO The best way to fix this is to add explicit output Druid types to Calcite Extraction Functions impls
+ return Integer.valueOf((String) value);
}
+ case LONG:
+ return ((Number) value).longValue();
+ case FLOAT:
+ return ((Number) value).floatValue();
+ case DOUBLE:
+ return ((Number) value).doubleValue();
+ case CHAR:
+ return new HiveChar(value.toString(), ((CharTypeInfo) typeInfo).getLength());
+ case VARCHAR:
+ return new HiveVarchar(value.toString(), ((VarcharTypeInfo) typeInfo).getLength());
+ case STRING:
+ return value.toString();
+ case BOOLEAN:
+ if (value instanceof Number) {
+ return ((Number) value).intValue() != 0;
+ } else {
+ return Boolean.valueOf(value.toString());
+ }
+ default:
+ throw new SerDeException("Unknown type: " + typeInfo.getPrimitiveCategory());
}
-
- return output;
}
private static long deserializeToMillis(Object value) {
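
The object-inspector switch above (getPrimitiveWritableObjectInspector to getPrimitiveJavaObjectInspector) means the values produced by convertAsPrimitive(), and therefore by deserialize() and deserializeAsPrimitive(), are plain Java objects rather than Hadoop Writables. A minimal, self-contained illustration of that contract, using only the public serde2 factory APIs (sketch only, not part of the patch; the class name is made up):

    import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    import org.apache.hadoop.io.LongWritable;

    public class OiSwitchSketch {
      public static void main(String[] args) {
        // Java inspector (new choice): struct fields hold java.lang.Long, String, Timestamp, ...
        PrimitiveObjectInspector javaOi =
            PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(TypeInfoFactory.longTypeInfo);
        // Writable inspector (previous choice): fields had to hold LongWritable, Text, ...
        PrimitiveObjectInspector writableOi =
            PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory.longTypeInfo);

        System.out.println(javaOi.getPrimitiveJavaObject(200L));                        // 200
        System.out.println(writableOi.getPrimitiveJavaObject(new LongWritable(200L)));  // 200
      }
    }
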
diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java
index 74bceec169..656dc25e6f 100644
--- druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java
+++ druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java
@@ -67,7 +67,6 @@
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -154,15 +153,15 @@
// Timeseries query results as records
private static final Object[][] TIMESERIES_QUERY_RESULTS_RECORDS = new Object[][]{
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))),
- new LongWritable(0),
- new FloatWritable(1.0F),
- new FloatWritable(2.2222F) },
+ new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC)),
+ 0L,
+ 1.0F,
+ 2.2222F },
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325462400000L).atZone(ZoneOffset.UTC))),
- new LongWritable(2),
- new FloatWritable(3.32F),
- new FloatWritable(4F) } };
+ new TimestampTZ(Instant.ofEpochMilli(1325462400000L).atZone(ZoneOffset.UTC)),
+ 2L,
+ 3.32F,
+ 4F } };
// Timeseries query results as records (types defined by metastore)
private static final String TIMESERIES_COLUMN_NAMES = "timestamp,sample_name1,sample_name2,sample_divide";
@@ -269,35 +268,20 @@
// TopN query results as records
private static final Object[][] TOPN_QUERY_RESULTS_RECORDS = new Object[][]{
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))),
- new Text("dim1_val"),
- new LongWritable(111),
- new FloatWritable(10669F),
- new FloatWritable(96.11711711711712F) },
+ new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC)),
+ "dim1_val", 111L, 10669F, 96.11711711711712F },
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))),
- new Text("another_dim1_val"),
- new LongWritable(88),
- new FloatWritable(28344F),
- new FloatWritable(322.09090909090907F) },
+ new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC)),
+ "another_dim1_val", 88L, 28344F, 322.09090909090907F },
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))),
- new Text("dim1_val3"),
- new LongWritable(70),
- new FloatWritable(871F),
- new FloatWritable(12.442857142857143F) },
+ new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC)),
+ "dim1_val3", 70L, 871F, 12.442857142857143F },
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))),
- new Text("dim1_val4"),
- new LongWritable(62),
- new FloatWritable(815F),
- new FloatWritable(13.14516129032258F) },
+ new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC)),
+ "dim1_val4", 62L, 815F, 13.14516129032258F },
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))),
- new Text("dim1_val5"),
- new LongWritable(60),
- new FloatWritable(2787F),
- new FloatWritable(46.45F) } };
+ new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC)),
+ "dim1_val5", 60L, 2787F, 46.45F } };
// TopN query results as records (types defined by metastore)
private static final String TOPN_COLUMN_NAMES = "timestamp,sample_dim,count,some_metric,sample_divide";
@@ -437,39 +421,25 @@
// GroupBy query results as records
private static final Object[][] GROUP_BY_QUERY_EXTRACTION_RESULTS_RECORDS = new Object[][]{
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))),
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))),
- new LongWritable(200) },
+ new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC)),
+ new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC)), 200L },
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))),
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))),
- new LongWritable(400) } };
+ new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC)),
+ new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC)), 400L } };
private static final Object[][] GROUP_BY_QUERY_RESULTS_RECORDS = new Object[][]{
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))),
- new Text("India"),
- new Text("phone"),
- new LongWritable(88),
- new DoubleWritable(29.91233453),
- new FloatWritable(60.32F) },
+ new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC)), "India",
+ "phone", 88L, 29.91233453, 60.32F },
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))),
- new Text("Spain"),
- new Text("pc"),
- new LongWritable(16),
- new DoubleWritable(172.93494959),
- new FloatWritable(6.333333F) } };
+ new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC)),
+ "Spain", "pc", 16L, 172.93494959, 6.333333F } };
private static final Object[][] GB_MONTH_EXTRACTION_RESULTS_RECORDS = new Object[][]{
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))),
- new IntWritable(1),
- new LongWritable(200) },
+ new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC)), 1, 200L },
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))),
- new IntWritable(1),
- new LongWritable(400) } };
+ new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC)), 1, 400L } };
// GroupBy query results as records (types defined by metastore)
private static final String GROUP_BY_COLUMN_NAMES = "timestamp,country,device,total_usage,data_transfer,avg_usage";
@@ -617,80 +587,50 @@
+ "float,float";
private static final Object[][] SELECT_QUERY_RESULTS_RECORDS = new Object[][]{
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))),
- new BooleanWritable(true),
- new Text("article"),
- new Text("0"),
- new Text("0"),
- new Text("11._korpus_(NOVJ)"),
- new Text("sl"),
- new Text("0"),
- new Text("EmausBot"),
- new DoubleWritable(1.0d),
- new DoubleWritable(39.0d),
- new FloatWritable(39.0F),
- new FloatWritable(39.0F),
- new FloatWritable(0.0F) },
+ new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC)), Boolean.TRUE,
+ "article",
+ "0",
+ "0",
+ "11._korpus_(NOVJ)",
+ "sl",
+ "0",
+ "EmausBot", 1.0d, 39.0d, 39.0F, 39.0F, 0.0F },
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))),
- new BooleanWritable(false),
- new Text("article"),
- new Text("0"),
- new Text("0"),
- new Text("112_U.S._580"),
- new Text("en"),
- new Text("1"),
- new Text("MZMcBride"),
- new DoubleWritable(1.0d),
- new DoubleWritable(70.0d),
- new FloatWritable(70.0F),
- new FloatWritable(70.0F),
- new FloatWritable(0.0F) },
+ new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC)), Boolean.FALSE,
+ "article",
+ "0",
+ "0",
+ "112_U.S._580",
+ "en",
+ "1",
+ "MZMcBride", 1.0d, 70.0d, 70.0F, 70.0F, 0.0F },
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC))),
- new BooleanWritable(false),
- new Text("article"),
- new Text("0"),
- new Text("0"),
- new Text("113_U.S._243"),
- new Text("en"),
- new Text("1"),
- new Text("MZMcBride"),
- new DoubleWritable(1.0d),
- new DoubleWritable(77.0d),
- new FloatWritable(77.0F),
- new FloatWritable(77.0F),
- new FloatWritable(0.0F) },
+ new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC)), Boolean.FALSE,
+ "article",
+ "0",
+ "0",
+ "113_U.S._243",
+ "en",
+ "1",
+ "MZMcBride", 1.0d, 77.0d, 77.0F, 77.0F, 0.0F },
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC))),
- new BooleanWritable(false),
- new Text("article"),
- new Text("0"),
- new Text("0"),
- new Text("113_U.S._73"),
- new Text("en"),
- new Text("1"),
- new Text("MZMcBride"),
- new DoubleWritable(1.0d),
- new DoubleWritable(70.0d),
- new FloatWritable(70.0F),
- new FloatWritable(70.0F),
- new FloatWritable(0.0F) },
+ new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC)), Boolean.FALSE,
+ "article",
+ "0",
+ "0",
+ "113_U.S._73",
+ "en",
+ "1",
+ "MZMcBride", 1.0d, 70.0d, 70.0F, 70.0F, 0.0F },
new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC))),
- new BooleanWritable(false),
- new Text("article"),
- new Text("0"),
- new Text("0"),
- new Text("113_U.S._756"),
- new Text("en"),
- new Text("1"),
- new Text("MZMcBride"),
- new DoubleWritable(1.0d),
- new DoubleWritable(68.0d),
- new FloatWritable(68.0F),
- new FloatWritable(68.0F),
- new FloatWritable(0.0F) } };
+ new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC)), Boolean.FALSE,
+ "article",
+ "0",
+ "0",
+ "113_U.S._756",
+ "en",
+ "1",
+ "MZMcBride", 1.0d, 68.0d, 68.0F, 68.0F, 0.0F } };
// Scan query
private static final String
@@ -869,7 +809,6 @@ private static Properties createPropertiesQuery(String dataSource,
final HiveDruidSplit split = new HiveDruidSplit(jsonQuery, new Path("empty"), new String[]{"testing_host"});
- assert reader != null;
reader.initialize(split, DruidStorageHandlerUtils.JSON_MAPPER, DruidStorageHandlerUtils.SMILE_MAPPER, httpClient);
StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector();
List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
@@ -895,7 +834,6 @@ private static Properties createPropertiesQuery(String dataSource,
futureResult.set(new ByteArrayInputStream(resultString));
when(httpClient.go(anyObject(), any(HttpResponseHandler.class))).thenReturn(futureResult);
reader = DruidQueryBasedInputFormat.getDruidQueryReader(queryType);
- assert reader != null;
reader.initialize(split, DruidStorageHandlerUtils.JSON_MAPPER, DruidStorageHandlerUtils.SMILE_MAPPER, httpClient);
pos = 0;
@@ -1028,16 +966,15 @@ private static void serializeObject(Properties properties,
}
private static final Object[] ROW_OBJECT_2 = new Object[]{
- new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))),
- new Text("dim1_val"),
- new HiveCharWritable(new HiveChar("dim2_v", 6)),
- new HiveVarcharWritable(new HiveVarchar("dim3_val", 8)),
- new DoubleWritable(10669.3D),
- new FloatWritable(10669.45F),
- new LongWritable(1113939),
- new IntWritable(1112123),
- new ShortWritable((short) 12),
- new ByteWritable((byte) 0) };
+ new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC)), "dim1_val",
+ new HiveChar("dim2_v", 6),
+ new HiveVarchar("dim3_val", 8),
+ 10669.3D,
+ 10669.45F,
+ 1113939L,
+ 1112123,
+ ((short) 12),
+ ((byte) 0)};
private static final DruidWritable
DRUID_WRITABLE_2 =
new DruidWritable(ImmutableMap.builder().put("__time", 1377907200000L)
diff --git druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index ed6e430c8a..0df150dbd0 100644
--- druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -173,11 +173,11 @@
}, objectMapper);
Path
- segmentDescriptroPath =
+ segmentDescriptorPath =
new Path(workingDir.getAbsolutePath(), DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME);
DruidRecordWriter
druidRecordWriter =
- new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20, segmentDescriptroPath, localFileSystem);
+ new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20, segmentDescriptorPath, localFileSystem);
List<DruidWritable>
druidWritables =
@@ -194,7 +194,7 @@
druidRecordWriter.write(druidWritable);
}
druidRecordWriter.close(false);
- List<DataSegment> dataSegmentList = DruidStorageHandlerUtils.getCreatedSegments(segmentDescriptroPath, config);
+ List<DataSegment> dataSegmentList = DruidStorageHandlerUtils.getCreatedSegments(segmentDescriptorPath, config);
Assert.assertEquals(1, dataSegmentList.size());
File tmpUnzippedSegmentDir = temporaryFolder.newFolder();
new LocalDataSegmentPuller().getSegmentFiles(dataSegmentList.get(0), tmpUnzippedSegmentDir);
diff --git ql/src/test/queries/clientpositive/druidmini_expressions.q ql/src/test/queries/clientpositive/druidmini_expressions.q
index 2f177108cf..36aad7937d 100644
--- ql/src/test/queries/clientpositive/druidmini_expressions.q
+++ ql/src/test/queries/clientpositive/druidmini_expressions.q
@@ -200,3 +200,13 @@ explain select `timets` from (select cast(`__time` as timestamp ) as timets from
select `timets_with_tz` from (select `__time` as timets_with_tz from druid_table_alltypesorc order by timets_with_tz limit 10) as src order by `timets_with_tz`;
select `timets` from (select cast(`__time` as timestamp ) as timets from druid_table_alltypesorc order by timets limit 10) as src order by `timets`;
+
+select count(cfloat) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 1025) as src;
+
+select count(cstring1) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 90000) as src;
+
+explain select count(cstring1) from (select `cfloat`, `cstring1`, `cint` from druid_table_alltypesorc limit 90000) as src;
+
+select max(cint * cdouble) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src;
+
+explain select max(cint * cfloat) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src;
\ No newline at end of file
diff --git ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out
index 47f663bbf6..973cade307 100644
--- ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out
+++ ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out
@@ -2272,3 +2272,160 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
1969-12-31 15:59:00
1969-12-31 15:59:00
1969-12-31 15:59:00
+PREHOOK: query: select count(cfloat) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 1025) as src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table_alltypesorc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(cfloat) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 1025) as src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table_alltypesorc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1025
+PREHOOK: query: select count(cstring1) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 90000) as src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table_alltypesorc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(cstring1) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 90000) as src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table_alltypesorc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+3036
+PREHOOK: query: explain select count(cstring1) from (select `cfloat`, `cstring1`, `cint` from druid_table_alltypesorc limit 90000) as src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table_alltypesorc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: explain select count(cstring1) from (select `cfloat`, `cstring1`, `cint` from druid_table_alltypesorc limit 90000) as src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table_alltypesorc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: druid_table_alltypesorc
+ properties:
+ druid.fieldNames cstring1
+ druid.fieldTypes string
+ druid.query.json {"queryType":"scan","dataSource":"default.druid_table_alltypesorc","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"columns":["cstring1"],"resultFormat":"compactedList","limit":90000}
+ druid.query.type scan
+ Statistics: Num rows: 9173 Data size: 1603744 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count(cstring1)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Execution mode: vectorized, llap
+ LLAP IO: no inputs
+ Reducer 2
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select max(cint * cdouble) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table_alltypesorc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select max(cint * cdouble) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table_alltypesorc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+8.256991051261554E15
+PREHOOK: query: explain select max(cint * cfloat) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table_alltypesorc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: explain select max(cint * cfloat) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table_alltypesorc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: druid_table_alltypesorc
+ properties:
+ druid.fieldNames cfloat,cint
+ druid.fieldTypes float,int
+ druid.query.json {"queryType":"scan","dataSource":"default.druid_table_alltypesorc","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"columns":["cfloat","cint"],"resultFormat":"compactedList","limit":90000}
+ druid.query.type scan
+ Statistics: Num rows: 9173 Data size: 69728 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: (UDFToFloat(cint) * cfloat) (type: float)
+ outputColumnNames: _col0
+ Statistics: Num rows: 9173 Data size: 69728 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: max(_col0)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: float)
+ Execution mode: vectorized, llap
+ LLAP IO: no inputs
+ Reducer 2
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: max(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+