diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 4814fcd..551aafc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -221,6 +221,7 @@
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe2;
 import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
@@ -7170,7 +7171,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
       Class serdeClass = LazySimpleSerDe.class;
       if (fileFormat.equals(PlanUtils.LLAP_OUTPUT_FORMAT_KEY)) {
-        serdeClass = LazyBinarySerDe.class;
+        serdeClass = LazyBinarySerDe2.class;
       }
       table_desc = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
diff --git a/ql/src/test/queries/clientpositive/input_lazyserde2.q b/ql/src/test/queries/clientpositive/input_lazyserde2.q
new file mode 100644
index 0000000..0f58907
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/input_lazyserde2.q
@@ -0,0 +1,53 @@
+-- SORT_QUERY_RESULTS
+
+DROP TABLE dest1;
+CREATE TABLE dest1(a array<int>, b array<string>, c map<string,string>, d int, e string)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '1'
+COLLECTION ITEMS TERMINATED BY '2'
+MAP KEYS TERMINATED BY '3'
+LINES TERMINATED BY '10'
+STORED AS TEXTFILE;
+
+EXPLAIN
+FROM src_thrift
+INSERT OVERWRITE TABLE dest1 SELECT src_thrift.lint, src_thrift.lstring, src_thrift.mstringstring, src_thrift.aint, src_thrift.astring DISTRIBUTE BY 1;
+
+FROM src_thrift
+INSERT OVERWRITE TABLE dest1 SELECT src_thrift.lint, src_thrift.lstring, src_thrift.mstringstring, src_thrift.aint, src_thrift.astring DISTRIBUTE BY 1;
+
+SELECT dest1.* FROM dest1 CLUSTER BY 1;
+
+SELECT dest1.a[0], dest1.b[0], dest1.c['key2'], dest1.d, dest1.e FROM dest1 CLUSTER BY 1;
+
+DROP TABLE dest1;
+
+CREATE TABLE dest1(a array<int>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '1' ESCAPED BY '\\';
+INSERT OVERWRITE TABLE dest1 SELECT src_thrift.lint FROM src_thrift DISTRIBUTE BY 1;
+SELECT * from dest1;
+DROP TABLE dest1;
+
+CREATE TABLE dest1(a map<string,string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '1' ESCAPED BY '\\';
+INSERT OVERWRITE TABLE dest1 SELECT src_thrift.mstringstring FROM src_thrift DISTRIBUTE BY 1;
+SELECT * from dest1;
+
+CREATE TABLE destBin(a UNIONTYPE<int, double, array<string>, struct<col1:int,col2:string>>) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe2' STORED AS SEQUENCEFILE;
+INSERT OVERWRITE TABLE destBin SELECT create_union( CASE WHEN key < 100 THEN 0 WHEN key < 200 THEN 1 WHEN key < 300 THEN 2 WHEN key < 400 THEN 3 ELSE 0 END, key, 2.0D, array("one","two"), struct(5,"five")) FROM srcbucket2;
+SELECT * from destBin;
+DROP TABLE destBin;
+
+DROP TABLE dest2;
+DROP TABLE dest3;
+
+CREATE TABLE dest2 (a map<string, map<string, map<string, uniontype<int, bigint, string, double, boolean, array<string>, map<string,string>>>>>)
+  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe2' STORED AS SEQUENCEFILE;
+INSERT OVERWRITE TABLE dest2 SELECT src_thrift.attributes FROM src_thrift;
+SELECT a from dest2 limit 10;
+
+CREATE TABLE dest3 (
+unionfield1 uniontype<int, bigint, string, double, boolean, array<string>, map<string,string>>,
+unionfield2 uniontype<int, bigint, string, double, boolean, array<string>, map<string,string>>,
+unionfield3 uniontype<int, bigint, string, double, boolean, array<string>, map<string,string>>
+) ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe2' STORED AS SEQUENCEFILE; +INSERT OVERWRITE TABLE dest3 SELECT src_thrift.unionField1,src_thrift.unionField2,src_thrift.unionField3 from src_thrift; +SELECT unionfield1, unionField2, unionfield3 from dest3 limit 10; diff --git a/ql/src/test/results/clientpositive/input_lazyserde2.q.out b/ql/src/test/results/clientpositive/input_lazyserde2.q.out new file mode 100644 index 0000000..82c72db --- /dev/null +++ b/ql/src/test/results/clientpositive/input_lazyserde2.q.out @@ -0,0 +1,844 @@ +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE dest1(a array, b array, c map, d int, e string) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '1' +COLLECTION ITEMS TERMINATED BY '2' +MAP KEYS TERMINATED BY '3' +LINES TERMINATED BY '10' +STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dest1 +POSTHOOK: query: CREATE TABLE dest1(a array, b array, c map, d int, e string) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '1' +COLLECTION ITEMS TERMINATED BY '2' +MAP KEYS TERMINATED BY '3' +LINES TERMINATED BY '10' +STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dest1 +PREHOOK: query: EXPLAIN +FROM src_thrift +INSERT OVERWRITE TABLE dest1 SELECT src_thrift.lint, src_thrift.lstring, src_thrift.mstringstring, src_thrift.aint, src_thrift.astring DISTRIBUTE BY 1 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +FROM src_thrift +INSERT OVERWRITE TABLE dest1 SELECT src_thrift.lint, src_thrift.lstring, src_thrift.mstringstring, src_thrift.aint, src_thrift.astring DISTRIBUTE BY 1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + Stage-2 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: src_thrift + Statistics: Num rows: 11 Data size: 3070 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: lint (type: array), lstring (type: array), mstringstring (type: map), aint (type: int), astring (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Statistics: Num rows: 11 Data size: 3070 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Map-reduce partition columns: 1 (type: int) + Statistics: Num rows: 11 Data size: 3070 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: array), _col1 (type: array), _col2 (type: map), _col3 (type: int), _col4 (type: string) + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: array), VALUE._col1 (type: array), VALUE._col2 (type: map), VALUE._col3 (type: int), VALUE._col4 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Statistics: Num rows: 11 Data size: 3070 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 11 Data size: 3070 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.dest1 + + Stage: Stage-0 + Move Operator + tables: + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.dest1 + + Stage: Stage-2 + Stats-Aggr Operator + +PREHOOK: query: FROM src_thrift +INSERT OVERWRITE TABLE dest1 SELECT src_thrift.lint, src_thrift.lstring, src_thrift.mstringstring, src_thrift.aint, src_thrift.astring DISTRIBUTE BY 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@src_thrift +PREHOOK: Output: default@dest1 +POSTHOOK: query: FROM src_thrift +INSERT OVERWRITE TABLE dest1 SELECT src_thrift.lint, src_thrift.lstring, src_thrift.mstringstring, src_thrift.aint, src_thrift.astring DISTRIBUTE BY 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src_thrift +POSTHOOK: Output: default@dest1 +POSTHOOK: Lineage: dest1.a SIMPLE [(src_thrift)src_thrift.FieldSchema(name:lint, type:array, comment:from deserializer), ] +POSTHOOK: Lineage: dest1.b SIMPLE [(src_thrift)src_thrift.FieldSchema(name:lstring, type:array, comment:from deserializer), ] +POSTHOOK: Lineage: dest1.c SIMPLE [(src_thrift)src_thrift.FieldSchema(name:mstringstring, type:map, comment:from deserializer), ] +POSTHOOK: Lineage: dest1.d SIMPLE [(src_thrift)src_thrift.FieldSchema(name:aint, type:int, comment:from deserializer), ] +POSTHOOK: Lineage: dest1.e SIMPLE [(src_thrift)src_thrift.FieldSchema(name:astring, type:string, comment:from deserializer), ] +PREHOOK: query: SELECT dest1.* FROM dest1 CLUSTER BY 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@dest1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT dest1.* FROM dest1 CLUSTER BY 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest1 +#### A masked pattern was here #### +NULL NULL NULL 0 NULL +[0,0,0] ["0","0","0"] {"key_0":"value_0"} 1712634731 record_0 +[1,2,3] ["10","100","1000"] {"key_1":"value_1"} 465985200 record_1 +[2,4,6] ["20","200","2000"] {"key_2":"value_2"} -751827638 record_2 +[3,6,9] ["30","300","3000"] {"key_3":"value_3"} 477111222 record_3 +[4,8,12] ["40","400","4000"] {"key_4":"value_4"} -734328909 record_4 +[5,10,15] ["50","500","5000"] {"key_5":"value_5"} -1952710710 record_5 +[6,12,18] ["60","600","6000"] {"key_6":"value_6"} 1244525190 record_6 +[7,14,21] ["70","700","7000"] {"key_7":"value_7"} -1461153973 record_7 +[8,16,24] ["80","800","8000"] {"key_8":"value_8"} 1638581578 record_8 +[9,18,27] ["90","900","9000"] {"key_9":"value_9"} 336964413 record_9 +PREHOOK: query: SELECT dest1.a[0], dest1.b[0], dest1.c['key2'], dest1.d, dest1.e FROM dest1 CLUSTER BY 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@dest1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT dest1.a[0], dest1.b[0], dest1.c['key2'], dest1.d, dest1.e FROM dest1 CLUSTER BY 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest1 +#### A masked pattern was here #### +0 0 NULL 1712634731 record_0 +1 10 NULL 465985200 record_1 +2 20 NULL -751827638 record_2 +3 30 NULL 477111222 record_3 +4 40 NULL -734328909 record_4 +5 50 NULL -1952710710 record_5 +6 60 NULL 1244525190 record_6 +7 70 NULL -1461153973 record_7 +8 80 NULL 1638581578 record_8 +9 90 NULL 336964413 record_9 +NULL NULL NULL 0 NULL +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@dest1 +PREHOOK: Output: default@dest1 +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@dest1 +POSTHOOK: Output: default@dest1 +PREHOOK: query: CREATE TABLE dest1(a array) ROW FORMAT DELIMITED FIELDS TERMINATED BY '1' ESCAPED BY '\\' +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dest1 +POSTHOOK: query: CREATE TABLE dest1(a array) ROW FORMAT 
DELIMITED FIELDS TERMINATED BY '1' ESCAPED BY '\\' +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dest1 +PREHOOK: query: INSERT OVERWRITE TABLE dest1 SELECT src_thrift.lint FROM src_thrift DISTRIBUTE BY 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@src_thrift +PREHOOK: Output: default@dest1 +POSTHOOK: query: INSERT OVERWRITE TABLE dest1 SELECT src_thrift.lint FROM src_thrift DISTRIBUTE BY 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src_thrift +POSTHOOK: Output: default@dest1 +POSTHOOK: Lineage: dest1.a SIMPLE [(src_thrift)src_thrift.FieldSchema(name:lint, type:array, comment:from deserializer), ] +PREHOOK: query: SELECT * from dest1 +PREHOOK: type: QUERY +PREHOOK: Input: default@dest1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT * from dest1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest1 +#### A masked pattern was here #### +NULL +[0,0,0] +[1,2,3] +[2,4,6] +[3,6,9] +[4,8,12] +[5,10,15] +[6,12,18] +[7,14,21] +[8,16,24] +[9,18,27] +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@dest1 +PREHOOK: Output: default@dest1 +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@dest1 +POSTHOOK: Output: default@dest1 +PREHOOK: query: CREATE TABLE dest1(a map) ROW FORMAT DELIMITED FIELDS TERMINATED BY '1' ESCAPED BY '\\' +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dest1 +POSTHOOK: query: CREATE TABLE dest1(a map) ROW FORMAT DELIMITED FIELDS TERMINATED BY '1' ESCAPED BY '\\' +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dest1 +PREHOOK: query: INSERT OVERWRITE TABLE dest1 SELECT src_thrift.mstringstring FROM src_thrift DISTRIBUTE BY 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@src_thrift +PREHOOK: Output: default@dest1 +POSTHOOK: query: INSERT OVERWRITE TABLE dest1 SELECT src_thrift.mstringstring FROM src_thrift DISTRIBUTE BY 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src_thrift +POSTHOOK: Output: default@dest1 +POSTHOOK: Lineage: dest1.a SIMPLE [(src_thrift)src_thrift.FieldSchema(name:mstringstring, type:map, comment:from deserializer), ] +PREHOOK: query: SELECT * from dest1 +PREHOOK: type: QUERY +PREHOOK: Input: default@dest1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT * from dest1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest1 +#### A masked pattern was here #### +NULL +{"key_0":"value_0"} +{"key_1":"value_1"} +{"key_2":"value_2"} +{"key_3":"value_3"} +{"key_4":"value_4"} +{"key_5":"value_5"} +{"key_6":"value_6"} +{"key_7":"value_7"} +{"key_8":"value_8"} +{"key_9":"value_9"} +PREHOOK: query: CREATE TABLE destBin(a UNIONTYPE, struct>) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe2' STORED AS SEQUENCEFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@destBin +POSTHOOK: query: CREATE TABLE destBin(a UNIONTYPE, struct>) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe2' STORED AS SEQUENCEFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@destBin +PREHOOK: query: INSERT OVERWRITE TABLE destBin SELECT create_union( CASE WHEN key < 100 THEN 0 WHEN key < 200 THEN 1 WHEN key < 300 THEN 2 WHEN key < 400 THEN 3 ELSE 0 END, key, 2.0D, array("one","two"), struct(5,"five")) FROM srcbucket2 +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket2 +PREHOOK: Output: default@destbin +POSTHOOK: query: 
INSERT OVERWRITE TABLE destBin SELECT create_union( CASE WHEN key < 100 THEN 0 WHEN key < 200 THEN 1 WHEN key < 300 THEN 2 WHEN key < 400 THEN 3 ELSE 0 END, key, 2.0D, array("one","two"), struct(5,"five")) FROM srcbucket2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket2 +POSTHOOK: Output: default@destbin +POSTHOOK: Lineage: destbin.a EXPRESSION [(srcbucket2)srcbucket2.FieldSchema(name:key, type:int, comment:null), ] +PREHOOK: query: SELECT * from destBin +PREHOOK: type: QUERY +PREHOOK: Input: default@destbin +#### A masked pattern was here #### +POSTHOOK: query: SELECT * from destBin +POSTHOOK: type: QUERY +POSTHOOK: Input: default@destbin +#### A masked pattern was here #### +{0:0} +{0:0} +{0:0} +{0:10} +{0:11} +{0:12} +{0:12} +{0:15} +{0:15} +{0:17} +{0:18} +{0:18} +{0:19} +{0:20} +{0:24} +{0:24} +{0:26} +{0:26} +{0:27} +{0:28} +{0:2} +{0:30} +{0:33} +{0:34} +{0:35} +{0:35} +{0:35} +{0:37} +{0:37} +{0:400} +{0:401} +{0:401} +{0:401} +{0:401} +{0:401} +{0:402} +{0:403} +{0:403} +{0:403} +{0:404} +{0:404} +{0:406} +{0:406} +{0:406} +{0:406} +{0:407} +{0:409} +{0:409} +{0:409} +{0:411} +{0:413} +{0:413} +{0:414} +{0:414} +{0:417} +{0:417} +{0:417} +{0:418} +{0:419} +{0:41} +{0:421} +{0:424} +{0:424} +{0:427} +{0:429} +{0:429} +{0:42} +{0:42} +{0:430} +{0:430} +{0:430} +{0:431} +{0:431} +{0:431} +{0:432} +{0:435} +{0:436} +{0:437} +{0:438} +{0:438} +{0:438} +{0:439} +{0:439} +{0:43} +{0:443} +{0:444} +{0:446} +{0:448} +{0:449} +{0:44} +{0:452} +{0:453} +{0:454} +{0:454} +{0:454} +{0:455} +{0:457} +{0:458} +{0:458} +{0:459} +{0:459} +{0:460} +{0:462} +{0:462} +{0:463} +{0:463} +{0:466} +{0:466} +{0:466} +{0:467} +{0:468} +{0:468} +{0:468} +{0:468} +{0:469} +{0:469} +{0:469} +{0:469} +{0:469} +{0:470} +{0:472} +{0:475} +{0:477} +{0:478} +{0:478} +{0:479} +{0:47} +{0:480} +{0:480} +{0:480} +{0:481} +{0:482} +{0:483} +{0:484} +{0:485} +{0:487} +{0:489} +{0:489} +{0:489} +{0:489} +{0:490} +{0:491} +{0:492} +{0:492} +{0:493} +{0:494} +{0:495} +{0:496} +{0:497} +{0:498} +{0:498} +{0:498} +{0:4} +{0:51} +{0:51} +{0:53} +{0:54} +{0:57} +{0:58} +{0:58} +{0:5} +{0:5} +{0:5} +{0:64} +{0:65} +{0:66} +{0:67} +{0:67} +{0:69} +{0:70} +{0:70} +{0:70} +{0:72} +{0:72} +{0:74} +{0:76} +{0:76} +{0:77} +{0:78} +{0:80} +{0:82} +{0:83} +{0:83} +{0:84} +{0:84} +{0:85} +{0:86} +{0:87} +{0:8} +{0:90} +{0:90} +{0:90} +{0:92} +{0:95} +{0:95} +{0:96} +{0:97} +{0:97} +{0:98} +{0:98} +{0:9} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{1:2.0} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} 
+{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{2:["one","two"]} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} 
+{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +{3:{"col1":5,"col2":"five"}} +PREHOOK: query: DROP TABLE destBin +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@destbin +PREHOOK: Output: default@destbin +POSTHOOK: query: DROP TABLE destBin +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@destbin +POSTHOOK: Output: default@destbin +PREHOOK: query: DROP TABLE dest2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE dest2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE dest3 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE dest3 +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE dest2 (a map, map>>>>) + ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe2' STORED AS SEQUENCEFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dest2 +POSTHOOK: query: CREATE TABLE dest2 (a map, map>>>>) + ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe2' STORED AS SEQUENCEFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dest2 +PREHOOK: query: INSERT OVERWRITE TABLE dest2 SELECT src_thrift.attributes FROM src_thrift +PREHOOK: type: QUERY +PREHOOK: Input: default@src_thrift +PREHOOK: Output: default@dest2 +POSTHOOK: query: INSERT OVERWRITE TABLE dest2 SELECT src_thrift.attributes FROM src_thrift +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src_thrift +POSTHOOK: Output: default@dest2 +POSTHOOK: Lineage: dest2.a SIMPLE [(src_thrift)src_thrift.FieldSchema(name:attributes, type:map,map>>>>, comment:from deserializer), ] +PREHOOK: query: SELECT a from dest2 limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@dest2 +#### A masked pattern was here #### +POSTHOOK: query: SELECT a from dest2 limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest2 +#### A masked pattern was here #### +{"key_0":{"erVal0":{"value_0":{3:1.0}}}} +{"key_1":{"erVal1":{"value_1":{3:1.0}}}} +{"key_2":{"erVal2":{"value_2":{3:1.0}}}} +{"key_3":{"erVal3":{"value_3":{3:1.0}}}} +{"key_4":{"erVal4":{"value_4":{3:1.0}}}} +{"key_5":{"erVal5":{"value_5":{3:1.0}}}} +{"key_6":{"erVal6":{"value_6":{3:1.0}}}} +{"key_7":{"erVal7":{"value_7":{3:1.0}}}} +{"key_8":{"erVal8":{"value_8":{3:1.0}}}} +{"key_9":{"erVal9":{"value_9":{3:1.0}}}} +PREHOOK: query: CREATE TABLE dest3 ( +unionfield1 uniontype, map>, +unionfield2 uniontype, map>, +unionfield3 uniontype, map> +) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe2' STORED AS SEQUENCEFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dest3 +POSTHOOK: query: CREATE TABLE dest3 ( +unionfield1 uniontype, 
map>, +unionfield2 uniontype, map>, +unionfield3 uniontype, map> +) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe2' STORED AS SEQUENCEFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dest3 +PREHOOK: query: INSERT OVERWRITE TABLE dest3 SELECT src_thrift.unionField1,src_thrift.unionField2,src_thrift.unionField3 from src_thrift +PREHOOK: type: QUERY +PREHOOK: Input: default@src_thrift +PREHOOK: Output: default@dest3 +POSTHOOK: query: INSERT OVERWRITE TABLE dest3 SELECT src_thrift.unionField1,src_thrift.unionField2,src_thrift.unionField3 from src_thrift +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src_thrift +POSTHOOK: Output: default@dest3 +POSTHOOK: Lineage: dest3.unionfield1 SIMPLE [(src_thrift)src_thrift.FieldSchema(name:unionfield1, type:uniontype,map>, comment:from deserializer), ] +POSTHOOK: Lineage: dest3.unionfield2 SIMPLE [(src_thrift)src_thrift.FieldSchema(name:unionfield2, type:uniontype,map>, comment:from deserializer), ] +POSTHOOK: Lineage: dest3.unionfield3 SIMPLE [(src_thrift)src_thrift.FieldSchema(name:unionfield3, type:uniontype,map>, comment:from deserializer), ] +PREHOOK: query: SELECT unionfield1, unionField2, unionfield3 from dest3 limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@dest3 +#### A masked pattern was here #### +POSTHOOK: query: SELECT unionfield1, unionField2, unionfield3 from dest3 limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest3 +#### A masked pattern was here #### +{2:"test0"} {6:{"key_0":"value_0"}} {5:["0","0","0"]} +{2:"test1"} {6:{"key_1":"value_1"}} {5:["10","100","1000"]} +{2:"test2"} {6:{"key_2":"value_2"}} {5:["20","200","2000"]} +{2:"test3"} {6:{"key_3":"value_3"}} {5:["30","300","3000"]} +{2:"test4"} {6:{"key_4":"value_4"}} {5:["40","400","4000"]} +{2:"test5"} {6:{"key_5":"value_5"}} {5:["50","500","5000"]} +{2:"test6"} {6:{"key_6":"value_6"}} {5:["60","600","6000"]} +{2:"test7"} {6:{"key_7":"value_7"}} {5:["70","700","7000"]} +{2:"test8"} {6:{"key_8":"value_8"}} {5:["80","800","8000"]} +{2:"test9"} {6:{"key_9":"value_9"}} {5:["90","900","9000"]} diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java index 7ab2083..a07d704 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java @@ -98,10 +98,10 @@ public LazyBinarySerDe() throws SerDeException { // The object for storing row data LazyBinaryStruct cachedLazyBinaryStruct; - private int serializedSize; - private SerDeStats stats; - private boolean lastOperationSerialize; - private boolean lastOperationDeserialize; + int serializedSize; + SerDeStats stats; + boolean lastOperationSerialize; + boolean lastOperationDeserialize; /** * Initialize the SerDe with configuration and table information. 
@@ -297,7 +297,7 @@ private static void serializeUnion(RandomAccessOutput byteStream, Object obj, serialize(byteStream, uoi.getField(obj), uoi.getObjectInspectors().get(tag), false, warnedOnceNullMapKey); } - private static void serializeText( + protected static void serializeText( RandomAccessOutput byteStream, Text t, boolean skipLengthPrefix) { /* write byte size of the string which is a vint */ int length = t.getLength(); @@ -317,7 +317,7 @@ public BooleanRef(boolean v) { public boolean value; } - private static void writeDateToByteStream(RandomAccessOutput byteStream, + public static void writeDateToByteStream(RandomAccessOutput byteStream, DateWritable date) { LazyBinaryUtils.writeVInt(byteStream, date.getDays()); } @@ -692,7 +692,7 @@ public static void serialize(RandomAccessOutput byteStream, Object obj, } } - private static void writeSizeAtOffset( + protected static void writeSizeAtOffset( RandomAccessOutput byteStream, int byteSizeStart, int size) { byteStream.writeInt(byteSizeStart, size); } diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe2.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe2.java new file mode 100644 index 0000000..b0452d8 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe2.java @@ -0,0 +1,623 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.serde2.lazybinary; + +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeSpec; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable; +import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveIntervalDayTimeObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveIntervalYearMonthObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * Subclass of LazyBinarySerDe with faster serialization, initializing a serializer based on the + * row columns rather than checking the ObjectInspector category/primitiveType for every value. + * This appears to be around 3x faster than the LazyBinarSerDe serialization. 
+ */ +@SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES}) +public class LazyBinarySerDe2 extends LazyBinarySerDe { + LBSerializer rowSerializer; + + public LazyBinarySerDe2() throws SerDeException { + super(); + } + + @Override + public void initialize(Configuration conf, Properties tbl) throws SerDeException { + super.initialize(conf, tbl); + ObjectInspector oi = getObjectInspector(); + + rowSerializer = createLBSerializer(oi); + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + if (objInspector.getCategory() != Category.STRUCT) { + throw new SerDeException(getClass().toString() + + " can only serialize struct types, but we got: " + + objInspector.getTypeName()); + } + + // serialize the row as BytesWritable + serializeByteStream.reset(); + rowSerializer.serializeValue(serializeByteStream, obj, objInspector, true, nullMapKey); + serializeBytesWritable.set(serializeByteStream.getData(), 0, serializeByteStream.getLength()); + + // Stats bookkeeping + serializedSize = serializeByteStream.getLength(); + lastOperationSerialize = true; + lastOperationDeserialize = false; + + return serializeBytesWritable; + } + + /** + * Generate a LBSerializer for the given primitive ObjectInspector + * @param poi + * @return + */ + LBSerializer createPrimitiveLBSerializer(PrimitiveObjectInspector poi) { + switch (poi.getPrimitiveCategory()) { + case VOID: + return new LBVoidSerializer(); + case BOOLEAN: + return new LBBooleanSerializer(); + case BYTE: + return new LBByteSerializer(); + case SHORT: + return new LBShortSerializer(); + case INT: + return new LBIntSerializer(); + case LONG: + return new LBLongSerializer(); + case FLOAT: + return new LBFloatSerializer(); + case DOUBLE: + return new LBDoubleSerializer(); + case STRING: + return new LBStringSerializer(); + case CHAR: + return new LBHiveCharSerializer(); + case VARCHAR: + return new LBHiveVarcharSerializer(); + case BINARY: + return new LBBinarySerializer(); + case DATE: + return new LBDateSerializer(); + case TIMESTAMP: + return new LBTimestampSerializer(); + case INTERVAL_YEAR_MONTH: + return new LBHiveIntervalYearMonthSerializer(); + case INTERVAL_DAY_TIME: + return new LBHiveIntervalDayTimeSerializer(); + case DECIMAL: + return new LBHiveDecimalSerializer(); + default: + throw new IllegalArgumentException("Unsupported primitive category " + poi.getPrimitiveCategory()); + } + } + + /** + * Generate a LBSerializer for the given ObjectInspector + * @param oi + * @return + */ + LBSerializer createLBSerializer(ObjectInspector oi) { + switch (oi.getCategory()) { + case PRIMITIVE: + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi; + return createPrimitiveLBSerializer(poi); + case LIST: + ListObjectInspector loi = (ListObjectInspector) oi; + ObjectInspector eoi = loi.getListElementObjectInspector(); + return new LBListSerializer(createLBSerializer(eoi)); + case MAP: + MapObjectInspector moi = (MapObjectInspector) oi; + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + return new LBMapSerializer(createLBSerializer(koi), createLBSerializer(voi)); + case STRUCT: + StructObjectInspector soi = (StructObjectInspector) oi; + List fields = soi.getAllStructFieldRefs(); + LBSerializer[] fieldSerializers = new LBSerializer[fields.size()]; + for (int idx = 0; idx < fieldSerializers.length; ++idx) { + fieldSerializers[idx] = createLBSerializer(fields.get(idx).getFieldObjectInspector()); + } + 
return new LBStructSerializer(fieldSerializers); + case UNION: + UnionObjectInspector uoi = (UnionObjectInspector) oi; + List unionFields = uoi.getObjectInspectors(); + LBSerializer[] unionFieldSerializers = new LBSerializer[unionFields.size()]; + for (int idx = 0; idx < unionFieldSerializers.length; ++idx) { + unionFieldSerializers[idx] = createLBSerializer(unionFields.get(idx)); + } + return new LBUnionSerializer(unionFieldSerializers); + default: + throw new IllegalArgumentException("Unsupported category " + oi.getCategory()); + } + } + + /** + * Abstract serializer class for serializing to LazyBinary format. + */ + abstract static class LBSerializer { + public void serializeValue(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + if (obj != null) { + serialize(byteStream, obj, objInspector, skipLengthPrefix, warnedOnceNullMapKey); + } + } + + abstract void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey); + } + + static class LBVoidSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + return; + } + } + + static class LBBooleanSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + boolean v = ((BooleanObjectInspector) objInspector).get(obj); + byteStream.write((byte) (v ? 1 : 0)); + } + } + + static class LBByteSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + ByteObjectInspector boi = (ByteObjectInspector) objInspector; + byte v = boi.get(obj); + byteStream.write(v); + } + } + + static class LBShortSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + ShortObjectInspector spoi = (ShortObjectInspector) objInspector; + short v = spoi.get(obj); + byteStream.write((byte) (v >> 8)); + byteStream.write((byte) (v)); + } + } + + static class LBIntSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + IntObjectInspector ioi = (IntObjectInspector) objInspector; + int v = ioi.get(obj); + LazyBinaryUtils.writeVInt(byteStream, v); + } + } + + static class LBLongSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + LongObjectInspector loi = (LongObjectInspector) objInspector; + long v = loi.get(obj); + LazyBinaryUtils.writeVLong(byteStream, v); + return; + } + } + + static class LBFloatSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + FloatObjectInspector foi = (FloatObjectInspector) objInspector; + int v = Float.floatToIntBits(foi.get(obj)); + byteStream.write((byte) (v >> 24)); + byteStream.write((byte) 
(v >> 16)); + byteStream.write((byte) (v >> 8)); + byteStream.write((byte) (v)); + } + } + + static class LBDoubleSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + DoubleObjectInspector doi = (DoubleObjectInspector) objInspector; + LazyBinaryUtils.writeDouble(byteStream, doi.get(obj)); + } + } + + static class LBStringSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + StringObjectInspector soi = (StringObjectInspector) objInspector; + Text t = soi.getPrimitiveWritableObject(obj); + serializeText(byteStream, t, skipLengthPrefix); + } + } + + static class LBHiveCharSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + HiveCharObjectInspector hcoi = (HiveCharObjectInspector) objInspector; + Text t = hcoi.getPrimitiveWritableObject(obj).getTextValue(); + serializeText(byteStream, t, skipLengthPrefix); + } + } + + static class LBHiveVarcharSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + HiveVarcharObjectInspector hcoi = (HiveVarcharObjectInspector) objInspector; + Text t = hcoi.getPrimitiveWritableObject(obj).getTextValue(); + serializeText(byteStream, t, skipLengthPrefix); + } + } + + static class LBBinarySerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + BinaryObjectInspector baoi = (BinaryObjectInspector) objInspector; + BytesWritable bw = baoi.getPrimitiveWritableObject(obj); + int length = bw.getLength(); + if(!skipLengthPrefix){ + LazyBinaryUtils.writeVInt(byteStream, length); + } else { + if (length == 0){ + throw new RuntimeException("LazyBinaryColumnarSerde cannot serialize a non-null zero " + + "length binary field. 
Consider using either LazyBinarySerde or ColumnarSerde."); + } + } + byteStream.write(bw.getBytes(),0,length); + } + } + + static class LBDateSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + DateWritable d = ((DateObjectInspector) objInspector).getPrimitiveWritableObject(obj); + LazyBinarySerDe.writeDateToByteStream(byteStream, d); + } + } + + static class LBTimestampSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + TimestampObjectInspector toi = (TimestampObjectInspector) objInspector; + TimestampWritable t = toi.getPrimitiveWritableObject(obj); + t.writeToByteStream(byteStream); + } + } + + static class LBHiveIntervalYearMonthSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + HiveIntervalYearMonthWritable intervalYearMonth = + ((HiveIntervalYearMonthObjectInspector) objInspector).getPrimitiveWritableObject(obj); + intervalYearMonth.writeToByteStream(byteStream); + } + } + + static class LBHiveIntervalDayTimeSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + HiveIntervalDayTimeWritable intervalDayTime = + ((HiveIntervalDayTimeObjectInspector) objInspector).getPrimitiveWritableObject(obj); + intervalDayTime.writeToByteStream(byteStream); + } + } + + static class LBHiveDecimalSerializer extends LBSerializer { + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + HiveDecimalObjectInspector bdoi = (HiveDecimalObjectInspector) objInspector; + HiveDecimalWritable t = bdoi.getPrimitiveWritableObject(obj); + if (t == null) { + return; + } + writeToByteStream(byteStream, t); + } + } + + static class LBListSerializer extends LBSerializer { + LBSerializer elementSerializer; + + public LBListSerializer(LBSerializer elementSerializer) { + super(); + this.elementSerializer = elementSerializer; + } + + @Override + void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector, + boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) { + ListObjectInspector loi = (ListObjectInspector) objInspector; + ObjectInspector eoi = loi.getListElementObjectInspector(); + + int byteSizeStart = 0; + int listStart = 0; + if (!skipLengthPrefix) { + // 1/ reserve spaces for the byte size of the list + // which is a integer and takes four bytes + byteSizeStart = byteStream.getLength(); + byteStream.reserve(4); + listStart = byteStream.getLength(); + } + // 2/ write the size of the list as a VInt + int size = loi.getListLength(obj); + LazyBinaryUtils.writeVInt(byteStream, size); + + // 3/ write the null bytes + byte nullByte = 0; + for (int eid = 0; eid < size; eid++) { + // set the bit to 1 if an element is not null + if (null != loi.getListElement(obj, eid)) { + nullByte |= 1 << (eid % 8); + } + // store the byte every eight elements or + // if this is the last element + if (7 == eid % 8 || eid == size - 1) { + byteStream.write(nullByte); + nullByte = 0; + } + } + + // 4/ write element by 
element from the list
+      for (int eid = 0; eid < size; eid++) {
+        elementSerializer.serializeValue(
+            byteStream, loi.getListElement(obj, eid), eoi,
+            false, warnedOnceNullMapKey);
+      }
+
+      if (!skipLengthPrefix) {
+        // 5/ update the list byte size
+        int listEnd = byteStream.getLength();
+        int listSize = listEnd - listStart;
+        writeSizeAtOffset(byteStream, byteSizeStart, listSize);
+      }
+      return;
+    }
+  }
+
+  static class LBMapSerializer extends LBSerializer {
+    LBSerializer keySerializer;
+    LBSerializer valSerializer;
+
+    public LBMapSerializer(LBSerializer keySerializer,
+        LBSerializer valSerializer) {
+      super();
+      this.keySerializer = keySerializer;
+      this.valSerializer = valSerializer;
+    }
+
+    @Override
+    void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector,
+        boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) {
+      MapObjectInspector moi = (MapObjectInspector) objInspector;
+      ObjectInspector koi = moi.getMapKeyObjectInspector();
+      ObjectInspector voi = moi.getMapValueObjectInspector();
+      Map<?, ?> map = moi.getMap(obj);
+
+      int byteSizeStart = 0;
+      int mapStart = 0;
+      if (!skipLengthPrefix) {
+        // 1/ reserve spaces for the byte size of the map
+        // which is an integer and takes four bytes
+        byteSizeStart = byteStream.getLength();
+        byteStream.reserve(4);
+        mapStart = byteStream.getLength();
+      }
+
+      // 2/ write the size of the map which is a VInt
+      int size = map.size();
+      LazyBinaryUtils.writeVInt(byteStream, size);
+
+      // 3/ write the null bytes
+      int b = 0;
+      byte nullByte = 0;
+      for (Map.Entry<?, ?> entry : map.entrySet()) {
+        // set the bit to 1 if a key is not null
+        if (null != entry.getKey()) {
+          nullByte |= 1 << (b % 8);
+        } else if (warnedOnceNullMapKey != null) {
+          if (!warnedOnceNullMapKey.value) {
+            LOG.warn("Null map key encountered! Ignoring similar problems.");
+          }
+          warnedOnceNullMapKey.value = true;
+        }
+        b++;
+        // set the bit to 1 if a value is not null
+        if (null != entry.getValue()) {
+          nullByte |= 1 << (b % 8);
+        }
+        b++;
+        // write the byte to stream every 4 key-value pairs
+        // or if this is the last key-value pair
+        if (0 == b % 8 || b == size * 2) {
+          byteStream.write(nullByte);
+          nullByte = 0;
+        }
+      }
+
+      // 4/ write key-value pairs one by one
+      for (Map.Entry<?, ?> entry : map.entrySet()) {
+        keySerializer.serializeValue(byteStream, entry.getKey(), koi, false, warnedOnceNullMapKey);
+        valSerializer.serializeValue(byteStream, entry.getValue(), voi, false, warnedOnceNullMapKey);
+      }
+
+      if (!skipLengthPrefix) {
+        // 5/ update the byte size of the map
+        int mapEnd = byteStream.getLength();
+        int mapSize = mapEnd - mapStart;
+        writeSizeAtOffset(byteStream, byteSizeStart, mapSize);
+      }
+      return;
+    }
+  }
+
+  static class LBStructSerializer extends LBSerializer {
+    LBSerializer[] serializers;
+    Object[] fieldData;
+
+    public LBStructSerializer(LBSerializer[] serializers) {
+      super();
+      this.serializers = serializers;
+      this.fieldData = new Object[serializers.length];
+    }
+
+    @Override
+    void serialize(RandomAccessOutput byteStream, Object obj, ObjectInspector objInspector,
+        boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey) {
+      int lasti = 0;
+      byte nullByte = 0;
+      int size = serializers.length;
+
+      int byteSizeStart = 0;
+      int typeStart = 0;
+      if (!skipLengthPrefix) {
+        // 1/ reserve spaces for the byte size of the struct
+        // which is an integer and takes four bytes
+        byteSizeStart = byteStream.getLength();
+        byteStream.reserve(4);
+        typeStart = byteStream.getLength();
+      }
+
+      StructObjectInspector soi = (StructObjectInspector) objInspector;
+      List<? extends StructField> fields = soi.getAllStructFieldRefs();
+      for (int i = 0; i < size; ++i) {
+        StructField structField = fields.get(i);
+        fieldData[i] = soi.getStructFieldData(obj, structField);
+      }
+
+      for (int i = 0; i < size; ++i) {
+        // set bit to 1 if a field is not null
+        if (null != fieldData[i]) {
+          nullByte |= 1 << (i % 8);
+        }
+        // write the null byte every eight elements or
+        // if this is the last element and serialize the
+        // corresponding 8 struct fields at the same time
+        if (7 == i % 8 || i == size - 1) {
+          byteStream.write(nullByte);
+          for (int j = lasti; j <= i; j++) {
+            serializers[j].serializeValue(
+                byteStream, fieldData[j], fields.get(j).getFieldObjectInspector(),
+                false, warnedOnceNullMapKey);
+          }
+          lasti = i + 1;
+          nullByte = 0;
+        }
+      }
+
+      if (!skipLengthPrefix) {
+        // 3/ update the byte size of the struct
+        int typeEnd = byteStream.getLength();
+        int typeSize = typeEnd - typeStart;
+        writeSizeAtOffset(byteStream, byteSizeStart, typeSize);
+      }
+    }
+  }
+
+  static class LBUnionSerializer extends LBSerializer {
+    LBSerializer[] unionFieldSerializers;
+
+    public LBUnionSerializer(LBSerializer[] unionFieldSerializers) {
+      this.unionFieldSerializers = unionFieldSerializers;
+    }
+
+    @Override
+    void serialize(RandomAccessOutput byteStream, Object obj,
+        ObjectInspector objInspector, boolean skipLengthPrefix,
+        BooleanRef warnedOnceNullMapKey) {
+      int byteSizeStart = 0;
+      int typeStart = 0;
+      if (!skipLengthPrefix) {
+        // 1/ reserve spaces for the byte size of the union
+        // which is an integer and takes four bytes
+        byteSizeStart = byteStream.getLength();
+        byteStream.reserve(4);
+        typeStart = byteStream.getLength();
+      }
+
+      // 2/ serialize the union - tag/value
+      UnionObjectInspector uoi = (UnionObjectInspector) objInspector;
+      byte tag = uoi.getTag(obj);
byteStream.write(tag); + unionFieldSerializers[tag].serializeValue( + byteStream, uoi.getField(obj), uoi.getObjectInspectors().get(tag), + false, warnedOnceNullMapKey); + + if (!skipLengthPrefix) { + // 3/ update the byte size of the struct + int typeEnd = byteStream.getLength(); + int typeSize = typeEnd - typeStart; + writeSizeAtOffset(byteStream, byteSizeStart, typeSize); + } + } + } +} diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinaryFast.java b/serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinaryFast.java index 13c73be..3e20676 100644 --- a/serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinaryFast.java +++ b/serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinaryFast.java @@ -274,14 +274,15 @@ public void testLazyBinaryFastCase( String fieldNames = ObjectInspectorUtils.getFieldNames(rowStructObjectInspector); String fieldTypes = ObjectInspectorUtils.getFieldTypes(rowStructObjectInspector); - AbstractSerDe serde = TestLazyBinarySerDe.getSerDe(fieldNames, fieldTypes); + TestLazyBinarySerDe testLazyBinarySerDe = new TestLazyBinarySerDe(); + AbstractSerDe serde = testLazyBinarySerDe.getSerDe(fieldNames, fieldTypes); AbstractSerDe serde_fewer = null; if (doWriteFewerColumns) { String partialFieldNames = ObjectInspectorUtils.getFieldNames(writeRowStructObjectInspector); String partialFieldTypes = ObjectInspectorUtils.getFieldTypes(writeRowStructObjectInspector); - serde_fewer = TestLazyBinarySerDe.getSerDe(partialFieldNames, partialFieldTypes);; + serde_fewer = testLazyBinarySerDe.getSerDe(partialFieldNames, partialFieldTypes);; } testLazyBinaryFast( @@ -345,4 +346,4 @@ public void testLazyBinaryFastComplexDepthOne() throws Throwable { public void testLazyBinaryFastComplexDepthFour() throws Throwable { testLazyBinaryFast(SerdeRandomRowSource.SupportedTypes.ALL, 4); } -} \ No newline at end of file +} diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinarySerDe.java b/serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinarySerDe.java index 0cd5736..70a5dd0 100644 --- a/serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinarySerDe.java +++ b/serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinarySerDe.java @@ -93,7 +93,7 @@ * @return the initialized LazyBinarySerDe * @throws Throwable */ - protected static AbstractSerDe getSerDe(String fieldNames, String fieldTypes) throws Throwable { + protected AbstractSerDe getSerDe(String fieldNames, String fieldTypes) throws Throwable { Properties schema = new Properties(); schema.setProperty(serdeConstants.LIST_COLUMNS, fieldNames); schema.setProperty(serdeConstants.LIST_COLUMN_TYPES, fieldTypes); diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinarySerDe2.java b/serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinarySerDe2.java new file mode 100644 index 0000000..1a93769 --- /dev/null +++ b/serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinarySerDe2.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.serde2.lazybinary; + +import java.util.Properties; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeUtils; + + +/** + * TestLazyBinarySerDe2. + * + */ +public class TestLazyBinarySerDe2 extends TestLazyBinarySerDe { + + /** + * Initialize the LazyBinarySerDe. + * + * @param fieldNames + * table field names + * @param fieldTypes + * table field types + * @return the initialized LazyBinarySerDe + * @throws Throwable + */ + @Override + protected AbstractSerDe getSerDe(String fieldNames, String fieldTypes) throws Throwable { + Properties schema = new Properties(); + schema.setProperty(serdeConstants.LIST_COLUMNS, fieldNames); + schema.setProperty(serdeConstants.LIST_COLUMN_TYPES, fieldTypes); + + LazyBinarySerDe2 serde = new LazyBinarySerDe2(); + SerDeUtils.initializeSerDe(serde, new Configuration(), schema, null); + return serde; + } +}
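
Note (not part of the patch): a minimal usage sketch of the new serde, mirroring the initialization done in TestLazyBinarySerDe2 above. The class name, column names/types, and row values below are illustrative only; the calls themselves are the existing Hive serde2 APIs already exercised by the patch.

import java.util.Arrays;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe2;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.BytesWritable;

public class LazyBinarySerDe2Example {
  public static void main(String[] args) throws Exception {
    // Table schema is passed through the standard serde properties,
    // exactly as the q-test table definitions and TestLazyBinarySerDe2 do.
    // Columns "a int, b string" are illustrative.
    Properties schema = new Properties();
    schema.setProperty(serdeConstants.LIST_COLUMNS, "a,b");
    schema.setProperty(serdeConstants.LIST_COLUMN_TYPES, "int,string");

    LazyBinarySerDe2 serde = new LazyBinarySerDe2();
    SerDeUtils.initializeSerDe(serde, new Configuration(), schema, null);

    // Object inspector describing the in-memory row we are about to serialize:
    // a struct of (int a, string b) backed by a java.util.List.
    ObjectInspector rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("a", "b"),
        Arrays.<ObjectInspector>asList(
            PrimitiveObjectInspectorFactory.javaIntObjectInspector,
            PrimitiveObjectInspectorFactory.javaStringObjectInspector));

    // serialize() walks the row with the serializer tree built at initialize() time,
    // instead of re-checking the ObjectInspector category for every value.
    BytesWritable bytes = (BytesWritable) serde.serialize(Arrays.<Object>asList(1, "one"), rowOI);
    System.out.println("serialized " + bytes.getLength() + " bytes");
  }
}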