From e4f27b95bfc167d9c86f53391de4d97ae1471231 Mon Sep 17 00:00:00 2001
From: Nick Dimiduk
Date: Fri, 6 Jun 2014 18:37:10 -0700
Subject: [PATCH 1/2] HIVE-6473 Allow writing HFiles via HBaseStorageHandler
 table

---
 .../hadoop/hive/hbase/HBaseStorageHandler.java     | 27 ++++++++-
 .../hadoop/hive/hbase/HiveHFileOutputFormat.java   | 68 +++++++++++++++++-----
 .../negative/generatehfiles_require_family_path.q  | 10 ++++
 .../src/test/queries/positive/hbase_handler_bulk.q | 23 ++++++++
 .../generatehfiles_require_family_path.q.out       | 20 +++++++
 .../test/results/positive/hbase_handler_bulk.q.out | 52 +++++++++++++++++
 6 files changed, 184 insertions(+), 16 deletions(-)
 create mode 100644 hbase-handler/src/test/queries/negative/generatehfiles_require_family_path.q
 create mode 100644 hbase-handler/src/test/queries/positive/hbase_handler_bulk.q
 create mode 100644 hbase-handler/src/test/results/negative/generatehfiles_require_family_path.q.out
 create mode 100644 hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out

diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
index 19c2cf2..7b91e1d 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
@@ -262,7 +262,10 @@ public void setConf(Configuration conf) {
 
   @Override
   public Class<? extends OutputFormat> getOutputFormatClass() {
-    return org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat.class;
+    if (isHBaseGenerateHFiles(jobConf)) {
+      return HiveHFileOutputFormat.class;
+    }
+    return HiveHBaseTableOutputFormat.class;
   }
 
   @Override
@@ -349,11 +352,31 @@ public void configureTableJobProperties(
       } //input job properties
     } else {
-      jobProperties.put(TableOutputFormat.OUTPUT_TABLE, tableName);
+      if (isHBaseGenerateHFiles(jobConf)) {
+        // Only support bulk load when an hfile.family.path has been specified.
+        // TODO: support detecting CFs from column mapping
+        // TODO: support loading into multiple CFs at a time
+        String path = HiveHFileOutputFormat.getFamilyPath(jobConf, tableProperties);
+        if (path == null || path.isEmpty()) {
+          throw new RuntimeException("Please set " + HiveHFileOutputFormat.HFILE_FAMILY_PATH + " to target location for HFiles");
+        }
+        // TODO: should call HiveHFileOutputFormat#setOutputPath
+        jobProperties.put("mapred.output.dir", path);
+      } else {
+        jobProperties.put(TableOutputFormat.OUTPUT_TABLE, tableName);
+      }
     } // output job properties
   }
 
   /**
+   * Return true when the HBaseStorageHandler should generate HFiles instead of operating
+   * against the online table. This mode is in effect when "hive.hbase.generatehfiles" is true.
+   */
+  public static boolean isHBaseGenerateHFiles(Configuration conf) {
+    return conf.getBoolean("hive.hbase.generatehfiles", false);
+  }
+
+  /**
    * Utility method to add hbase-default.xml and hbase-site.xml properties to a new map
    * if they are not already present in the jobConf.
    * @param jobConf Job configuration
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
index be1210e..08572a0 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.hive.hbase;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.SortedMap;
@@ -27,10 +30,14 @@
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -54,10 +61,9 @@
     HFileOutputFormat implements
     HiveOutputFormat<ImmutableBytesWritable, KeyValue> {
 
-  private static final String HFILE_FAMILY_PATH = "hfile.family.path";
+  public static final String HFILE_FAMILY_PATH = "hfile.family.path";
 
-  static final Log LOG = LogFactory.getLog(
-      HiveHFileOutputFormat.class.getName());
+  static final Log LOG = LogFactory.getLog(HiveHFileOutputFormat.class.getName());
 
   private
   org.apache.hadoop.mapreduce.RecordWriter<ImmutableBytesWritable, KeyValue>
@@ -70,6 +76,14 @@
     }
   }
 
+  /**
+   * Retrieve the family path, first checking the JobConf, then the table properties.
+   * @return the family path, or null if not specified.
+   */
+  public static String getFamilyPath(Configuration jc, Properties tableProps) {
+    return jc.get(HFILE_FAMILY_PATH, tableProps.getProperty(HFILE_FAMILY_PATH));
+  }
+
   @Override
   public RecordWriter getHiveRecordWriter(
       final JobConf jc,
@@ -79,8 +93,8 @@ public RecordWriter getHiveRecordWriter(
       Properties tableProperties,
       final Progressable progressable) throws IOException {
 
-    // Read configuration for the target path
-    String hfilePath = tableProperties.getProperty(HFILE_FAMILY_PATH);
+    // Read configuration for the target path, first from the JobConf, then from the table properties
+    String hfilePath = getFamilyPath(jc, tableProperties);
     if (hfilePath == null) {
       throw new RuntimeException(
         "Please set " + HFILE_FAMILY_PATH + " to target location for HFiles");
@@ -129,20 +143,18 @@ public void close(boolean abort) throws IOException {
         if (abort) {
           return;
         }
-        // Move the region file(s) from the task output directory
-        // to the location specified by the user. There should
-        // actually only be one (each reducer produces one HFile),
-        // but we don't know what its name is.
+        // Move the HFile(s) from the task output directory to the
+        // location specified by the user.
         FileSystem fs = outputdir.getFileSystem(jc);
         fs.mkdirs(columnFamilyPath);
         Path srcDir = outputdir;
         for (;;) {
           FileStatus [] files = fs.listStatus(srcDir);
           if ((files == null) || (files.length == 0)) {
-            throw new IOException("No files found in " + srcDir);
+            throw new IOException("No family directories found in " + srcDir);
           }
           if (files.length != 1) {
-            throw new IOException("Multiple files found in " + srcDir);
+            throw new IOException("Multiple family directories found in " + srcDir);
           }
           srcDir = files[0].getPath();
           if (srcDir.getName().equals(columnFamilyName)) {
@@ -165,10 +177,9 @@ public void close(boolean abort) throws IOException {
         }
       }
 
-      @Override
-      public void write(Writable w) throws IOException {
+      private void writeText(Text text) throws IOException {
         // Decompose the incoming text row into fields.
-        String s = ((Text) w).toString();
+        String s = text.toString();
         String [] fields = s.split("\u0001");
         assert(fields.length <= (columnMap.size() + 1));
         // First field is the row key.
@@ -196,11 +207,40 @@ public void write(Writable w) throws IOException {
             valBytes);
           try {
             fileWriter.write(null, kv);
+          } catch (IOException e) {
+            LOG.error("Failed while writing row: " + s);
+            throw e;
           } catch (InterruptedException ex) {
             throw new IOException(ex);
           }
         }
       }
+
+      private void writePut(PutWritable put) throws IOException {
+        ImmutableBytesWritable row = new ImmutableBytesWritable(put.getPut().getRow());
+        SortedMap<byte[], List<Cell>> cells = put.getPut().getFamilyCellMap();
+        for (Map.Entry<byte[], List<Cell>> entry : cells.entrySet()) {
+          Collections.sort(entry.getValue(), new CellComparator());
+          for (Cell c : entry.getValue()) {
+            try {
+              fileWriter.write(row, KeyValueUtil.copyToNewKeyValue(c));
+            } catch (InterruptedException e) {
+              throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+            }
+          }
+        }
+      }
+
+      @Override
+      public void write(Writable w) throws IOException {
+        if (w instanceof Text) {
+          writeText((Text) w);
+        } else if (w instanceof PutWritable) {
+          writePut((PutWritable) w);
+        } else {
+          throw new IOException("Unexpected writable " + w);
+        }
+      }
     };
   }
diff --git a/hbase-handler/src/test/queries/negative/generatehfiles_require_family_path.q b/hbase-handler/src/test/queries/negative/generatehfiles_require_family_path.q
new file mode 100644
index 0000000..6844fbc
--- /dev/null
+++ b/hbase-handler/src/test/queries/negative/generatehfiles_require_family_path.q
@@ -0,0 +1,10 @@
+-- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk;
+
+CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string');
+
+SET hive.hbase.generatehfiles = true;
+INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key;
diff --git a/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q b/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q
new file mode 100644
index 0000000..f03da63
--- /dev/null
+++ b/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q
@@ -0,0 +1,23 @@
+-- -*- mode:sql -*-
+
+drop table if exists hb_target;
+
+-- this is the target HBase table
+create table hb_target(key int, val string)
+stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+with serdeproperties ('hbase.columns.mapping' = ':key,cf:val')
+tblproperties ('hbase.table.name' = 'positive_hbase_handler_bulk');
+
+set hive.hbase.generatehfiles=true;
+set hfile.family.path=/tmp/hb_target/cf;
+
+-- this should produce three files in /tmp/hb_target/cf
+insert overwrite table hb_target select distinct key, value from src cluster by key;
+
+-- To get the files out to your local filesystem for loading into
+-- HBase, run mkdir -p /tmp/blah/cf, then uncomment and
+-- semicolon-terminate the line below before running this test:
+-- dfs -copyToLocal /tmp/hb_target/cf/* /tmp/blah/cf
+
+drop table hb_target;
+dfs -rmr /tmp/hb_target/cf;
diff --git a/hbase-handler/src/test/results/negative/generatehfiles_require_family_path.q.out b/hbase-handler/src/test/results/negative/generatehfiles_require_family_path.q.out
new file mode 100644
index 0000000..f0abcd5
--- /dev/null
+++ b/hbase-handler/src/test/results/negative/generatehfiles_require_family_path.q.out
@@ -0,0 +1,20 @@
+PREHOOK: query: -- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hbase_bulk
+FAILED: RuntimeException Please set hfile.family.path to target location for HFiles
diff --git a/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out b/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out
new file mode 100644
index 0000000..9c24845
--- /dev/null
+++ b/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out
@@ -0,0 +1,52 @@
+PREHOOK: query: -- -*- mode:sql -*-
+
+drop table if exists hb_target
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- -*- mode:sql -*-
+
+drop table if exists hb_target
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: -- this is the target HBase table
+create table hb_target(key int, val string)
+stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+with serdeproperties ('hbase.columns.mapping' = ':key,cf:val')
+tblproperties ('hbase.table.name' = 'positive_hbase_handler_bulk')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: -- this is the target HBase table
+create table hb_target(key int, val string)
+stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+with serdeproperties ('hbase.columns.mapping' = ':key,cf:val')
+tblproperties ('hbase.table.name' = 'positive_hbase_handler_bulk')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hb_target
+#### A masked pattern was here ####
+insert overwrite table hb_target select distinct key, value from src cluster by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hb_target
+#### A masked pattern was here ####
+insert overwrite table hb_target select distinct key, value from src cluster by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hb_target
+PREHOOK: query: -- To get the files out to your local filesystem for loading into
+#### A masked pattern was here ####
+-- semicolon-terminate the line below before running this test:
+#### A masked pattern was here ####
+
+drop table hb_target
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@hb_target
+PREHOOK: Output: default@hb_target
+POSTHOOK: query: -- To get the files out to your local filesystem for loading into
+#### A masked pattern was here ####
+-- semicolon-terminate the line below before running this test:
+#### A masked pattern was here ####
+
+drop table hb_target
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@hb_target
+POSTHOOK: Output: default@hb_target
+#### A masked pattern was here ####
-- 
1.9.0
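
Usage sketch (reviewer's note, not part of the patch): end to end, the feature is driven by
the two Hive settings exercised in hbase_handler_bulk.q above, followed by HBase's bulk-load
tool. The LoadIncrementalHFiles step below is an assumption: it presumes the completebulkload
tool is available on the client, and it must be pointed at the parent of the family directory,
since hfile.family.path names the per-family directory itself.

    -- Hive session: route writes through HiveHFileOutputFormat instead of issuing Puts
    set hive.hbase.generatehfiles=true;
    set hfile.family.path=/tmp/hb_target/cf;
    -- CLUSTER BY keeps each reducer's keys sorted, which HFile writing requires
    insert overwrite table hb_target select distinct key, value from src cluster by key;

    # Shell: hand the generated HFiles to HBase (note: parent of the 'cf' directory)
    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles \
        /tmp/hb_target positive_hbase_handler_bulk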