diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 2e90df1..9548d96 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -17,26 +17,16 @@
  */
 package org.apache.hadoop.hive.druid.serde;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-import io.druid.query.Druids;
-import io.druid.query.Druids.SegmentMetadataQueryBuilder;
-import io.druid.query.Query;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.PostAggregator;
-import io.druid.query.dimension.DimensionSpec;
-import io.druid.query.groupby.GroupByQuery;
-import io.druid.query.metadata.metadata.ColumnAnalysis;
-import io.druid.query.metadata.metadata.SegmentAnalysis;
-import io.druid.query.metadata.metadata.SegmentMetadataQuery;
-import io.druid.query.select.SelectQuery;
-import io.druid.query.timeseries.TimeseriesQuery;
-import io.druid.query.topn.TopNQuery;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
 import org.apache.calcite.adapter.druid.DruidTable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.Constants;
@@ -53,15 +43,17 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -71,15 +63,27 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+
+import io.druid.query.Druids;
+import io.druid.query.Druids.SegmentMetadataQueryBuilder;
+import io.druid.query.Query;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.metadata.metadata.ColumnAnalysis;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.topn.TopNQuery;
 
 /**
  * DruidSerDe that is used to deserialize objects from a Druid data source.
@@ -162,9 +166,9 @@ public ObjectInspector apply(PrimitiveTypeInfo type) {
       throw new SerDeException("Druid broker address not specified in configuration");
     }
 
-     numConnection = HiveConf
+    numConnection = HiveConf
            .getIntVar(configuration, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-     readTimeout = new Period(
+    readTimeout = new Period(
            HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
 
     // Infer schema
@@ -395,6 +399,15 @@ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerD
                   .getPrimitiveJavaObject(
                     values.get(i)).getTime();
           break;
+        case BYTE:
+          res = ((ByteObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+          break;
+        case SHORT:
+          res = ((ShortObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+          break;
+        case INT:
+          res = ((IntObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+          break;
         case LONG:
           res = ((LongObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
           break;
@@ -448,9 +461,6 @@ public Object deserialize(Writable writable) throws SerDeException {
         case FLOAT:
           output.add(new FloatWritable(((Number) value).floatValue()));
           break;
-        case DOUBLE:
-          output.add(new DoubleWritable(((Number) value).floatValue()));
-          break;
         case STRING:
           output.add(new Text(value.toString()));
           break;
diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
index a495165..e6e3707 100644
--- druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
+++ druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
@@ -24,8 +24,10 @@
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -37,12 +39,23 @@
 import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -52,6 +65,9 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 
 import io.druid.data.input.Row;
 import io.druid.jackson.DefaultObjectMapper;
@@ -478,7 +494,7 @@
    * @throws NoSuchMethodException
    */
   @Test
-  public void testDruidSerDe()
+  public void testDruidDeserializer()
       throws SerDeException, JsonParseException, JsonMappingException,
       NoSuchFieldException, SecurityException, IllegalArgumentException,
       IllegalAccessException, IOException, InterruptedException,
@@ -622,4 +638,104 @@ private static void deserializeQueryResults(DruidSerDe serDe, String queryType,
     assertEquals(pos, records.length);
   }
+
+  private static final String COLUMN_NAMES = "__time,c0,c1,c2,c3,c4,c5,c6";
+  private static final String COLUMN_TYPES = "timestamp,string,double,float,bigint,int,smallint,tinyint";
+  private static final Object[] ROW_OBJECT = new Object[] {
+      new TimestampWritable(new Timestamp(1377907200000L)),
+      new Text("dim1_val"),
+      new DoubleWritable(10669D),
+      new FloatWritable(10669F),
+      new LongWritable(1113939),
+      new IntWritable(1112123),
+      new ShortWritable((short) 12),
+      new ByteWritable((byte) 0),
+      new TimestampWritable(new Timestamp(1377907200000L)) // granularity
+  };
+  private static final DruidWritable DRUID_WRITABLE = new DruidWritable(
+      ImmutableMap.<String, Object>builder()
+          .put("__time", 1377907200000L)
+          .put("c0", "dim1_val")
+          .put("c1", 10669D)
+          .put("c2", 10669F)
+          .put("c3", 1113939L)
+          .put("c4", 1112123)
+          .put("c5", (short) 12)
+          .put("c6", (byte) 0)
+          .put("__time_granularity", 1377907200000L)
+          .build());
+
+  /**
+   * Test the default behavior of the objects and object inspectors.
+   * @throws IOException
+   * @throws IllegalAccessException
+   * @throws IllegalArgumentException
+   * @throws SecurityException
+   * @throws NoSuchFieldException
+   * @throws JsonMappingException
+   * @throws JsonParseException
+   * @throws InvocationTargetException
+   * @throws NoSuchMethodException
+   */
+  @Test
+  public void testDruidSerializer()
+      throws SerDeException, JsonParseException, JsonMappingException,
+      NoSuchFieldException, SecurityException, IllegalArgumentException,
+      IllegalAccessException, IOException, InterruptedException,
+      NoSuchMethodException, InvocationTargetException {
+    // Create, initialize, and test the SerDe
+    DruidSerDe serDe = new DruidSerDe();
+    Configuration conf = new Configuration();
+    Properties tbl;
+    // Mixed source (all types)
+    tbl = createPropertiesSource(COLUMN_NAMES, COLUMN_TYPES);
+    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+    serializeObject(tbl, serDe, ROW_OBJECT, DRUID_WRITABLE);
+  }
+
+  private static Properties createPropertiesSource(String columnNames, String columnTypes) {
+    Properties tbl = new Properties();
+
+    // Set the configuration parameters
+    tbl.setProperty(serdeConstants.LIST_COLUMNS, columnNames);
+    tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, columnTypes);
+    return tbl;
+  }
+
+  private static void serializeObject(Properties properties, DruidSerDe serDe,
+      Object[] rowObject, DruidWritable druidWritable) throws SerDeException {
+    // Build OI with timestamp granularity column
+    final List<String> columnNames = new ArrayList<>();
+    final List<PrimitiveTypeInfo> columnTypes = new ArrayList<>();
+    List<ObjectInspector> inspectors = new ArrayList<>();
+    columnNames.addAll(Utilities.getColumnNames(properties));
+    columnNames.add(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME);
+    columnTypes.addAll(Lists.transform(Utilities.getColumnTypes(properties),
+        new Function<String, PrimitiveTypeInfo>() {
+          @Override
+          public PrimitiveTypeInfo apply(String type) {
+            return TypeInfoFactory.getPrimitiveTypeInfo(type);
+          }
+        }
+    ));
+    columnTypes.add(TypeInfoFactory.getPrimitiveTypeInfo("timestamp"));
+    inspectors.addAll(Lists.transform(columnTypes,
+        new Function<PrimitiveTypeInfo, ObjectInspector>() {
+          @Override
+          public ObjectInspector apply(PrimitiveTypeInfo type) {
+            return PrimitiveObjectInspectorFactory
+                .getPrimitiveWritableObjectInspector(type);
+          }
+        }
+    ));
+    ObjectInspector inspector = ObjectInspectorFactory
+        .getStandardStructObjectInspector(columnNames, inspectors);
+    // Serialize
+    DruidWritable writable = (DruidWritable) serDe.serialize(rowObject, inspector);
+    // Check result
+    assertEquals(DRUID_WRITABLE.getValue().size(), writable.getValue().size());
+    for (Entry<String, Object> e : DRUID_WRITABLE.getValue().entrySet()) {
+      assertEquals(e.getValue(), writable.getValue().get(e.getKey()));
+    }
+  }
 
 }
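
A minimal standalone sketch, separate from the patch itself, of the value mapping the new BYTE/SHORT/INT branches in DruidSerDe.serialize() are expected to produce: each branch reads the field through its primitive ObjectInspector, so the DruidWritable map ends up holding boxed Byte, Short, and Integer values, which is what testDruidSerializer asserts against DRUID_WRITABLE. Column names and values below are copied from the test constants; the class name is hypothetical.

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class; values mirror DRUID_WRITABLE in the test above.
public class DruidSerializeTypeMappingSketch {
  public static void main(String[] args) {
    Map<String, Object> expected = new LinkedHashMap<>();
    expected.put("c3", 1113939L);    // bigint   -> java.lang.Long    (pre-existing LONG branch)
    expected.put("c4", 1112123);     // int      -> java.lang.Integer (new INT branch)
    expected.put("c5", (short) 12);  // smallint -> java.lang.Short   (new SHORT branch)
    expected.put("c6", (byte) 0);    // tinyint  -> java.lang.Byte    (new BYTE branch)
    // Print the boxed type each column is expected to carry in the DruidWritable map
    for (Map.Entry<String, Object> e : expected.entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue().getClass().getName());
    }
  }
}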