Index: contrib/src/test/results/hbase/hbase_queries.q.out =================================================================== --- contrib/src/test/results/hbase/hbase_queries.q.out (revision 0) +++ contrib/src/test/results/hbase/hbase_queries.q.out (revision 0) @@ -0,0 +1,680 @@ +PREHOOK: query: DROP TABLE hbase_table_1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE hbase_table_1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE hbase_table_1(key int, value string) +ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string" +) STORED AS HBASETABLE +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE hbase_table_1(key int, value string) +ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string" +) STORED AS HBASETABLE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@hbase_table_1 +PREHOOK: query: EXPLAIN FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT * +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT * +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB hbase_table_1)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)))) + +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: UDFToInteger(_col0) + type: int + expr: _col1 + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 1 + table: + input format: org.apache.hadoop.hive.contrib.hbase.HiveHBaseTableInputFormat + output format: org.apache.hadoop.hive.contrib.hbase.HiveHBaseTableOutputFormat + serde: org.apache.hadoop.hive.contrib.hbase.HBaseSerDe + name: hbase_table_1 + + +PREHOOK: query: FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT * +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@hbase_table_1 +POSTHOOK: query: FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT * +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@hbase_table_1 +PREHOOK: query: DROP TABLE hbase_table_2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE hbase_table_2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE EXTERNAL TABLE hbase_table_2(key int, value string) +ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string" +) STORED AS HBASETABLE LOCATION "/tmp/hbase_table_1" +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE EXTERNAL TABLE hbase_table_2(key int, value string) +ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string" +) STORED AS HBASETABLE LOCATION "/tmp/hbase_table_1" +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@hbase_table_2 +PREHOOK: query: EXPLAIN FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.* FROM src) Y +ON (x.key = Y.key) +SELECT Y.* LIMIT 20 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.* FROM src) Y +ON (x.key = Y.key) +SELECT Y.* LIMIT 20 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN 
(TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_table_1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF hbase_table_1))))) x) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF src))))) Y) (= (. (TOK_TABLE_OR_COL x) key) (. (TOK_TABLE_OR_COL Y) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF Y))) (TOK_LIMIT 20))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + x:hbase_table_1 + TableScan + alias: hbase_table_1 + Select Operator + expressions: + expr: key + type: int + outputColumnNames: _col0 + Reduce Output Operator + key expressions: + expr: UDFToDouble(_col0) + type: double + sort order: + + Map-reduce partition columns: + expr: UDFToDouble(_col0) + type: double + tag: 0 + y:src + TableScan + alias: src + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: + expr: UDFToDouble(_col0) + type: double + sort order: + + Map-reduce partition columns: + expr: UDFToDouble(_col0) + type: double + tag: 1 + value expressions: + expr: _col0 + type: string + expr: _col1 + type: string + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 {VALUE._col0} {VALUE._col1} + outputColumnNames: _col2, _col3 + Select Operator + expressions: + expr: _col2 + type: string + expr: _col3 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: 20 + + +PREHOOK: query: FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.* FROM src) Y +ON (x.key = Y.key) +SELECT Y.* LIMIT 20 +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_table_1 +PREHOOK: Input: default@src +PREHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/786279477/10000 +POSTHOOK: query: FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.* FROM src) Y +ON (x.key = Y.key) +SELECT Y.* LIMIT 20 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_table_1 +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/786279477/10000 +0 val_0 +0 val_0 +0 val_0 +2 val_2 +4 val_4 +5 val_5 +5 val_5 +5 val_5 +8 val_8 +9 val_9 +10 val_10 +11 val_11 +12 val_12 +12 val_12 +15 val_15 +15 val_15 +17 val_17 +18 val_18 +18 val_18 +19 val_19 +PREHOOK: query: EXPLAIN FROM +(SELECT hbase_table_1.* FROM hbase_table_1 WHERE hbase_table_1.key > 100) x +JOIN +(SELECT hbase_table_2.* FROM hbase_table_2 WHERE hbase_table_2.key < 120) Y +ON (x.key = Y.key) +SELECT Y.* +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN FROM +(SELECT hbase_table_1.* FROM hbase_table_1 WHERE hbase_table_1.key > 100) x +JOIN +(SELECT hbase_table_2.* FROM hbase_table_2 WHERE hbase_table_2.key < 120) Y +ON (x.key = Y.key) +SELECT Y.* +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_table_1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF hbase_table_1))) (TOK_WHERE (> 
(. (TOK_TABLE_OR_COL hbase_table_1) key) 100)))) x) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_table_2)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF hbase_table_2))) (TOK_WHERE (< (. (TOK_TABLE_OR_COL hbase_table_2) key) 120)))) Y) (= (. (TOK_TABLE_OR_COL x) key) (. (TOK_TABLE_OR_COL Y) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF Y))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + y:hbase_table_2 + TableScan + alias: hbase_table_2 + Filter Operator + predicate: + expr: (key < 120) + type: boolean + Filter Operator + predicate: + expr: (key < 120) + type: boolean + Select Operator + expressions: + expr: key + type: int + expr: value + type: string + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: + expr: _col0 + type: int + sort order: + + Map-reduce partition columns: + expr: _col0 + type: int + tag: 1 + value expressions: + expr: _col0 + type: int + expr: _col1 + type: string + x:hbase_table_1 + TableScan + alias: hbase_table_1 + Filter Operator + predicate: + expr: (key > 100) + type: boolean + Filter Operator + predicate: + expr: (key > 100) + type: boolean + Select Operator + expressions: + expr: key + type: int + outputColumnNames: _col0 + Reduce Output Operator + key expressions: + expr: _col0 + type: int + sort order: + + Map-reduce partition columns: + expr: _col0 + type: int + tag: 0 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 {VALUE._col0} {VALUE._col1} + outputColumnNames: _col2, _col3 + Select Operator + expressions: + expr: _col2 + type: int + expr: _col3 + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: FROM +(SELECT hbase_table_1.* FROM hbase_table_1 WHERE hbase_table_1.key > 100) x +JOIN +(SELECT hbase_table_2.* FROM hbase_table_2 WHERE hbase_table_2.key < 120) Y +ON (x.key = Y.key) +SELECT Y.* +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_table_2 +PREHOOK: Input: default@hbase_table_1 +PREHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/1638893747/10000 +POSTHOOK: query: FROM +(SELECT hbase_table_1.* FROM hbase_table_1 WHERE hbase_table_1.key > 100) x +JOIN +(SELECT hbase_table_2.* FROM hbase_table_2 WHERE hbase_table_2.key < 120) Y +ON (x.key = Y.key) +SELECT Y.* +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_table_2 +POSTHOOK: Input: default@hbase_table_1 +POSTHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/1638893747/10000 +103 val_103 +104 val_104 +105 val_105 +111 val_111 +113 val_113 +114 val_114 +116 val_116 +118 val_118 +119 val_119 +PREHOOK: query: DROP TABLE empty_hbase_table +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE empty_hbase_table +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE empty_hbase_table(key int, value string) +ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string" +) STORED AS HBASETABLE +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE empty_hbase_table(key int, value string) +ROW FORMAT SERDE 
'org.apache.hadoop.hive.contrib.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string" +) STORED AS HBASETABLE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@empty_hbase_table +PREHOOK: query: DROP TABLE empty_normal_table +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE empty_normal_table +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE empty_normal_table(key int, value string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE empty_normal_table(key int, value string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@empty_normal_table +PREHOOK: query: select * from (select count(1) from empty_normal_table union all select count(1) from empty_hbase_table) x +PREHOOK: type: QUERY +PREHOOK: Input: default@empty_hbase_table +PREHOOK: Input: default@empty_normal_table +PREHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/1752645548/10000 +POSTHOOK: query: select * from (select count(1) from empty_normal_table union all select count(1) from empty_hbase_table) x +POSTHOOK: type: QUERY +POSTHOOK: Input: default@empty_hbase_table +POSTHOOK: Input: default@empty_normal_table +POSTHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/1752645548/10000 +0 +0 +PREHOOK: query: select * from (select count(1) from empty_normal_table union all select count(1) from hbase_table_1) x +PREHOOK: type: QUERY +PREHOOK: Input: default@empty_normal_table +PREHOOK: Input: default@hbase_table_1 +PREHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/358076703/10000 +POSTHOOK: query: select * from (select count(1) from empty_normal_table union all select count(1) from hbase_table_1) x +POSTHOOK: type: QUERY +POSTHOOK: Input: default@empty_normal_table +POSTHOOK: Input: default@hbase_table_1 +POSTHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/358076703/10000 +309 +0 +PREHOOK: query: select * from (select count(1) from src union all select count(1) from empty_hbase_table) x +PREHOOK: type: QUERY +PREHOOK: Input: default@empty_hbase_table +PREHOOK: Input: default@src +PREHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/1300110294/10000 +POSTHOOK: query: select * from (select count(1) from src union all select count(1) from empty_hbase_table) x +POSTHOOK: type: QUERY +POSTHOOK: Input: default@empty_hbase_table +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/1300110294/10000 +500 +0 +PREHOOK: query: select * from (select count(1) from src union all select count(1) from hbase_table_1) x +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@hbase_table_1 +PREHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/1689322609/10000 +POSTHOOK: query: select * from (select count(1) from src union all select count(1) from hbase_table_1) x +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Input: default@hbase_table_1 +POSTHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/1689322609/10000 +309 +500 +PREHOOK: query: CREATE TABLE hbase_table_3(key int, value string, count int) +ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string,cf2:int" +) STORED AS HBASETABLE +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE hbase_table_3(key int, value string, count int) +ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( 
+"hbase.columns.mapping" = "cf:string,cf2:int" +) STORED AS HBASETABLE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@hbase_table_3 +PREHOOK: query: EXPLAIN +INSERT OVERWRITE TABLE hbase_table_3 +SELECT x.key, x.value, Y.count +FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.key, count(src.key) as count FROM src GROUP BY src.key) Y +ON (x.key = Y.key) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +INSERT OVERWRITE TABLE hbase_table_3 +SELECT x.key, x.value, Y.count +FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.key, count(src.key) as count FROM src GROUP BY src.key) Y +ON (x.key = Y.key) +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_table_1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF hbase_table_1))))) x) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL src) key)) (TOK_SELEXPR (TOK_FUNCTION count (. (TOK_TABLE_OR_COL src) key)) count)) (TOK_GROUPBY (. (TOK_TABLE_OR_COL src) key)))) Y) (= (. (TOK_TABLE_OR_COL x) key) (. (TOK_TABLE_OR_COL Y) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB hbase_table_3)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL x) key)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL x) value)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL Y) count))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + y:src + TableScan + alias: src + Select Operator + expressions: + expr: key + type: string + outputColumnNames: key + Group By Operator + aggregations: + expr: count(key) + keys: + expr: key + type: string + mode: hash + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + sort order: + + Map-reduce partition columns: + expr: _col0 + type: string + tag: -1 + value expressions: + expr: _col1 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + keys: + expr: KEY._col0 + type: string + mode: mergepartial + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: bigint + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-0 + Map Reduce + Alias -> Map Operator Tree: + x:hbase_table_1 + TableScan + alias: hbase_table_1 + Select Operator + expressions: + expr: key + type: int + expr: value + type: string + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: + expr: UDFToDouble(_col0) + type: double + sort order: + + Map-reduce partition columns: + expr: UDFToDouble(_col0) + type: double + tag: 0 + value expressions: + expr: _col0 + type: int + expr: _col1 + type: string + $INTNAME + Reduce Output Operator + key expressions: + expr: UDFToDouble(_col0) + type: double + sort order: + + Map-reduce partition columns: + expr: UDFToDouble(_col0) + type: double + tag: 1 + value expressions: + expr: _col1 + type: bigint + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col0} {VALUE._col1} + 1 {VALUE._col1} + outputColumnNames: _col0, _col1, _col3 + Select 
Operator + expressions: + expr: _col0 + type: int + expr: _col1 + type: string + expr: _col3 + type: bigint + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: int + expr: _col1 + type: string + expr: UDFToInteger(_col2) + type: int + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 1 + table: + input format: org.apache.hadoop.hive.contrib.hbase.HiveHBaseTableInputFormat + output format: org.apache.hadoop.hive.contrib.hbase.HiveHBaseTableOutputFormat + serde: org.apache.hadoop.hive.contrib.hbase.HBaseSerDe + name: hbase_table_3 + + +PREHOOK: query: INSERT OVERWRITE TABLE hbase_table_3 +SELECT x.key, x.value, Y.count +FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.key, count(src.key) as count FROM src GROUP BY src.key) Y +ON (x.key = Y.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_table_1 +PREHOOK: Input: default@src +PREHOOK: Output: default@hbase_table_3 +POSTHOOK: query: INSERT OVERWRITE TABLE hbase_table_3 +SELECT x.key, x.value, Y.count +FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.key, count(src.key) as count FROM src GROUP BY src.key) Y +ON (x.key = Y.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_table_1 +POSTHOOK: Input: default@src +POSTHOOK: Output: default@hbase_table_3 +PREHOOK: query: select count(1) from hbase_table_3 +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_table_3 +PREHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/2005915014/10000 +POSTHOOK: query: select count(1) from hbase_table_3 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_table_3 +POSTHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/2005915014/10000 +309 +PREHOOK: query: select * from hbase_table_3 limit 5 +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_table_3 +PREHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/457905763/10000 +POSTHOOK: query: select * from hbase_table_3 limit 5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_table_3 +POSTHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/457905763/10000 +0 val_0 3 +10 val_10 1 +100 val_100 2 +103 val_103 2 +104 val_104 2 +PREHOOK: query: select key, count from hbase_table_3 limit 5 +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_table_3 +PREHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/808434942/10000 +POSTHOOK: query: select key, count from hbase_table_3 limit 5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_table_3 +POSTHOOK: Output: file:/home/hustlmsp/projects/hive-trunk-new/build/ql/tmp/808434942/10000 +0 3 +10 1 +100 2 +103 2 +104 2 +PREHOOK: query: DROP TABLE hbase_table_1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE hbase_table_1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: default@hbase_table_1 +PREHOOK: query: DROP TABLE hbase_table_2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE hbase_table_2 +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: default@hbase_table_2 +PREHOOK: query: DROP TABLE hbase_table_3 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE hbase_table_3 +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: default@hbase_table_3 +PREHOOK: query: DROP TABLE empty_hbase_table +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE empty_hbase_table +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: default@empty_hbase_table +PREHOOK: query: DROP TABLE empty_normal_table +PREHOOK: type: DROPTABLE +POSTHOOK: query: 
DROP TABLE empty_normal_table +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: default@empty_normal_table Index: contrib/src/test/org/apache/hadoop/hive/contrib/hbase/HBaseQTestUtil.java =================================================================== --- contrib/src/test/org/apache/hadoop/hive/contrib/hbase/HBaseQTestUtil.java (revision 0) +++ contrib/src/test/org/apache/hadoop/hive/contrib/hbase/HBaseQTestUtil.java (revision 0) @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.contrib.hbase; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hive.ql.QTestUtil; +import org.apache.hadoop.mapred.JobConf; + +public class HBaseQTestUtil extends QTestUtil { + + private String tmpdir; + + private MiniHBaseCluster hbase = null; + private final int NUM_REGIONSERVERS = 1; + + public HBaseQTestUtil(String outDir, String logDir, boolean miniMr) throws Exception { + super(outDir, logDir, miniMr); + } + + protected void preTestUtilInit() throws Exception { + // register hbase metadata handler in metastore + conf.set("hive.othermetadata.handlers", HiveHBaseTableInputFormat.class.getName() + ":" + HBaseMetadataHandler.class.getName()); + // Setup the hbase Cluster + try { + conf.set("hbase.master", "local"); + tmpdir = System.getProperty("user.dir")+"/../build/ql/tmp"; + conf.set("hbase.rootdir", "file://" + tmpdir + "/hbase"); + HBaseConfiguration hbaseConf = new HBaseConfiguration(conf); + hbase = new MiniHBaseCluster(hbaseConf, NUM_REGIONSERVERS); + conf.set("hbase.master", hbase.getHMasterAddress().toString()); + // opening the META table ensures that cluster is running + new HTable(new HBaseConfiguration(conf), HConstants.META_TABLE_NAME); + } catch (IOException ie) { + ie.printStackTrace(); + if (hbase != null) { + hbase.shutdown(); + } + throw ie; + } + + String auxJars = conf.getAuxJars(); + auxJars = ( auxJars == null ? 
"" : auxJars + "," ) + "file://" + + new JobConf(conf, HBaseConfiguration.class).getJar(); + auxJars += ",file://" + new JobConf(conf, HBaseSerDe.class).getJar(); + conf.setAuxJars(auxJars); + } + + public void shutdown() throws Exception { + if (hbase != null) { + HConnectionManager.deleteConnectionInfo(new HBaseConfiguration(conf), true); + hbase.shutdown(); + hbase = null; + } + + super.shutdown(); + } + +} Index: contrib/src/test/org/apache/hadoop/hive/contrib/hbase/TestHBaseSerDe.java =================================================================== --- contrib/src/test/org/apache/hadoop/hive/contrib/hbase/TestHBaseSerDe.java (revision 0) +++ contrib/src/test/org/apache/hadoop/hive/contrib/hbase/TestHBaseSerDe.java (revision 0) @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.contrib.hbase; + + +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.io.HbaseMapWritable; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hive.contrib.hbase.HBaseSerDe; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; + +import junit.framework.TestCase; + +public class TestHBaseSerDe extends TestCase { + + /** + * Test the LazySimpleSerDe class. 
+ */ + public void testHBaseSerDe() throws Throwable { + try { + // Create the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + Properties tbl = createProperties(); + serDe.initialize(conf, tbl); + + byte[] colabyte = "cola:abyte".getBytes(); + byte[] colbshort = "colb:ashort".getBytes(); + byte[] colcint = "colc:aint".getBytes(); + byte[] colalong = "cola:along".getBytes(); + byte[] colbdouble = "colb:adouble".getBytes(); + byte[] colcstring = "colc:astring".getBytes(); + + // Data + HbaseMapWritable cells = new HbaseMapWritable(); + cells.put(colabyte, new Cell("123".getBytes(), 0)); + cells.put(colbshort, new Cell("456".getBytes(), 0)); + cells.put(colcint, new Cell("789".getBytes(), 0)); + cells.put(colalong, new Cell("1000".getBytes(), 0)); + cells.put(colbdouble, new Cell("5.3".getBytes(), 0)); + cells.put(colcstring, new Cell("hive and hadoop".getBytes(), 0)); + RowResult rr = new RowResult("test-row1".getBytes(), cells); + BatchUpdate bu = new BatchUpdate("test-row1".getBytes()); + bu.put(colabyte, "123".getBytes()); + bu.put(colbshort, "456".getBytes()); + bu.put(colcint, "789".getBytes()); + bu.put(colalong, "1000".getBytes()); + bu.put(colbdouble, "5.3".getBytes()); + bu.put(colcstring, "hive and hadoop".getBytes()); + + Object[] expectedFieldsData = { new Text("test-row1"), new ByteWritable((byte)123), + new ShortWritable((short)456), new IntWritable(789), + new LongWritable(1000), new DoubleWritable(5.3), new Text("hive and hadoop") + }; + + deserializeAndSerialize(serDe, rr, bu, expectedFieldsData); + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } + } + + private void deserializeAndSerialize(HBaseSerDe serDe, RowResult rr, BatchUpdate bu, + Object[] expectedFieldsData) throws SerDeException { + // Get the row structure + StructObjectInspector oi = (StructObjectInspector)serDe.getObjectInspector(); + List fieldRefs = oi.getAllStructFieldRefs(); + assertEquals(7, fieldRefs.size()); + + // Deserialize + Object row = serDe.deserialize(rr); + for (int i = 0; i < fieldRefs.size(); i++) { + Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i)); + if (fieldData != null) { + fieldData = ((LazyPrimitive)fieldData).getWritableObject(); + } + assertEquals("Field " + i, expectedFieldsData[i], fieldData); + } + // Serialize + assertEquals(BatchUpdate.class, serDe.getSerializedClass()); + BatchUpdate serializedBU = (BatchUpdate)serDe.serialize(row, oi); + assertEquals("Serialized data", bu.toString(), serializedBU.toString()); + } + + private Properties createProperties() { + Properties tbl = new Properties(); + + // Set the configuration parameters + tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9"); + tbl.setProperty("columns", + "key,abyte,ashort,aint,along,adouble,astring"); + tbl.setProperty("columns.types", + "string,tinyint:smallint:int:bigint:double:string"); + tbl.setProperty(HBaseSerDe.HBASE_SCHEMA_MAPPING, + "cola:abyte,colb:ashort,colc:aint,cola:along,colb:adouble,colc:astring"); + return tbl; + } + +} Index: contrib/src/test/org/apache/hadoop/hive/contrib/hbase/TestLazyHBaseObject.java =================================================================== --- contrib/src/test/org/apache/hadoop/hive/contrib/hbase/TestLazyHBaseObject.java (revision 0) +++ contrib/src/test/org/apache/hadoop/hive/contrib/hbase/TestLazyHBaseObject.java (revision 0) @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.contrib.hbase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.io.HbaseMapWritable; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hive.contrib.hbase.LazyHBaseCellMap; +import org.apache.hadoop.hive.contrib.hbase.LazyHBaseRow; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyString; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; + +import junit.framework.TestCase; + +public class TestLazyHBaseObject extends TestCase { + + /** + * Test the LazyHBaseCellMap class. 
+ */ + public void testLazyHBaseCellMap() throws Throwable { + try { + { + // Map of Integer to String + Text nullSequence = new Text("\\N"); + ObjectInspector oi = LazyFactory.createLazyObjectInspector( + TypeInfoUtils.getTypeInfosFromTypeString("map").get(0), + new byte[]{(byte)1, (byte)2}, 0, nullSequence, false, (byte)0); + + LazyHBaseCellMap b = new LazyHBaseCellMap((LazyMapObjectInspector) oi); + + // Intial a row result + HbaseMapWritable cells = new HbaseMapWritable(); + cells.put("cfa:col1".getBytes(), new Cell("cfacol1".getBytes(), 0)); + cells.put("cfa:col2".getBytes(), new Cell("cfacol2".getBytes(), 0)); + cells.put("cfb:2".getBytes(), new Cell("def".getBytes(), 0)); + cells.put("cfb:-1".getBytes(), new Cell("".getBytes(), 0)); + cells.put("cfb:0".getBytes(), new Cell("0".getBytes(), 0)); + cells.put("cfb:8".getBytes(), new Cell("abc".getBytes(), 0)); + cells.put("cfc:col3".getBytes(), new Cell("cfccol3".getBytes(), 0)); + + RowResult rr = new RowResult("test-row".getBytes(), cells); + + b.init(rr, "cfb:"); + + assertEquals(new Text("def"), ((LazyString)b.getMapValueElement(new IntWritable(2))).getWritableObject()); + assertNull(b.getMapValueElement(new IntWritable(-1))); + assertEquals(new Text("0"), ((LazyString)b.getMapValueElement(new IntWritable(0))).getWritableObject()); + assertEquals(new Text("abc"), ((LazyString)b.getMapValueElement(new IntWritable(8))).getWritableObject()); + assertNull(b.getMapValueElement(new IntWritable(12345))); + + assertEquals("{0:'0',2:'def',8:'abc'}".replace('\'', '\"'), + SerDeUtils.getJSONString(b, oi)); + } + + { + // Map of String to String + Text nullSequence = new Text("\\N"); + ObjectInspector oi = LazyFactory.createLazyObjectInspector( + TypeInfoUtils.getTypeInfosFromTypeString("map").get(0), + new byte[]{(byte)'#', (byte)'\t'}, 0, nullSequence, false, (byte)0); + + LazyHBaseCellMap b = new LazyHBaseCellMap((LazyMapObjectInspector) oi); + + // Intial a row result + HbaseMapWritable cells = new HbaseMapWritable(); + cells.put("cfa:col1".getBytes(), new Cell("cfacol1".getBytes(), 0)); + cells.put("cfa:col2".getBytes(), new Cell("cfacol2".getBytes(), 0)); + cells.put("cfb:2".getBytes(), new Cell("d\tf".getBytes(), 0)); + cells.put("cfb:-1".getBytes(), new Cell("".getBytes(), 0)); + cells.put("cfb:0".getBytes(), new Cell("0".getBytes(), 0)); + cells.put("cfb:8".getBytes(), new Cell("abc".getBytes(), 0)); + cells.put("cfc:col3".getBytes(), new Cell("cfccol3".getBytes(), 0)); + + RowResult rr = new RowResult("test-row".getBytes(), cells); + + b.init(rr, "cfb:"); + + assertEquals(new Text("d\tf"), ((LazyString)b.getMapValueElement(new Text("2"))).getWritableObject()); + assertNull(b.getMapValueElement(new Text("-1"))); + assertEquals(new Text("0"), ((LazyString)b.getMapValueElement(new Text("0"))).getWritableObject()); + assertEquals(new Text("abc"), ((LazyString)b.getMapValueElement(new Text("8"))).getWritableObject()); + assertNull(b.getMapValueElement(new Text("-"))); + + assertEquals("{'0':'0','2':'d\\tf','8':'abc'}".replace('\'', '\"'), + SerDeUtils.getJSONString(b, oi)); + } + + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } + } + + /** + * Test the LazyHBaseRow class. 
+ */ + public void testLazyHBaseRow() throws Throwable { + try { + { + ArrayList fieldTypeInfos = + TypeInfoUtils.getTypeInfosFromTypeString("string,int,array,map,string"); + List fieldNames = Arrays.asList(new String[]{"key", "a", "b", "c", "d"}); + Text nullSequence = new Text("\\N"); + + List hbaseColumnNames = + Arrays.asList(new String[]{"cfa:a", "cfa:b", "cfb:c", "cfb:d"}); + + ObjectInspector oi = LazyFactory.createLazyStructInspector(fieldNames, + fieldTypeInfos, new byte[] {' ', ':', '='}, nullSequence, false, false, (byte)0); + LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi); + + HbaseMapWritable cells = new HbaseMapWritable(); + + cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0)); + cells.put("cfa:b".getBytes(), new Cell("a:b:c".getBytes(), 0)); + cells.put("cfb:c".getBytes(), new Cell("d=e:f=g".getBytes(), 0)); + cells.put("cfb:d".getBytes(), new Cell("hi".getBytes(), 0)); + RowResult rr = new RowResult("test-row".getBytes(), cells); + o.init(rr, hbaseColumnNames); + assertEquals("{'key':'test-row','a':123,'b':['a','b','c'],'c':{'d':'e','f':'g'},'d':'hi'}".replace("'", "\""), + SerDeUtils.getJSONString(o, oi)); + + cells.clear(); + cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0)); + cells.put("cfb:c".getBytes(), new Cell("d=e:f=g".getBytes(), 0)); + rr = new RowResult("test-row".getBytes(), cells); + o.init(rr, hbaseColumnNames); + assertEquals("{'key':'test-row','a':123,'b':null,'c':{'d':'e','f':'g'},'d':null}".replace("'", "\""), + SerDeUtils.getJSONString(o, oi)); + + cells.clear(); + cells.put("cfa:b".getBytes(), new Cell("a".getBytes(), 0)); + cells.put("cfb:c".getBytes(), new Cell("d=\\N:f=g:h".getBytes(), 0)); + cells.put("cfb:d".getBytes(), new Cell("no".getBytes(), 0)); + rr = new RowResult("test-row".getBytes(), cells); + o.init(rr, hbaseColumnNames); + assertEquals("{'key':'test-row','a':null,'b':['a'],'c':{'d':null,'f':'g','h':null},'d':'no'}".replace("'", "\""), + SerDeUtils.getJSONString(o, oi)); + + cells.clear(); + cells.put("cfa:b".getBytes(), new Cell(":a::".getBytes(), 0)); + cells.put("cfb:d".getBytes(), new Cell("no".getBytes(), 0)); + rr = new RowResult("test-row".getBytes(), cells); + o.init(rr, hbaseColumnNames); + assertEquals("{'key':'test-row','a':null,'b':['','a','',''],'c':null,'d':'no'}".replace("'", "\""), + SerDeUtils.getJSONString(o, oi)); + + cells.clear(); + cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0)); + cells.put("cfa:b".getBytes(), new Cell("".getBytes(), 0)); + cells.put("cfb:c".getBytes(), new Cell("".getBytes(), 0)); + cells.put("cfb:d".getBytes(), new Cell("".getBytes(), 0)); + rr = new RowResult("test-row".getBytes(), cells); + o.init(rr, hbaseColumnNames); + assertEquals("{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""), + SerDeUtils.getJSONString(o, oi)); + } + + // column family is mapped to Map + { + ArrayList fieldTypeInfos = + TypeInfoUtils.getTypeInfosFromTypeString("string,int,array,map,string"); + List fieldNames = Arrays.asList(new String[]{"key", "a", "b", "c", "d"}); + Text nullSequence = new Text("\\N"); + + List hbaseColumnNames = + Arrays.asList(new String[]{"cfa:a", "cfa:b", "cfb:", "cfc:d"}); + + ObjectInspector oi = LazyFactory.createLazyStructInspector(fieldNames, + fieldTypeInfos, new byte[] {' ', ':', '='}, nullSequence, false, false, (byte)0); + LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi); + + HbaseMapWritable cells = new HbaseMapWritable(); + + cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0)); 
+ cells.put("cfa:b".getBytes(), new Cell("a:b:c".getBytes(), 0)); + cells.put("cfb:d".getBytes(), new Cell("e".getBytes(), 0)); + cells.put("cfb:f".getBytes(), new Cell("g".getBytes(), 0)); + cells.put("cfc:d".getBytes(), new Cell("hi".getBytes(), 0)); + RowResult rr = new RowResult("test-row".getBytes(), cells); + o.init(rr, hbaseColumnNames); + assertEquals("{'key':'test-row','a':123,'b':['a','b','c'],'c':{'d':'e','f':'g'},'d':'hi'}".replace("'", "\""), + SerDeUtils.getJSONString(o, oi)); + + cells.clear(); + cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0)); + cells.put("cfb:d".getBytes(), new Cell("e".getBytes(), 0)); + cells.put("cfb:f".getBytes(), new Cell("g".getBytes(), 0)); + rr = new RowResult("test-row".getBytes(), cells); + o.init(rr, hbaseColumnNames); + assertEquals("{'key':'test-row','a':123,'b':null,'c':{'d':'e','f':'g'},'d':null}".replace("'", "\""), + SerDeUtils.getJSONString(o, oi)); + + cells.clear(); + cells.put("cfa:b".getBytes(), new Cell("a".getBytes(), 0)); + cells.put("cfb:f".getBytes(), new Cell("g".getBytes(), 0)); + cells.put("cfc:d".getBytes(), new Cell("no".getBytes(), 0)); + rr = new RowResult("test-row".getBytes(), cells); + o.init(rr, hbaseColumnNames); + assertEquals("{'key':'test-row','a':null,'b':['a'],'c':{'f':'g'},'d':'no'}".replace("'", "\""), + SerDeUtils.getJSONString(o, oi)); + + cells.clear(); + cells.put("cfa:b".getBytes(), new Cell(":a::".getBytes(), 0)); + cells.put("cfc:d".getBytes(), new Cell("no".getBytes(), 0)); + rr = new RowResult("test-row".getBytes(), cells); + o.init(rr, hbaseColumnNames); + assertEquals("{'key':'test-row','a':null,'b':['','a','',''],'c':{},'d':'no'}".replace("'", "\""), + SerDeUtils.getJSONString(o, oi)); + + cells.clear(); + cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0)); + cells.put("cfa:b".getBytes(), new Cell("".getBytes(), 0)); + cells.put("cfc:d".getBytes(), new Cell("".getBytes(), 0)); + rr = new RowResult("test-row".getBytes(), cells); + o.init(rr, hbaseColumnNames); + assertEquals("{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""), + SerDeUtils.getJSONString(o, oi)); + } + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } + } + +} Index: contrib/src/test/queries/hbase/hbase_queries.q =================================================================== --- contrib/src/test/queries/hbase/hbase_queries.q (revision 0) +++ contrib/src/test/queries/hbase/hbase_queries.q (revision 0) @@ -0,0 +1,92 @@ +DROP TABLE hbase_table_1; +CREATE TABLE hbase_table_1(key int, value string) +ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string" +) STORED AS HBASETABLE; + +EXPLAIN FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT *; +FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT *; + +DROP TABLE hbase_table_2; +CREATE EXTERNAL TABLE hbase_table_2(key int, value string) +ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string" +) STORED AS HBASETABLE LOCATION "/tmp/hbase_table_1"; + +EXPLAIN FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.* FROM src) Y +ON (x.key = Y.key) +SELECT Y.* LIMIT 20; + +FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.* FROM src) Y +ON (x.key = Y.key) +SELECT Y.* LIMIT 20; + +EXPLAIN FROM +(SELECT hbase_table_1.* FROM hbase_table_1 WHERE hbase_table_1.key > 100) x +JOIN +(SELECT hbase_table_2.* FROM hbase_table_2 WHERE hbase_table_2.key < 120) Y +ON 
(x.key = Y.key) +SELECT Y.*; + +FROM +(SELECT hbase_table_1.* FROM hbase_table_1 WHERE hbase_table_1.key > 100) x +JOIN +(SELECT hbase_table_2.* FROM hbase_table_2 WHERE hbase_table_2.key < 120) Y +ON (x.key = Y.key) +SELECT Y.*; + +DROP TABLE empty_hbase_table; +CREATE TABLE empty_hbase_table(key int, value string) +ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string" +) STORED AS HBASETABLE; + +DROP TABLE empty_normal_table; +CREATE TABLE empty_normal_table(key int, value string); + +select * from (select count(1) from empty_normal_table union all select count(1) from empty_hbase_table) x; +select * from (select count(1) from empty_normal_table union all select count(1) from hbase_table_1) x; +select * from (select count(1) from src union all select count(1) from empty_hbase_table) x; +select * from (select count(1) from src union all select count(1) from hbase_table_1) x; + +CREATE TABLE hbase_table_3(key int, value string, count int) +ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.hbase.HBaseSerDe' +WITH SERDEPROPERTIES ( +"hbase.columns.mapping" = "cf:string,cf2:int" +) STORED AS HBASETABLE; + +EXPLAIN +INSERT OVERWRITE TABLE hbase_table_3 +SELECT x.key, x.value, Y.count +FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.key, count(src.key) as count FROM src GROUP BY src.key) Y +ON (x.key = Y.key); + +INSERT OVERWRITE TABLE hbase_table_3 +SELECT x.key, x.value, Y.count +FROM +(SELECT hbase_table_1.* FROM hbase_table_1) x +JOIN +(SELECT src.key, count(src.key) as count FROM src GROUP BY src.key) Y +ON (x.key = Y.key); + +select count(1) from hbase_table_3; +select * from hbase_table_3 limit 5; +select key, count from hbase_table_3 limit 5; + +DROP TABLE hbase_table_1; +DROP TABLE hbase_table_2; +DROP TABLE hbase_table_3; +DROP TABLE empty_hbase_table; +DROP TABLE empty_normal_table; Index: contrib/src/test/templates/TestHBaseCliDriver.vm =================================================================== --- contrib/src/test/templates/TestHBaseCliDriver.vm (revision 0) +++ contrib/src/test/templates/TestHBaseCliDriver.vm (revision 0) @@ -0,0 +1,123 @@ +package org.apache.hadoop.hive.cli; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +import java.io.*; +import java.util.*; + +import org.apache.hadoop.hive.contrib.hbase.HBaseQTestUtil; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.history.HiveHistoryViewer; +import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo; +import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; +import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo; +import org.apache.hadoop.hive.ql.session.SessionState; + + + +import org.antlr.runtime.*; +import org.antlr.runtime.tree.*; + +public class $className extends TestCase { + + private HBaseQTestUtil qt; + + public $className(String name) { + super(name); + qt = null; + } + + @Override + protected void setUp() { + try { + boolean miniMR = false; + if ("$clusterMode".equals("miniMR")) + miniMR = true; + + qt = new HBaseQTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", miniMR); + +#foreach ($qf in $qfiles) + qt.addFile("$qf.getCanonicalPath()"); +#end + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in setup"); + } + } + + @Override + protected void tearDown() { + try { + qt.shutdown(); + 
} + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + + public static Test suite() { + TestSuite suite = new TestSuite(); +#foreach ($qf in $qfiles) + #set ($fname = $qf.getName()) + #set ($eidx = $fname.length() - 2) + #set ($tname = $fname.substring(0, $eidx)) + suite.addTest(new $className("testCliDriver_$tname")); +#end + return suite; + } + +#foreach ($qf in $qfiles) + #set ($fname = $qf.getName()) + #set ($eidx = $fname.length() - 2) + #set ($tname = $fname.substring(0, $eidx)) + public void testCliDriver_$tname() throws Exception { + try { + System.out.println("Begin query: " + "$fname"); + qt.cliInit("$fname"); + int ecode = qt.executeClient("$fname"); + if (ecode != 0) { + fail("Client Execution failed with error code = " + ecode); + } + if (SessionState.get() != null) { + HiveHistoryViewer hv = new HiveHistoryViewer(SessionState.get() + .getHiveHistory().getHistFileName()); + Map jobInfoMap = hv.getJobInfoMap(); + Map taskInfoMap = hv.getTaskInfoMap(); + + if(jobInfoMap.size() != 0) { + String cmd = (String)jobInfoMap.keySet().toArray()[0]; + QueryInfo ji = jobInfoMap.get(cmd); + + if (!ji.hm.get(Keys.QUERY_RET_CODE.name()).equals("0")) { + fail("Wrong return code in hive history"); + } + } + } + + ecode = qt.checkCliDriverResults("$fname"); + if (ecode != 0) { + fail("Client execution results failed with error code = " + ecode); + } + } + catch (Throwable e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception"); + } + + System.out.println("Done query: " + "$fname"); + assertTrue("Test passed", true); + } + +#end +} + Index: contrib/src/java/org/apache/hadoop/hive/contrib/hbase/HiveHBaseTableInputFormat.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/hbase/HiveHBaseTableInputFormat.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/hbase/HiveHBaseTableInputFormat.java (revision 0) @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.contrib.hbase; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.mapred.TableSplit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.io.HiveNotFileInputFormat; +import org.apache.hadoop.hive.ql.plan.tableDesc; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +public class HiveHBaseTableInputFormat + implements HiveNotFileInputFormat, JobConfigurable { + + final static Log LOG = LogFactory.getLog(HiveHBaseTableInputFormat.class); + + public static class HBaseSplit extends FileSplit implements InputSplit { + String hbaseColumnMapping; + TableSplit split; + + public HBaseSplit() { + super((Path)null, 0, 0, (String[])null); + hbaseColumnMapping = ""; + split = new TableSplit(); + } + + public HBaseSplit(TableSplit split, String columnsMapping, Path dummyPath) { + super(dummyPath, 0, 0, + (String[])null); + this.split = split; + hbaseColumnMapping = columnsMapping; + } + + public TableSplit getSplit() { + return this.split; + } + + public String getColumnsMapping() { + return this.hbaseColumnMapping; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + hbaseColumnMapping = in.readUTF(); + split.readFields(in); + } + + @Override + public String toString() { + return "TableSplit " + split + " : " + hbaseColumnMapping; + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeUTF(hbaseColumnMapping); + split.write(out); + } + + @Override + public long getLength() { + return split.getLength(); + } + + @Override + public String[] getLocations() throws IOException { + return split.getLocations(); + } + + } + + static class TableInputFormat extends org.apache.hadoop.hbase.mapred.TableInputFormatBase + implements JobConfigurable { + + @Override + public void configure(JobConf job) {} + + public void setScanColumns(byte[][] scanColumns) { + setInputColumns(scanColumns); + } + + public void setHBaseTable(HTable table) { + setHTable(table); + } + + } + + TableInputFormat mInputFormat; + JobConf mJob; + + public HiveHBaseTableInputFormat() { + mInputFormat = new TableInputFormat(); + } + + @Override + public RecordReader getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws IOException { + HBaseSplit hbaseSplit = (HBaseSplit)split; + + mInputFormat.setHBaseTable(new HTable(new HBaseConfiguration(job), hbaseSplit.getSplit().getTableName())); + + // tricky here! + // because the hbase key is mapped to the first column in its hive table + // we add the "_key" before the columnMapping that we can use the hive column id + // to find the exact hbase column. 
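+ // For example, with the mapping "cf:string" used by hbase_queries.q, columnMapping below + // becomes "_key,cf:string": hive column 0 (the key) resolves to the synthetic "_key" entry + // and hive column 1 (the value) resolves to the hbase column "cf:string".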
+ String columnMapping = "_key," + hbaseSplit.getColumnsMapping(); + String[] columns = columnMapping.split(","); + ArrayList readColIDs = ColumnProjectionUtils.getReadColumnIDs(job); + + if (columns.length < readColIDs.size()) { + throw new IOException("Try to read more columns than the given table. Fail!"); + } + + byte[][] scanColumns; + if (readColIDs.size() == 0) { + scanColumns = new byte[columns.length - 1][]; + for (int i=0; i) mInputFormat.getRecordReader(hbaseSplit.getSplit(), job, reporter); + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + Path[] tableNames = FileInputFormat.getInputPaths(job); + mInputFormat.setHBaseTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName())); + + String hbaseSchemaMapping = job.get(HBaseSerDe.HBASE_SCHEMA_MAPPING); + if (hbaseSchemaMapping == null) { + throw new IOException("Schema Mapping is null in a HBase Table."); + } + + // the setting is mocked. + String[] columns = hbaseSchemaMapping.split(","); + byte[][] inputColumns = new byte[columns.length][]; + for(int i=0; i { + + ImmutableBytesWritable KEY = new ImmutableBytesWritable(); + + /** + * update to the final out table, and output an empty key as the key + * + * @param jc + * the job configuration file + * @param finalOutPath + * the final output table name + * @param valueClass + * the value class used for create + * @param isCompressed + * whether the content is compressed or not + * @param tableProperties + * the tableInfo of this file's corresponding table + * @param progress + * progress used for status report + * @return the RecordWriter for the output file + */ + @Override + public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + Class valueClass, boolean isCompressed, + Properties tableProperties, Progressable progress) throws IOException { + jc.set(TableOutputFormat.OUTPUT_TABLE, finalOutPath.getName()); + + final org.apache.hadoop.mapred.RecordWriter tblWriter = + this.getRecordWriter(null, jc, null, progress); + return new RecordWriter() { + + @Override + public void close(boolean abort) throws IOException { + tblWriter.close(null); + } + + @Override + public void write(Writable w) throws IOException { + BatchUpdate bu = (BatchUpdate)w; + KEY.set(bu.getRow()); + tblWriter.write(KEY, bu); + } + + }; + } + +} Index: contrib/src/java/org/apache/hadoop/hive/contrib/hbase/LazyHBaseCellMap.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/hbase/LazyHBaseCellMap.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/hbase/LazyHBaseCellMap.java (revision 0) @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.contrib.hbase; + +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyMap; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; + +public class LazyHBaseCellMap extends LazyMap { + + RowResult rowResult; + String hbaseColumnFamily; + + /** + * Construct a LazyCellMap object with the ObjectInspector. + * @param oi + */ + public LazyHBaseCellMap(LazyMapObjectInspector oi) { + super(oi); + } + + @Override + public void init(ByteArrayRef bytes, int start, int length) { + // do nothing + } + + public void init(RowResult rr, String columnFamily) { + rowResult = rr; + hbaseColumnFamily = columnFamily; + parsed = false; + } + + private void parse() { + if(cachedMap == null) { + cachedMap = new LinkedHashMap(); + } else { + cachedMap.clear(); + } + + Iterator iter = rowResult.keySet().iterator(); + + byte[] columnFamily = hbaseColumnFamily.getBytes(); + while(iter.hasNext()) { + byte[] columnKey = iter.next(); + if(columnFamily.length > columnKey.length) + continue; + + if(0 == LazyUtils.compare(columnFamily, 0, columnFamily.length, + columnKey, 0, columnFamily.length)) { + byte[] columnValue = rowResult.get(columnKey).getValue(); + if(columnValue == null || columnValue.length == 0) // a empty object + continue; + + // Keys are always primitive + LazyPrimitive key = LazyFactory.createLazyPrimitiveClass( + (PrimitiveObjectInspector)((MapObjectInspector)oi).getMapKeyObjectInspector()); + ByteArrayRef keyRef = new ByteArrayRef(); + keyRef.setData(columnKey); + key.init(keyRef, columnFamily.length, columnKey.length - columnFamily.length); + + // Value + LazyObject value = LazyFactory.createLazyObject( + ((MapObjectInspector)oi).getMapValueObjectInspector()); + ByteArrayRef valueRef = new ByteArrayRef(); + valueRef.setData(columnValue); + value.init(valueRef, 0, columnValue.length); + + // Put it onto the map + cachedMap.put(key.getObject(), value.getObject()); + } + } + } + + /** + * Get the value in the map for the given key. + * + * + * + * @param key + * @return + */ + public Object getMapValueElement(Object key) { + if (!parsed) { + parse(); + } + + for(Map.Entry entry : cachedMap.entrySet()) { + LazyPrimitive lazyKeyI = (LazyPrimitive)entry.getKey(); + // getWritableObject() will convert LazyPrimitive to actual primitive writable objects. + Object keyI = lazyKeyI.getWritableObject(); + if (keyI == null) continue; + if (keyI.equals(key)) { + // Got a match, return the value + LazyObject v = (LazyObject)entry.getValue(); + return v == null ? 
v : v.getObject(); + } + } + + return null; + } + + public Map getMap() { + if (!parsed) { + parse(); + } + return cachedMap; + } + + public int getMapSize() { + if (!parsed) { + parse(); + } + return cachedMap.size(); + } + +} Index: contrib/src/java/org/apache/hadoop/hive/contrib/hbase/HBaseMetadataHandler.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/hbase/HBaseMetadataHandler.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/hbase/HBaseMetadataHandler.java (revision 0) @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.contrib.hbase; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.HiveOtherMetadataHandlerManager.HiveOtherMetadataHandler; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.util.StringUtils; + +public class HBaseMetadataHandler implements HiveOtherMetadataHandler { + + HBaseConfiguration hbaseConf; + HBaseAdmin admin = null; + + private HBaseAdmin getHBaseAdmin() throws MetaException { + try { + if (admin == null) { + admin = new HBaseAdmin(hbaseConf); + } + return admin; + } catch (MasterNotRunningException mnre) { + throw new MetaException(StringUtils.stringifyException(mnre)); + } + } + + @Override + public void post_dropPartition(Table tbl, Partition part, boolean deleteData, + boolean isExternal) throws MetaException { + throw new MetaException("Unable to drop a partition in a hbased hive table. 
Because we haven't any partitions in hbased hive table now."); + } + + @Override + public void post_dropTable(Table table, boolean deleteData, boolean isExternal) + throws MetaException { + try { + if (!isExternal && deleteData) { + if (getHBaseAdmin().isTableEnabled(table.getTableName())) { + getHBaseAdmin().disableTable(table.getTableName()); + } + getHBaseAdmin().deleteTable(table.getTableName()); + } + } catch (IOException ie) { + throw new MetaException(StringUtils.stringifyException(ie)); + } + } + + @Override + public void pre_addPartition(Partition part) throws MetaException { + throw new MetaException("Unable to create any partition in a hbased hive table. Because we haven't any partitions in hbased hive table now."); + } + + @Override + public void pre_alterPartition(Table tbl, Partition newPart) + throws MetaException { + throw new MetaException("Unable to alter a partition in a hbased hive table. Because we haven't any partitions in hbased hive table now."); + } + + @Override + public void pre_alterTable(Table oldTable, Table newTable) + throws MetaException { + if (oldTable == null || newTable == null) { + throw new MetaException("Either new/old table is invalid : " + newTable + ", " + oldTable); + } + + if(!MetaStoreUtils.validateName(newTable.getTableName()) || + !MetaStoreUtils.validateColNames(newTable.getSd().getCols())) { + throw new MetaException(newTable.getTableName() + " is not a valid object name"); + } + + boolean rename = false; + + // check if table with the new name already exists + if (!newTable.getTableName().equalsIgnoreCase(oldTable.getTableName()) + || !newTable.getDbName().equalsIgnoreCase(oldTable.getDbName())) { + rename = true; + } + + if (rename) + throw new MetaException("Uable to rename a table. Because we don't know how to rename a hbase table."); + + throw new MetaException("Uable to alter a hbased hive table now."); + + } + + @Override + public void pre_createTable(Table tbl, boolean isExternal) throws MetaException { + try { + String tblName = tbl.getSd().getLocation(); + if (tblName == null) { + tblName = tbl.getTableName(); + } else { + tblName = new Path(tblName).getName(); + } + + // Build the mapping schema + Set columnFamilies = new HashSet(); + // Check the hbase columns and get all the families + Map serdeParam = tbl.getSd().getSerdeInfo().getParameters(); + String hbaseColumnStr = serdeParam.get(HBaseSerDe.HBASE_SCHEMA_MAPPING); + if (hbaseColumnStr == null) + throw new MetaException("No schema mapping defined in Serde."); + String[] hbaseColumns = hbaseColumnStr.split(","); + for(String hbaseColumn : hbaseColumns) { + int idx = hbaseColumn.indexOf(":"); + if (idx < 0) + throw new MetaException(hbaseColumn + " is not a qualified hbase column."); + columnFamilies.add(hbaseColumn.substring(0, idx + 1)); + } + + // Check if the given hbase table existes + HTableDescriptor tblDesc; + + if (!getHBaseAdmin().tableExists(tblName)) { + // it is not a external table create one + if (!isExternal) { + // Create the all column descriptors + tblDesc = new HTableDescriptor(tblName); + for (String cf : columnFamilies) { + tblDesc.addFamily(new HColumnDescriptor(cf)); + } + + getHBaseAdmin().createTable(tblDesc); + } else { // an external table + throw new MetaException("HBase table " + tblName + + " doesn't exist while the table is declared as an external table."); + } + + } else { // make sure the schema mapping is right + tblDesc = getHBaseAdmin().getTableDescriptor(tblName); + for (String cf : columnFamilies) { + 
if(!tblDesc.hasFamily(Bytes.toBytes(cf))) + throw new MetaException("Column Family " + cf + " is not defined in hbase table " + tblName); + } + } + // ensure the table is online + new HTable(hbaseConf, tblDesc.getName()); + } catch (MasterNotRunningException mnre) { + throw new MetaException(StringUtils.stringifyException(mnre)); + } catch (IOException ie) { + throw new MetaException(StringUtils.stringifyException(ie)); + } + } + + @Override + public void rollback_addPartition(Partition part) throws MetaException { + // do nothing now + } + + @Override + public void rollback_alterPartition(Table tbl, Partition newPart) + throws MetaException { + // do nothing now + } + + @Override + public void rollback_alterTable(Table oldTable, Table newTable) + throws MetaException { + // do nothing now + } + + @Override + public void rollback_createTable(Table table, boolean isExternal) throws MetaException { + try { + if (!isExternal && getHBaseAdmin().tableExists(table.getTableName())) { + // we have create an hbase table, so we delete it to roll back; + if (getHBaseAdmin().isTableEnabled(table.getTableName())) + getHBaseAdmin().disableTable(table.getTableName()); + getHBaseAdmin().deleteTable(table.getTableName()); + } + } catch (IOException ie) { + throw new MetaException(StringUtils.stringifyException(ie)); + } + } + + @Override + public Configuration getConf() { + return hbaseConf; + } + + @Override + public void setConf(Configuration conf) { + hbaseConf = new HBaseConfiguration(conf); + } + +} Index: contrib/src/java/org/apache/hadoop/hive/contrib/hbase/HBaseSerDe.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/hbase/HBaseSerDe.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/hbase/HBaseSerDe.java (revision 0) @@ -0,0 +1,457 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.contrib.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * HBaseSerDe can be used to serialize object into hbase table and + * deserialize object from hbase table. + */ +public class HBaseSerDe implements SerDe { + + public static final String HBASE_SCHEMA_MAPPING = "hbase.columns.mapping"; + + public static final Log LOG = LogFactory.getLog( + HBaseSerDe.class.getName()); + + public static class HBaseSerDeParameters { + List hbaseColumnNames; + SerDeParameters serdeParams; + + public List getHBaseColumnNames() { + return hbaseColumnNames; + } + + public SerDeParameters getSerDeParameters() { + return serdeParams; + } + } + + private ObjectInspector mCachedObjectInspector; + HBaseSerDeParameters mHBaseSerDeParameters = null; + private boolean useJSONSerialize; // use json to serialize + + public String toString() { + return getClass().toString() + + "[" + + mHBaseSerDeParameters.hbaseColumnNames + + ":" + + ((StructTypeInfo) mHBaseSerDeParameters.serdeParams.getRowTypeInfo()) + .getAllStructFieldNames() + + ":" + + ((StructTypeInfo) mHBaseSerDeParameters.serdeParams.getRowTypeInfo()) + .getAllStructFieldTypeInfos() + "]"; + } + + public HBaseSerDe() throws SerDeException { + } + + /** + * Initialize the SerDe given parameters. 
+ * @see SerDe#initialize(Configuration, Properties) + */ + public void initialize(Configuration conf, Properties tbl) + throws SerDeException { + mHBaseSerDeParameters = HBaseSerDe.initHBaseSerDeParameters(conf, tbl, + getClass().getName()); + + // We just used columnNames & columnTypes these two parameters + mCachedObjectInspector = LazyFactory.createLazyStructInspector( + mHBaseSerDeParameters.serdeParams.getColumnNames(), + mHBaseSerDeParameters.serdeParams.getColumnTypes(), + mHBaseSerDeParameters.serdeParams.getSeparators(), + mHBaseSerDeParameters.serdeParams.getNullSequence(), + mHBaseSerDeParameters.serdeParams.isLastColumnTakesRest(), + mHBaseSerDeParameters.serdeParams.isEscaped(), + mHBaseSerDeParameters.serdeParams.getEscapeChar()); + + mCachedHBaseRow = new LazyHBaseRow((LazySimpleStructObjectInspector)mCachedObjectInspector); + + if (LOG.isDebugEnabled()) { + LOG.debug("HBaseSerDe initialized with : columnNames = " + + mHBaseSerDeParameters.serdeParams.getColumnNames() + " columnTypes = " + + mHBaseSerDeParameters.serdeParams.getColumnTypes() + " hbaseColumnMapping = " + + mHBaseSerDeParameters.hbaseColumnNames); + } + } + + public static HBaseSerDeParameters initHBaseSerDeParameters( + Configuration job, Properties tbl, String serdeName) + throws SerDeException { + HBaseSerDeParameters serdeParams = new HBaseSerDeParameters(); + + // Read Configuration Parameter + String hbaseColumnNameProperty = tbl.getProperty(HBaseSerDe.HBASE_SCHEMA_MAPPING); + String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES); + + // Initial the hbase column list + if (hbaseColumnNameProperty != null && hbaseColumnNameProperty.length() > 0) { + serdeParams.hbaseColumnNames = Arrays.asList(hbaseColumnNameProperty.split(",")); + } else { + serdeParams.hbaseColumnNames = new ArrayList(); + } + + // Add the hbase key to the columnNameList and columnTypeList + + // Build the type property string + if (columnTypeProperty == null) { + StringBuilder sb = new StringBuilder(); + sb.append(Constants.STRING_TYPE_NAME); + + for (int i = 0; i < serdeParams.hbaseColumnNames.size(); i++) { + String colName = serdeParams.hbaseColumnNames.get(i); + if(colName.endsWith(":")) + sb.append(":").append(Constants.MAP_TYPE_NAME + "<" + + Constants.STRING_TYPE_NAME + "," + Constants.STRING_TYPE_NAME + ">"); + else + sb.append(":").append(Constants.STRING_TYPE_NAME); + } + tbl.setProperty(Constants.LIST_COLUMN_TYPES, sb.toString()); + } + + serdeParams.serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName); + + if (serdeParams.hbaseColumnNames.size() + 1 != serdeParams.serdeParams.getColumnNames().size()) { + throw new SerDeException(serdeName + ": columns has " + + serdeParams.serdeParams.getColumnNames().size() + + " elements while hbase.columns.mapping has " + + serdeParams.hbaseColumnNames.size() + " elements!"); + } + + // check the mapping schema is right? 
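// --- Editor's note: illustrative sketch, not part of the patch. ---
// The block above derives a default LIST_COLUMN_TYPES string from
// "hbase.columns.mapping" when the table declares no column types: the
// implicit row key becomes a string column, a mapping entry ending with ':'
// (a whole column family) is typed map<string,string>, and a qualified
// "family:qualifier" entry is typed string. The standalone helper below
// restates that rule; its name and the literal type names are assumptions,
// not Hive API.
static String defaultColumnTypes(String hbaseColumnsMapping) {
  StringBuilder sb = new StringBuilder("string");            // the implicit key column
  for (String col : hbaseColumnsMapping.split(",")) {
    sb.append(':').append(col.endsWith(":") ? "map<string,string>" : "string");
  }
  return sb.toString();   // "cf1:q1,cf2:" -> "string:string:map<string,string>"
}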
+ // we just can make sure that "columnfamily:" is mapped to MAP + for (int i = 0; i < serdeParams.hbaseColumnNames.size(); i++) { + String hbaseColName = serdeParams.hbaseColumnNames.get(i); + if(hbaseColName.endsWith(":")) { + TypeInfo typeInfo = serdeParams.serdeParams.getColumnTypes().get(i+1); + if(typeInfo.getCategory() == Category.MAP && + ((MapTypeInfo)typeInfo).getMapKeyTypeInfo().getTypeName() != Constants.STRING_TYPE_NAME) { + throw new SerDeException(serdeName + ": hbase column family '" + + hbaseColName + "' should be mapped to Map while being mapped to " + + ((MapTypeInfo)typeInfo).getMapKeyTypeInfo().getTypeName()); + } + } + } + + return serdeParams; + } + + // The object for storing hbase row data. + LazyHBaseRow mCachedHBaseRow; + + /** + * Deserialize a row from the HBase RowResult writable to a LazyObject + * @param rowResult the HBase RowResult Writable contain a row + * @return the deserialized object + * @see SerDe#deserialize(Writable) + */ + public Object deserialize(Writable rowResult) throws SerDeException { + + if (!(rowResult instanceof RowResult)) { + throw new SerDeException(getClass().getName() + ": expects RowResult!"); + } + + RowResult rr = (RowResult)rowResult; + mCachedHBaseRow.init(rr, mHBaseSerDeParameters.hbaseColumnNames); + return mCachedHBaseRow; + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return mCachedObjectInspector; + } + + BatchUpdate serializeCache = null; + ByteStream.Output serializeStream = new ByteStream.Output(); + + @Override + public Class getSerializedClass() { + return BatchUpdate.class; + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) + throws SerDeException { + if (objInspector.getCategory() != Category.STRUCT) { + throw new SerDeException(getClass().toString() + + " can only serialize struct types, but we got: " + + objInspector.getTypeName()); + } + + // Prepare the field ObjectInspectors + StructObjectInspector soi = (StructObjectInspector)objInspector; + List fields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + List declaredFields = + (mHBaseSerDeParameters.serdeParams.getRowTypeInfo() != null && + ((StructTypeInfo) mHBaseSerDeParameters.serdeParams.getRowTypeInfo()).getAllStructFieldNames().size()>0)? 
+ ((StructObjectInspector)getObjectInspector()).getAllStructFieldRefs(): null; + + boolean isNotNull = false; + String hbaseColumn = ""; + + try { + // Serialize each field + for (int i=0; i= declaredFields.size()) { + throw new SerDeException( + "Error: expecting " + declaredFields.size() + + " but asking for field " + i + "\n" + "data=" + obj + "\n" + + "tableType=" + mHBaseSerDeParameters.serdeParams.getRowTypeInfo().toString() + "\n" + + "dataType=" + + TypeInfoUtils.getTypeInfoFromObjectInspector(objInspector)); + } + + if (f == null) // a null object, we do not serialize it + continue; + + if (i > 0) + hbaseColumn = mHBaseSerDeParameters.hbaseColumnNames.get(i-1); + + // If the field that is column family in hbase + if(i > 0 && hbaseColumn.endsWith(":")) { + MapObjectInspector moi = (MapObjectInspector)foi; + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + + Map map = moi.getMap(obj); + if (map == null) { + continue; + } else { + for (Map.Entry entry: map.entrySet()) { + // Get the Key + serialize(serializeStream, entry.getKey(), koi, + mHBaseSerDeParameters.serdeParams.getSeparators(), 3, + mHBaseSerDeParameters.serdeParams.getNullSequence(), + mHBaseSerDeParameters.serdeParams.isEscaped(), + mHBaseSerDeParameters.serdeParams.getEscapeChar(), + mHBaseSerDeParameters.serdeParams.getNeedsEscape()); + + // generate a column name (column_family:column_name) + hbaseColumn += Bytes.toString(serializeStream.getData()); + + // Get the Value + serializeStream.reset(); + + isNotNull = serialize(serializeStream, entry.getValue(), voi, + mHBaseSerDeParameters.serdeParams.getSeparators(), 3, + mHBaseSerDeParameters.serdeParams.getNullSequence(), + mHBaseSerDeParameters.serdeParams.isEscaped(), + mHBaseSerDeParameters.serdeParams.getEscapeChar(), + mHBaseSerDeParameters.serdeParams.getNeedsEscape()); + } + } + } else { + // If the field that is passed in is NOT a primitive, and either the + // field is not declared (no schema was given at initialization), or + // the field is declared as a primitive in initialization, serialize + // the data to JSON string. Otherwise serialize the data in the + // delimited way. 
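// --- Editor's note: sketch only, not part of the patch. ---
// The branch that follows can be read as a single predicate: a field is
// rendered as a JSON string when it is non-primitive AND either no per-field
// schema was declared, the declared field is primitive, or JSON serialization
// was explicitly requested; otherwise it is written with the delimited
// LazySimpleSerDe-style encoding. A standalone restatement (names assumed):
static boolean serializeAsJson(boolean fieldIsPrimitive,
                               Boolean declaredAsPrimitive,   // null = no schema declared
                               boolean jsonRequested) {
  return !fieldIsPrimitive
      && (declaredAsPrimitive == null || declaredAsPrimitive || jsonRequested);
}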
+ if (!foi.getCategory().equals(Category.PRIMITIVE) + && (declaredFields == null || + declaredFields.get(i).getFieldObjectInspector().getCategory() + .equals(Category.PRIMITIVE) || useJSONSerialize)) { + isNotNull = serialize(serializeStream, SerDeUtils.getJSONString(f, foi), + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + mHBaseSerDeParameters.serdeParams.getSeparators(), 1, + mHBaseSerDeParameters.serdeParams.getNullSequence(), + mHBaseSerDeParameters.serdeParams.isEscaped(), + mHBaseSerDeParameters.serdeParams.getEscapeChar(), + mHBaseSerDeParameters.serdeParams.getNeedsEscape()); + } else { + isNotNull = serialize(serializeStream, f, foi, + mHBaseSerDeParameters.serdeParams.getSeparators(), 1, + mHBaseSerDeParameters.serdeParams.getNullSequence(), + mHBaseSerDeParameters.serdeParams.isEscaped(), + mHBaseSerDeParameters.serdeParams.getEscapeChar(), + mHBaseSerDeParameters.serdeParams.getNeedsEscape()); + } + } + + byte[] key = new byte[serializeStream.getCount()]; + System.arraycopy(serializeStream.getData(), 0, key, 0, serializeStream.getCount()); + if (i==0) { // the first column is the hbase key + serializeCache = new BatchUpdate(key); + } else { + if(isNotNull) + serializeCache.put(hbaseColumn, key); + } + } + } catch (IOException e) { + throw new SerDeException(e); + } + + return serializeCache; + } + + /** + * Serialize the row into the StringBuilder. + * @param out The StringBuilder to store the serialized data. + * @param obj The object for the current field. + * @param objInspector The ObjectInspector for the current Object. + * @param separators The separators array. + * @param level The current level of separator. + * @param nullSequence The byte sequence representing the NULL value. + * @param escaped Whether we need to escape the data when writing out + * @param escapeChar Which char to use as the escape char, e.g. '\\' + * @param needsEscape Which chars needs to be escaped. This array should have size of 128. + * Negative byte values (or byte values >= 128) are never escaped. + * @throws IOException + * @return true, if serialize a not-null object; otherwise false. + * + * Note: Copy From LazySimpleSerDe. There is a little difference, that we do not serialize a null + * object to hbase. 
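// --- Editor's note: illustrative sketch, not part of the patch. ---
// The static serialize() declared below walks the object by category and
// writes LazySimpleSerDe-style delimited text, using separators[level]
// between elements and separators[level + 1] between a map key and its
// value. A tiny standalone illustration of that convention (the default
// Ctrl-B / Ctrl-C separator values shown in the comment are an assumption):
static String delimitMap(java.util.Map<String, String> map, char entrySep, char kvSep) {
  StringBuilder out = new StringBuilder();
  boolean first = true;
  for (java.util.Map.Entry<String, String> e : map.entrySet()) {
    if (!first) {
      out.append(entrySep);
    }
    first = false;
    out.append(e.getKey()).append(kvSep).append(e.getValue());
  }
  return out.toString();   // {k1=v1, k2=v2} -> "k1\u0003v1\u0002k2\u0003v2"
}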
+ */ + public static boolean serialize(ByteStream.Output out, Object obj, + ObjectInspector objInspector, byte[] separators, int level, + Text nullSequence, boolean escaped, byte escapeChar, boolean[] needsEscape) throws IOException { + + switch (objInspector.getCategory()) { + case PRIMITIVE: { + LazyUtils.writePrimitiveUTF8(out, obj, (PrimitiveObjectInspector)objInspector, escaped, escapeChar, needsEscape); + return true; + } + case LIST: { + char separator = (char)separators[level]; + ListObjectInspector loi = (ListObjectInspector)objInspector; + List list = loi.getList(obj); + ObjectInspector eoi = loi.getListElementObjectInspector(); + if (list == null) { + return false; + } else { + for (int i=0; i0) { + out.write(separator); + } + serialize(out, list.get(i), eoi, separators, level+1, + nullSequence, escaped, escapeChar, needsEscape); + } + } + return true; + } + case MAP: { + char separator = (char)separators[level]; + char keyValueSeparator = (char)separators[level+1]; + MapObjectInspector moi = (MapObjectInspector)objInspector; + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + + Map map = moi.getMap(obj); + if (map == null) { + return false; + } else { + boolean first = true; + for (Map.Entry entry: map.entrySet()) { + if (first) { + first = false; + } else { + out.write(separator); + } + serialize(out, entry.getKey(), koi, separators, level+2, + nullSequence, escaped, escapeChar, needsEscape); + out.write(keyValueSeparator); + serialize(out, entry.getValue(), voi, separators, level+2, + nullSequence, escaped, escapeChar, needsEscape); + } + } + return true; + } + case STRUCT: { + char separator = (char)separators[level]; + StructObjectInspector soi = (StructObjectInspector)objInspector; + List fields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + if (list == null) { + return false; + } else { + for (int i=0; i0) { + out.write(separator); + } + serialize(out, list.get(i), + fields.get(i).getFieldObjectInspector(), separators, level+1, + nullSequence, escaped, escapeChar, needsEscape); + } + } + return true; + } + } + + throw new RuntimeException("Unknown category type: " + + objInspector.getCategory()); + } + + + /** + * @return the useJSONSerialize + */ + public boolean isUseJSONSerialize() { + return useJSONSerialize; + } + + /** + * @param useJSONSerialize the useJSONSerialize to set + */ + public void setUseJSONSerialize(boolean useJSONSerialize) { + this.useJSONSerialize = useJSONSerialize; + } + +} Index: contrib/src/java/org/apache/hadoop/hive/contrib/hbase/LazyHBaseRow.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/hbase/LazyHBaseRow.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/hbase/LazyHBaseRow.java (revision 0) @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.contrib.hbase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyStruct; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; + +/** + * LazyObject for storing a hbase row. + * The field of a hbase row can be primitive or non-primitive. + */ +public class LazyHBaseRow extends LazyStruct { + + /** + * The hbase columns mapping of the hbase row. + */ + List hbaseColumns; + RowResult mRowResult; + + /** + * Construct a LazyHBaseRow object with the ObjectInspector. + */ + public LazyHBaseRow(LazySimpleStructObjectInspector oi) { + super(oi); + } + + /** + * Set the hbase row data(a RowResult writable) for this LazyStruct. + * @see LazyHBaseRow#init(RowResult) + */ + public void init(RowResult rr, List hbaseColumns) { + this.mRowResult = rr; + this.hbaseColumns = hbaseColumns; + parsed = false; + } + + /** + * Parse the RowResult and fill each field. + * @see LazyStruct#parse() + */ + private void parse() { + if (fields == null) { + List fieldRefs = ((StructObjectInspector)oi).getAllStructFieldRefs(); + fields = new LazyObject[fieldRefs.size()]; + for (int i = 0 ; i < fields.length; i++) { + if (i > 0) { + String hbaseColumn = hbaseColumns.get(i - 1); + if (hbaseColumn.endsWith(":")) { // a column family + fields[i] = + new LazyHBaseCellMap((LazyMapObjectInspector) fieldRefs.get(i).getFieldObjectInspector()); + continue; + } + } + + fields[i] = LazyFactory.createLazyObject(fieldRefs.get(i).getFieldObjectInspector()); + } + fieldInited = new boolean[fields.length]; + } + Arrays.fill(fieldInited, false); + parsed = true; + } + + /** + * Get one field out of the hbase row. + * + * If the field is a primitive field, return the actual object. + * Otherwise return the LazyObject. This is because PrimitiveObjectInspector + * does not have control over the object used by the user - the user simply + * directly use the Object instead of going through + * Object PrimitiveObjectInspector.get(Object). + * + * @param fieldID The field ID + * @return The field as a LazyObject + */ + public Object getField(int fieldID) { + if (!parsed) { + parse(); + } + return uncheckedGetField(fieldID); + } + + /** + * Get the field out of the row without checking parsed. + * This is called by both getField and getFieldsAsList. + * @param fieldID The id of the field starting from 0. + * @param nullSequence The sequence representing NULL value. 
+ * @return The value of the field + */ + private Object uncheckedGetField(int fieldID) { + if (!fieldInited[fieldID]) { + fieldInited[fieldID] = true; + + ByteArrayRef ref = new ByteArrayRef(); + + if(fieldID == 0) { // the key + ref.setData(mRowResult.getRow()); + fields[fieldID].init(ref, 0, ref.getData().length); + } else { + String columnName = hbaseColumns.get(fieldID - 1); + if(columnName.endsWith(":")) // it is a column family + ((LazyHBaseCellMap)fields[fieldID]).init(mRowResult, columnName); + else { // it is a column + if(mRowResult.containsKey(columnName)) { + ref.setData(mRowResult.get(columnName).getValue()); + fields[fieldID].init(ref, 0, ref.getData().length); + } else { + return null; + } + } + } + } + return fields[fieldID].getObject(); + } + + ArrayList cachedList; + /** + * Get the values of the fields as an ArrayList. + * @return The values of the fields as an ArrayList. + */ + public ArrayList getFieldsAsList() { + if (!parsed) { + parse(); + } + if (cachedList == null) { + cachedList = new ArrayList(); + } else { + cachedList.clear(); + } + for (int i=0; i $HIVE_HOME/bin/hive --auxpath ${contrib_jar},${hbase_jar} + +HOW TO TEST + +Run "ant test -Dtestcase=TestContribHBaseCliDriver -Dqfile=hbase.q -Doverwrite=true -Dtest.silent=false". Index: contrib/build.xml =================================================================== --- contrib/build.xml (revision 883033) +++ contrib/build.xml (working copy) @@ -24,6 +24,7 @@ + @@ -66,6 +67,8 @@ + + + + { - OI oi; + protected OI oi; /** * Create a LazyObject. Index: serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyMap.java =================================================================== --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyMap.java (revision 883033) +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyMap.java (working copy) @@ -39,7 +39,7 @@ /** * Whether the data is already parsed or not. */ - boolean parsed = false; + protected boolean parsed = false; /** * The size of the map. @@ -281,7 +281,7 @@ * But each LazyMap has a separate cachedMap so we won't overwrite the * data by accident. */ - LinkedHashMap cachedMap; + protected LinkedHashMap cachedMap; /** * Return the map object representing this LazyMap. Index: serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java =================================================================== --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java (revision 883033) +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java (working copy) @@ -43,7 +43,7 @@ /** * Whether the data is already parsed or not. */ - boolean parsed; + protected boolean parsed; /** * The start positions of struct fields. @@ -57,11 +57,11 @@ /** * The fields of the struct. */ - LazyObject[] fields; + protected LazyObject[] fields; /** * Whether init() has been called on the field or not. */ - boolean[] fieldInited; + protected boolean[] fieldInited; /** * Construct a LazyStruct object with the ObjectInspector. 
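The serde changes above only widen a handful of members from package-private
to protected (oi, parsed, cachedMap, fields, fieldInited) so that the contrib
classes LazyHBaseCellMap and LazyHBaseRow can subclass LazyMap and LazyStruct
and drive the same parse/cache state from an HBase RowResult instead of a byte
range. A self-contained sketch of that pattern (class and member names here
are illustrative, not Hive API):

import java.util.LinkedHashMap;
import java.util.Map;

abstract class LazyContainerSketch {
  protected boolean parsed;                                   // reused by subclasses
  protected Map<Object, Object> cachedMap = new LinkedHashMap<Object, Object>();

  // Subclasses decide where the data comes from (a byte range, an HBase row, ...)
  protected abstract void parse();

  public Map<Object, Object> getMap() {
    if (!parsed) {
      parse();
      parsed = true;
    }
    return cachedMap;
  }
}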
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (revision 883033) +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveOtherMetadataHandlerManager.HiveOtherMetadataHandler; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -70,6 +71,7 @@ private static boolean createDefaultDB = false; private String rawStoreClassName; private HiveConf hiveConf; // stores datastore (jpox) properties, right now they come from jpox.properties + private HiveOtherMetadataHandlerManager handlerManager; private Warehouse wh; // hdfs warehouse private ThreadLocal threadLocalMS = new ThreadLocal() { protected synchronized Object initialValue() { @@ -116,6 +118,7 @@ String alterHandlerName = hiveConf.get("hive.metastore.alter.impl", HiveAlterHandler.class.getName()); alterHandler = (AlterHandler) ReflectionUtils.newInstance(getClass(alterHandlerName, AlterHandler.class), hiveConf); wh = new Warehouse(hiveConf); + handlerManager = new HiveOtherMetadataHandlerManager(hiveConf); createDefaultDB(); return true; } @@ -284,6 +287,7 @@ Path tblPath = null; boolean success = false, madeDir = false; + HiveOtherMetadataHandler handler = handlerManager.get(tbl); try { getMS().openTransaction(); if(tbl.getSd().getLocation() == null || tbl.getSd().getLocation().isEmpty()) { @@ -310,6 +314,8 @@ madeDir = true; } + if (handler != null) + handler.pre_createTable(tbl, isExternal(tbl)); // set create time long time = System.currentTimeMillis() / 1000; tbl.setCreateTime((int) time); @@ -321,6 +327,8 @@ } finally { if(!success) { getMS().rollbackTransaction(); + if (handler != null) + handler.rollback_createTable(tbl, isExternal(tbl)); if(madeDir) { wh.deleteDir(tblPath, true); } @@ -342,7 +350,7 @@ boolean success = false; boolean isExternal = false; Path tblPath = null; - Table tbl = null; + Table tbl = null, oldtbl = null; isExternal = false; try { getMS().openTransaction(); @@ -359,6 +367,7 @@ if(!getMS().dropTable(dbname, name)) { throw new MetaException("Unable to drop table"); } + oldtbl = tbl; tbl = null; // table collections disappear after dropping success = getMS().commitTransaction(); } finally { @@ -368,6 +377,9 @@ wh.deleteDir(tblPath, true); // ok even if the data is not deleted } + if (success & handlerManager.get(oldtbl) != null) { + handlerManager.get(oldtbl).post_dropTable(oldtbl, deleteData, isExternal); + } } } @@ -419,6 +431,7 @@ Partition part = new Partition(); boolean success = false, madeDir = false; Path partLocation = null; + HiveOtherMetadataHandler handler = null; try { getMS().openTransaction(); part = new Partition(); @@ -448,6 +461,10 @@ } madeDir = true; } + + handler = handlerManager.get(tbl); + if (handler != null) + handler.pre_addPartition(part); // set create time long time = System.currentTimeMillis() / 1000; @@ -461,6 +478,8 @@ } finally { if(!success) { getMS().rollbackTransaction(); + if (handler != null) + handler.rollback_addPartition(part); if(madeDir) { wh.deleteDir(partLocation, true); } @@ -499,6 +518,7 @@ logStartFunction("add_partition", part.getDbName(), 
part.getTableName()); boolean success = false, madeDir = false; Path partLocation = null; + HiveOtherMetadataHandler handler = null; try { getMS().openTransaction(); Partition old_part = this.get_partition(part.getDbName(), part.getTableName(), part.getValues()); @@ -528,6 +548,10 @@ } madeDir = true; } + + handler = handlerManager.get(tbl); + if (handler != null) + handler.pre_addPartition(part); // set create time long time = System.currentTimeMillis() / 1000; @@ -539,6 +563,8 @@ } finally { if(!success) { getMS().rollbackTransaction(); + if (handler != null) + handler.rollback_addPartition(part); if(madeDir) { wh.deleteDir(partLocation, true); } @@ -555,9 +581,10 @@ boolean success = false; Path partPath = null; Table tbl = null; + Partition part = null; try { getMS().openTransaction(); - Partition part = this.get_partition(db_name, tbl_name, part_vals); + part = this.get_partition(db_name, tbl_name, part_vals); if(part == null) { throw new NoSuchObjectException("Partition doesn't exist. " + part_vals); } @@ -579,6 +606,9 @@ // ok even if the data is not deleted } } + if (success && tbl != null && handlerManager.get(tbl) != null) { + handlerManager.get(tbl).post_dropPartition(tbl, part, deleteData, isExternal(tbl)); + } } return true; } @@ -609,12 +639,29 @@ this.incrementCounter("alter_partition"); logStartFunction("alter_partition", db_name, tbl_name); LOG.info("Partition values:" + new_part.getValues()); + Table tbl = null; + HiveOtherMetadataHandler handler = null; + boolean success = false; try { + tbl = getMS().getTable(db_name, tbl_name); + if(tbl == null) { + throw new InvalidObjectException("Unable to alter partition because table or database do not exist"); + } + + handler = handlerManager.get(tbl); + if (handler != null) + handler.pre_alterPartition(tbl, new_part); + new_part.putToParameters(Constants.DDL_TIME, Long.toString(System.currentTimeMillis() / 1000)); getMS().alterPartition(db_name, tbl_name, new_part); + success = true; } catch(InvalidObjectException e) { LOG.error(StringUtils.stringifyException(e)); throw new InvalidOperationException("alter is not possible"); + } finally { + if (!success && handler != null) { + handler.rollback_alterPartition(tbl, new_part); + } } } @@ -635,8 +682,27 @@ MetaException { this.incrementCounter("alter_table"); logStartFunction("truncate_table: db=" + dbname + " tbl=" + name + " newtbl=" + newTable.getTableName()); - newTable.putToParameters(Constants.DDL_TIME, Long.toString(System.currentTimeMillis() / 1000)); - alterHandler.alterTable(getMS(), wh, dbname, name, newTable); + Table tbl = null; + HiveOtherMetadataHandler handler = null; + boolean success = false; + try { + tbl = getMS().getTable(dbname, name); + if(tbl == null) { + throw new InvalidOperationException("Unable to alter table because table or database do not exist"); + } + + handler = handlerManager.get(tbl); + if (handler != null) + handler.pre_alterTable(tbl, newTable); + + newTable.putToParameters(Constants.DDL_TIME, Long.toString(System.currentTimeMillis() / 1000)); + alterHandler.alterTable(getMS(), wh, dbname, name, newTable); + success = true; + } finally { + if (!success && handler != null) { + handler.rollback_alterTable(tbl, newTable); + } + } } public List get_tables(String dbname, String pattern) throws MetaException { Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveOtherMetadataHandlerManager.java =================================================================== --- 
metastore/src/java/org/apache/hadoop/hive/metastore/HiveOtherMetadataHandlerManager.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveOtherMetadataHandlerManager.java (revision 0) @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import java.util.HashMap; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; + +public class HiveOtherMetadataHandlerManager { + + public static interface HiveOtherMetadataHandler extends Configurable { + + public void pre_createTable(Table table, boolean isExternal) throws MetaException; + + public void rollback_createTable(Table table, boolean isExternal) throws MetaException; + + public void post_dropTable(Table table, boolean deleteData, boolean isExternal) throws MetaException; + + public void pre_addPartition(Partition part) throws MetaException; + + public void rollback_addPartition(Partition part) throws MetaException; + + public void post_dropPartition(Table tbl, Partition part, boolean deleteData, boolean isExternal) + throws MetaException; + + public void pre_alterPartition(Table tbl, Partition newPart) throws MetaException; + + public void rollback_alterPartition(Table tbl, Partition newPart) throws MetaException; + + public void pre_alterTable(Table oldTable, Table newTable) throws MetaException; + + public void rollback_alterTable(Table oldTable, Table newTable) throws MetaException; + + } + + HiveConf conf; + private HashMap handlers; + + public HiveOtherMetadataHandlerManager(HiveConf conf) throws MetaException { + this.conf = conf; + handlers = new HashMap(); + + init(); + } + + private void init() throws MetaException { + try { + // The string is presented as InputFormatName1:HandlerName1,InputFormatName2:HandlerName2... 
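// --- Editor's note: sketch only, not part of the patch. ---
// The loop that follows turns the "hive.othermetadata.handlers" property
// (a comma-separated list of inputFormatClassName:handlerClassName pairs)
// into a lookup keyed by input format name; malformed entries are silently
// skipped. A standalone restatement without the reflection step (names
// assumed):
static java.util.Map<String, String> parseHandlerMapping(String property) {
  java.util.Map<String, String> mapping = new java.util.HashMap<String, String>();
  if (property == null) {
    return mapping;
  }
  for (String entry : property.split(",")) {
    String[] kv = entry.split(":");
    if (kv.length == 2) {                 // inputFormat class name : handler class name
      mapping.put(kv[0], kv[1]);
    }
  }
  return mapping;
}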
+ String handlersStr = conf.get("hive.othermetadata.handlers", ""); + String[] handlerStrs = handlersStr.split(","); + if (handlerStrs != null) { + for (String handle : handlerStrs) { + String[] handleKV = handle.split(":"); + // the handle is presented as InputFormatName:HandlerName + if (handleKV.length == 2) { + Class handlerClz = + (Class) Class.forName(handleKV[1]); + HiveOtherMetadataHandler handler = ReflectionUtils.newInstance(handlerClz, (Configuration)conf); + handlers.put(handleKV[0], handler); + } + } + } + } catch (ClassNotFoundException ce) { + throw new MetaException(StringUtils.stringifyException(ce)); + } + } + + public HiveOtherMetadataHandler get(Table tbl) { + if (tbl == null || tbl.getSd() == null) + return null; + return handlers.get(tbl.getSd().getInputFormat()); + } + +} Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (revision 883033) +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (working copy) @@ -26,10 +26,12 @@ import junit.framework.TestCase; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter; import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; @@ -48,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.reduceSinkDesc; import org.apache.hadoop.hive.ql.plan.scriptDesc; import org.apache.hadoop.hive.ql.plan.selectDesc; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.TextInputFormat; @@ -466,7 +469,30 @@ private void executePlan(File planFile) throws Exception { String testName = new Exception().getStackTrace()[1].getMethodName(); - String cmdLine = conf.getVar(HiveConf.ConfVars.HADOOPBIN) + " jar " + conf.getJar() + + + String libJarsOption; + { + String addedJars = ExecDriver.getResourceFiles(conf, SessionState.ResourceType.JAR); + conf.setVar(ConfVars.HIVEADDEDJARS, addedJars); + + String auxJars = conf.getAuxJars(); + // Put auxjars and addedjars together into libjars + if (StringUtils.isEmpty(addedJars)) { + if (StringUtils.isEmpty(auxJars)) { + libJarsOption = " "; + } else { + libJarsOption = " -libjars " + auxJars + " "; + } + } else { + if (StringUtils.isEmpty(auxJars)) { + libJarsOption = " -libjars " + addedJars + " "; + } else { + libJarsOption = " -libjars " + addedJars + "," + auxJars + " "; + } + } + } + + String cmdLine = conf.getVar(HiveConf.ConfVars.HADOOPBIN) + " jar " + libJarsOption + conf.getJar() + " org.apache.hadoop.hive.ql.exec.ExecDriver -plan " + planFile.toString() + " " + ExecDriver.generateCmdLine(conf); System.out.println("Executing: " + cmdLine); Index: ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (revision 883033) +++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (working copy) @@ -24,6 +24,7 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.FileWriter; +import java.io.IOException; import 
java.io.PrintStream; import java.io.Serializable; import java.net.URI; @@ -42,6 +43,7 @@ import org.apache.hadoop.hive.cli.CliDriver; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Task; @@ -81,7 +83,7 @@ private ParseDriver pd; private Hive db; - private HiveConf conf; + protected HiveConf conf; private Driver drv; private SemanticAnalyzer sem; private FileSystem fs; @@ -197,9 +199,16 @@ } srcTables = new LinkedList(); + + preTestUtilInit(); init(); } + protected void preTestUtilInit() throws Exception { + // do some initialization before we setup the hive + // for example, in a HBase Test Util, we setup a hbase mini cluster here + } + public void shutdown() throws Exception { cleanUp(); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (revision 883033) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (working copy) @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.io.HiveFormatUtils; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.fileSinkDesc; @@ -62,6 +63,8 @@ transient protected BytesWritable commonKey = new BytesWritable(); transient protected TableIdEnum tabIdEnum = null; transient private LongWritable row_count; + // should not be transient + private boolean isFileBasedTable = true; public static enum TableIdEnum { TABLE_ID_1_ROWCOUNT, TABLE_ID_2_ROWCOUNT, TABLE_ID_3_ROWCOUNT, TABLE_ID_4_ROWCOUNT, TABLE_ID_5_ROWCOUNT, TABLE_ID_6_ROWCOUNT, TABLE_ID_7_ROWCOUNT, TABLE_ID_8_ROWCOUNT, TABLE_ID_9_ROWCOUNT, TABLE_ID_10_ROWCOUNT, TABLE_ID_11_ROWCOUNT, TABLE_ID_12_ROWCOUNT, TABLE_ID_13_ROWCOUNT, TABLE_ID_14_ROWCOUNT, TABLE_ID_15_ROWCOUNT; @@ -70,10 +73,14 @@ transient protected boolean autoDelete = false; private void commit() throws IOException { - if (!fs.rename(outPath, finalPath)) { - throw new IOException ("Unable to rename output to: " + finalPath); + if (isFileBasedTable) { + if (!fs.rename(outPath, finalPath)) { + throw new IOException ("Unable to rename output to: " + finalPath); + } + LOG.info("Committed to output file: " + finalPath); + } else { + LOG.info("Committed to not-file-based table: " + finalPath.getName()); } - LOG.info("Committed to output file: " + finalPath); } protected void initializeOp(Configuration hconf) throws HiveException { @@ -81,6 +88,7 @@ serializer = (Serializer)conf.getTableInfo().getDeserializerClass().newInstance(); serializer.initialize(null, conf.getTableInfo().getProperties()); + isFileBasedTable = HiveFormatUtils.isFileBasedTable(conf.getTableInfo()); JobConf jc; if(hconf instanceof JobConf) { @@ -102,11 +110,15 @@ Path tmpPath = Utilities.toTempPath(specPath); String taskId = Utilities.getTaskId(hconf); fs =(new Path(specPath)).getFileSystem(hconf); - finalPath = new Path(tmpPath, taskId); - outPath = new Path(tmpPath, Utilities.toTempPath(taskId)); + if (isFileBasedTable) { + finalPath = new Path(tmpPath, taskId); + outPath = new Path(tmpPath, Utilities.toTempPath(taskId)); + + 
LOG.info("Writing to temp file: FS " + outPath); + } else { + finalPath = outPath = new Path(specPath); + } - LOG.info("Writing to temp file: FS " + outPath); - HiveOutputFormat hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance(); final Class outputClass = serializer.getSerializedClass(); boolean isCompressed = conf.getCompressed(); @@ -135,7 +147,9 @@ outWriter = getRecordWriter(jc_output, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), outPath); // in recent hadoop versions, use deleteOnExit to clean tmp files. - autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, outPath); + if (isFileBasedTable) { + autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, outPath); + } initializeChildren(hconf); } catch (HiveException e) { @@ -200,7 +214,7 @@ // Hadoop always call close() even if an Exception was thrown in map() or reduce(). try { outWriter.close(abort); - if(!autoDelete) + if(!autoDelete && isFileBasedTable) fs.delete(outPath, true); } catch (Exception e) { e.printStackTrace(); @@ -218,7 +232,7 @@ @Override public void jobClose(Configuration hconf, boolean success) throws HiveException { try { - if(conf != null) { + if(conf != null && isFileBasedTable) { String specPath = conf.getDirName(); fs = (new Path(specPath)).getFileSystem(hconf); Path tmpPath = Utilities.toTempPath(specPath); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (revision 883033) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (working copy) @@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.ql.io.HiveFormatUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.fetchWork; import org.apache.hadoop.hive.ql.plan.partitionDesc; @@ -77,8 +78,11 @@ iterPartDesc = null; tblDataDone = false; rowWithPart = new Object[2]; + + isFileBasedTable = HiveFormatUtils.isFileBasedTable(work.getTblDesc()); } + private boolean isFileBasedTable; private fetchWork work; private int splitNum; private RecordReader currRecReader; @@ -153,15 +157,22 @@ if (!tblDataDone) { currPath = work.getTblDirPath(); currTbl = work.getTblDesc(); - FileSystem fs = currPath.getFileSystem(job); - if (fs.exists(currPath)) { - FileStatus[] fStats = fs.listStatus(currPath); - for (FileStatus fStat : fStats) { - if (fStat.getLen() > 0) { - tblDataDone = true; - break; + // if the table is not a file based table + // the table dir path is a mocked path, so + // we just break and set tblDataDone to be true. 
+ if (isFileBasedTable) { + FileSystem fs = currPath.getFileSystem(job); + if (fs.exists(currPath)) { + FileStatus[] fStats = fs.listStatus(currPath); + for (FileStatus fStat : fStats) { + if (fStat.getLen() > 0) { + tblDataDone = true; + break; + } } } + } else { + tblDataDone = true; } if (!tblDataDone) @@ -212,6 +223,7 @@ if (tmp == null) tmp = currPart.getTableDesc(); inputFormat = getInputFormatFromCache(tmp.getInputFileFormatClass(), job); + HiveFormatUtils.initFromTableDesc(inputFormat, tmp, job); inputSplits = inputFormat.getSplits(job, 1); splitNum = 0; serde = tmp.getDeserializerClass().newInstance(); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 883033) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy) @@ -874,6 +874,12 @@ else outFileFormat = work.getAliasToPartnInfo().get(alias).getTableDesc().getOutputFileFormatClass(); + if (HiveFormatUtils.isNotFileOutputFormat(outFileFormat)) { + FileInputFormat.addInputPaths(job, path); + LOG.info("Add a not-file-based table " + path); + return numEmptyPaths; + } + // create a dummy empty file in a new directory String newDir = hiveScratchDir + File.separator + (++numEmptyPaths); Path newPath = new Path(newDir); Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveNotFileInputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveNotFileInputFormat.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveNotFileInputFormat.java (revision 0) @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import org.apache.hadoop.hive.ql.plan.tableDesc; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; + +public interface HiveNotFileInputFormat extends InputFormat { + + /** + * Copy some configuration from tableDesc to JobConf. 
+ * @param desc the descriptor of a given table + * @param job the configuration + */ + public void initFromTableDesc(tableDesc desc, JobConf job); + +} Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision 883033) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (working copy) @@ -238,7 +238,7 @@ // create a new InputFormat instance if this is the first time to see this class Class inputFormatClass = part.getInputFileFormatClass(); InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); - + HiveFormatUtils.initFromTableDesc(inputFormat, part.getTableDesc(), newjob); FileInputFormat.setInputPaths(newjob, dir); newjob.setInputFormat(inputFormat.getClass()); InputSplit[] iss = inputFormat.getSplits(newjob, numSplits/dirs.length); Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveFormatUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveFormatUtils.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveFormatUtils.java (revision 0) @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.plan.tableDesc; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.StringUtils; + +/** + * A helper class for various Hive format tasks. It does things like telling + * the file-based hive formats from other hive formats, such as hbase table + * input format. 
+ */
+public class HiveFormatUtils {
+
+  static final Log LOG = LogFactory.getLog(HiveFormatUtils.class);
+
+  static {
+    notFileInputFormatNames = new HashSet<String>();
+    notFileOutputFormatNames = new HashSet<String>();
+
+    // The HBase formats live in the contrib package and are registered by class name,
+    // so ql does not need a compile-time dependency on contrib.
+    try {
+      registerNotFileOutputFormat(
+          Class.forName("org.apache.hadoop.hive.contrib.hbase.HiveHBaseTableOutputFormat"));
+      registerNotFileInputFormat(
+          Class.forName("org.apache.hadoop.hive.contrib.hbase.HiveHBaseTableInputFormat"));
+    } catch (ClassNotFoundException e) {
+      LOG.warn(StringUtils.stringifyException(e));
+    }
+  }
+
+  private static Set<String> notFileOutputFormatNames;
+
+  public static void registerNotFileOutputFormat(Class<?> clz) {
+    notFileOutputFormatNames.add(clz.getName());
+    LOG.info("registered not-file-based output format: " + clz);
+  }
+
+  public static boolean isNotFileOutputFormat(Class<?> clz) {
+    return notFileOutputFormatNames.contains(clz.getName());
+  }
+
+  private static Set<String> notFileInputFormatNames;
+
+  public static void registerNotFileInputFormat(Class<?> clz) {
+    notFileInputFormatNames.add(clz.getName());
+    LOG.info("registered not-file-based input format: " + clz);
+  }
+
+  public static boolean isNotFileInputFormat(Class<?> clz) {
+    return notFileInputFormatNames.contains(clz.getName());
+  }
+
+  public static boolean isFileBasedTable(tableDesc tableinfo) {
+    if (tableinfo == null) return true;
+    return !(notFileInputFormatNames.contains(tableinfo.getInputFileFormatClass().getName()) ||
+        notFileOutputFormatNames.contains(tableinfo.getOutputFileFormatClass().getName()));
+  }
+
+  public static boolean isFileBasedTable(org.apache.hadoop.hive.metastore.api.Table table) {
+    if (table == null) return true;
+    return !(notFileInputFormatNames.contains(table.getSd().getInputFormat()) ||
+        notFileOutputFormatNames.contains(table.getSd().getOutputFormat()));
+  }
+
+  public static boolean isFileBasedTable(org.apache.hadoop.hive.ql.metadata.Table table) {
+    if (table == null) return true;
+    return !(notFileInputFormatNames.contains(table.getInputFormatClass().getName()) ||
+        notFileOutputFormatNames.contains(table.getOutputFormatClass().getName()));
+  }
+
+  public static void initFromTableDesc(InputFormat inputformat, tableDesc tbl, JobConf job) {
+    LOG.info("initFromTableDesc: " + tbl);
+    if (!isFileBasedTable(tbl) && inputformat instanceof HiveNotFileInputFormat) {
+      ((HiveNotFileInputFormat) inputformat).initFromTableDesc(tbl, job);
+    }
+  }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (revision 883033)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (working copy)
@@ -116,6 +116,7 @@
 TOK_TBLSEQUENCEFILE;
 TOK_TBLTEXTFILE;
 TOK_TBLRCFILE;
+TOK_TBLHBASETABLE;
 TOK_TABLEFILEFORMAT;
 TOK_TABCOLNAME;
 TOK_TABLELOCATION;
@@ -521,6 +522,7 @@
       KW_STORED KW_AS KW_SEQUENCEFILE  -> TOK_TBLSEQUENCEFILE
     | KW_STORED KW_AS KW_TEXTFILE  -> TOK_TBLTEXTFILE
     | KW_STORED KW_AS KW_RCFILE  -> TOK_TBLRCFILE
+    | KW_STORED KW_AS KW_HBASETABLE  -> TOK_TBLHBASETABLE
     | KW_STORED KW_AS KW_INPUTFORMAT inFmt=StringLiteral KW_OUTPUTFORMAT outFmt=StringLiteral
       -> ^(TOK_TABLEFILEFORMAT $inFmt $outFmt)
     ;
@@ -1387,6 +1389,7 @@
 KW_SEQUENCEFILE: 'SEQUENCEFILE';
 KW_TEXTFILE: 'TEXTFILE';
 KW_RCFILE: 'RCFILE';
+KW_HBASETABLE: 'HBASETABLE';
 KW_INPUTFORMAT: 'INPUTFORMAT';
 KW_OUTPUTFORMAT: 'OUTPUTFORMAT';
 KW_LOCATION: 'LOCATION';
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (revision 883033)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (working copy)
@@ -76,6 +76,12 @@
   protected static final String RCFILE_INPUT = RCFileInputFormat.class.getName();
   protected static final String RCFILE_OUTPUT = RCFileOutputFormat.class.getName();
   protected static final String COLUMNAR_SERDE = ColumnarSerDe.class.getName();
+  // Because not everyone needs HBase as Hive's backing storage, the HBase-related code
+  // lives in the contrib package. Since the 'HBASETABLE' keyword was added to the grammar,
+  // the corresponding input/output format class names have to be spelled out here.
+  // Important: if those two classes ever move to another package, update these names as well.
+  protected static final String HBASETABLE_INPUT = "org.apache.hadoop.hive.contrib.hbase.HiveHBaseTableInputFormat";
+  protected static final String HBASETABLE_OUTPUT = "org.apache.hadoop.hive.contrib.hbase.HiveHBaseTableOutputFormat";
 
   public BaseSemanticAnalyzer(HiveConf conf) throws SemanticException {
     try {
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 883033)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -60,6 +60,7 @@
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -2618,7 +2619,12 @@
           throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
         }
         dest_path = dest_tab.getPath();
-        queryTmpdir = ctx.getExternalTmpFileURI(dest_path.toUri());
+        boolean isFileBasedTable = HiveFormatUtils.isFileBasedTable(dest_tab);
+        if (isFileBasedTable) {
+          queryTmpdir = ctx.getExternalTmpFileURI(dest_path.toUri());
+        } else {
+          queryTmpdir = dest_path.toUri().getPath();
+        }
         table_desc = Utilities.getTableDesc(dest_tab);
         this.idToTableNameMap.put(
           String.valueOf(this.destTableId), dest_tab.getName());
@@ -2626,11 +2632,13 @@
         this.destTableId ++;
 
         // Create the work for moving the table
-        this.loadTableWork.add
-          (new loadTableDesc(queryTmpdir,
-                             ctx.getExternalTmpFileURI(dest_path.toUri()),
-                             table_desc,
-                             new HashMap()));
+        if (isFileBasedTable) {
+          this.loadTableWork.add
+            (new loadTableDesc(queryTmpdir,
+                               ctx.getExternalTmpFileURI(dest_path.toUri()),
+                               table_desc,
+                               new HashMap()));
+        }
         outputs.add(new WriteEntity(dest_tab));
         break;
       }
@@ -5259,6 +5267,10 @@
         outputFormat = RCFILE_OUTPUT;
         serde = COLUMNAR_SERDE;
         break;
+      case HiveParser.TOK_TBLHBASETABLE:
+        inputFormat = HBASETABLE_INPUT;
+        outputFormat = HBASETABLE_OUTPUT;
+        break;
       case HiveParser.TOK_TABLEFILEFORMAT:
         inputFormat = unescapeSQLString(child.getChild(0).getText());
         outputFormat = unescapeSQLString(child.getChild(1).getText());
@@ -5269,6 +5281,11 @@
       default: assert false;
       }
     }
+
+    if (HBASETABLE_INPUT.equals(inputFormat) && serde == null) {
+      throw new SemanticException("The SerDe must be specified when creating an HBase-backed table, "
+          + "because it defines the column mapping from the HBase table to the Hive columns.");
+    }
 
     // check for existence of table
     if ( ifNotExists ) {