diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java index 8cd594b..f9f2924 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java @@ -28,6 +28,8 @@ import java.util.Properties; import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -66,6 +68,7 @@ public class HBaseStorageHandler extends DefaultStorageHandler implements HiveMetaHook, HiveStoragePredicateHandler { + final static private Log LOG = LogFactory.getLog(HBaseStorageHandler.class); final static public String DEFAULT_PREFIX = "default."; //Check if the configure job properties is called from input @@ -255,7 +258,11 @@ public void setConf(Configuration conf) { @Override public Class getOutputFormatClass() { - return org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat.class; + if (isHBaseGenerateHFiles(jobConf)) { + return HiveHFileOutputFormat.class; + } else { + return HiveHBaseTableOutputFormat.class; + } } @Override @@ -342,11 +349,40 @@ public void configureTableJobProperties( } //input job properties } else { - jobProperties.put(TableOutputFormat.OUTPUT_TABLE, tableName); + if (isHBaseGenerateHFiles(jobConf)) { + // only support bulkload when a hfile.family.path has been specified. + // TODO: support generating a temporary output path when hfile.family.path is not specified. + // TODO: support loading into multiple CF's at a time + String path = HiveHFileOutputFormat.getFamilyPath(jobConf, tableProperties); + if (path == null || path.isEmpty()) { + throw new RuntimeException("Please set " + HiveHFileOutputFormat.HFILE_FAMILY_PATH + " to target location for HFiles"); + } + // TODO: use a variation of FileOutputFormat.setOutputPath + LOG.debug("Setting mapred.output.dir to " + path); + jobProperties.put("mapred.output.dir", path); + } else { + jobProperties.put(TableOutputFormat.OUTPUT_TABLE, tableName); + } } // output job properties } /** + * Return true when the bulkload operation should be completed -- that is, we need the extra + * move task that calls completeBulkLoad. + */ + public static boolean isHBaseCompleteBulkLoad(Configuration conf) { + return conf.getBoolean("hive.hbase.completebulkload", false); + } + + /** + * Return true when HBaseStorageHandler should generate hfiles instead of operate against the + * online table. This mode is implicitly applied when "hive.hbase.completebulkload" is true. + */ + public static boolean isHBaseGenerateHFiles(Configuration conf) { + return isHBaseCompleteBulkLoad(conf) || conf.getBoolean("hive.hbase.generatehfiles", false); + } + + /** * Utility method to add hbase-default.xml and hbase-site.xml properties to a new map * if they are not already present in the jobConf. 
* @param jobConf Job configuration @@ -378,7 +414,7 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { try { TableMapReduceUtil.addDependencyJars(jobConf); org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(jobConf, - HBaseStorageHandler.class, org.apache.hadoop.hbase.HBaseConfiguration.class); + HBaseStorageHandler.class, org.apache.hadoop.hbase.HBaseConfiguration.class); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java index 6d383b5..16d9a9e 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hive.hbase; import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.SortedMap; @@ -27,10 +30,14 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; import org.apache.hadoop.hbase.util.Bytes; @@ -54,10 +61,9 @@ HFileOutputFormat implements HiveOutputFormat { - private static final String HFILE_FAMILY_PATH = "hfile.family.path"; + public static final String HFILE_FAMILY_PATH = "hfile.family.path"; - static final Log LOG = LogFactory.getLog( - HiveHFileOutputFormat.class.getName()); + static final Log LOG = LogFactory.getLog(HiveHFileOutputFormat.class.getName()); private org.apache.hadoop.mapreduce.RecordWriter @@ -70,6 +76,14 @@ } } + /** + * Retrieve the family path, first check the JobConf, then the table properties. + * @return the family path or null if not specified. + */ + public static String getFamilyPath(Configuration jc, Properties tableProps) { + return jc.get(HFILE_FAMILY_PATH, tableProps.getProperty(HFILE_FAMILY_PATH)); + } + @Override public FSRecordWriter getHiveRecordWriter( final JobConf jc, @@ -79,8 +93,8 @@ public FSRecordWriter getHiveRecordWriter( Properties tableProperties, final Progressable progressable) throws IOException { - // Read configuration for the target path - String hfilePath = tableProperties.getProperty(HFILE_FAMILY_PATH); + // Read configuration for the target path, first from jobconf, then from table properties + String hfilePath = getFamilyPath(jc, tableProperties); if (hfilePath == null) { throw new RuntimeException( "Please set " + HFILE_FAMILY_PATH + " to target location for HFiles"); @@ -129,14 +143,20 @@ public void close(boolean abort) throws IOException { if (abort) { return; } - // Move the region file(s) from the task output directory - // to the location specified by the user. There should - // actually only be one (each reducer produces one HFile), - // but we don't know what its name is. + /* + * Move the region file(s) from the task output directory to the location specified by + * the user. 
There should actually only be one (each reducer produces one HFile), but + * we don't know what its name is. + * + * TODO: simplify bulkload to detecting the HBaseStorageHandler scenario, ignore + * hfile.family.path, skip this move step and allow MoveTask to operate directly off + * of SemanticAnalyzer's queryTempdir. + */ FileSystem fs = outputdir.getFileSystem(jc); fs.mkdirs(columnFamilyPath); Path srcDir = outputdir; for (;;) { + LOG.debug("Looking for column family names in " + srcDir); FileStatus [] files = fs.listStatus(srcDir); if ((files == null) || (files.length == 0)) { throw new IOException("No files found in " + srcDir); @@ -150,6 +170,7 @@ public void close(boolean abort) throws IOException { } } for (FileStatus regionFile : fs.listStatus(srcDir)) { + LOG.debug("Moving hfile " + regionFile.getPath() + " to new parent directory " + columnFamilyPath); fs.rename( regionFile.getPath(), new Path( @@ -165,10 +186,9 @@ public void close(boolean abort) throws IOException { } } - @Override - public void write(Writable w) throws IOException { + private void writeTest(Text text) throws IOException { // Decompose the incoming text row into fields. - String s = ((Text) w).toString(); + String s = text.toString(); String [] fields = s.split("\u0001"); assert(fields.length <= (columnMap.size() + 1)); // First field is the row key. @@ -196,11 +216,40 @@ public void write(Writable w) throws IOException { valBytes); try { fileWriter.write(null, kv); + } catch (IOException e) { + LOG.info("Failed while writing row: " + s); + throw e; } catch (InterruptedException ex) { throw new IOException(ex); } } } + + private void writePut(PutWritable put) throws IOException { + ImmutableBytesWritable row = new ImmutableBytesWritable(put.getPut().getRow()); + SortedMap> cells = put.getPut().getFamilyCellMap(); + for (Map.Entry> entry : cells.entrySet()) { + Collections.sort(entry.getValue(), new CellComparator()); + for (Cell c : entry.getValue()) { + try { + fileWriter.write(row, KeyValueUtil.copyToNewKeyValue(c)); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + } + } + } + + @Override + public void write(Writable w) throws IOException { + if (w instanceof Text) { + writeTest((Text) w); + } else if (w instanceof PutWritable) { + writePut((PutWritable) w); + } else { + throw new IOException("Unexpected writable " + w); + } + } }; } diff --git a/hbase-handler/src/test/queries/negative/bulk_completebulkload_require_family_path.q b/hbase-handler/src/test/queries/negative/bulk_completebulkload_require_family_path.q new file mode 100644 index 0000000..ef15416 --- /dev/null +++ b/hbase-handler/src/test/queries/negative/bulk_completebulkload_require_family_path.q @@ -0,0 +1,10 @@ +-- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk; + +CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string'); + +SET hive.hbase.completebulkload = true; +INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key; diff --git a/hbase-handler/src/test/queries/negative/bulk_generatehfiles_require_family_path.q b/hbase-handler/src/test/queries/negative/bulk_generatehfiles_require_family_path.q new file mode 100644 index 0000000..6844fbc --- /dev/null +++ b/hbase-handler/src/test/queries/negative/bulk_generatehfiles_require_family_path.q @@ -0,0 +1,10 @@ +-- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk; + +CREATE TABLE hbase_bulk 
(key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string'); + +SET hive.hbase.generatehfiles = true; +INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key; diff --git a/hbase-handler/src/test/queries/positive/hbase_storage_handler_bulk.q b/hbase-handler/src/test/queries/positive/hbase_storage_handler_bulk.q new file mode 100644 index 0000000..1540c6f --- /dev/null +++ b/hbase-handler/src/test/queries/positive/hbase_storage_handler_bulk.q @@ -0,0 +1,15 @@ +-- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk; + +CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string'); + +SET hive.hbase.generatehfiles = true; +SET hfile.family.path = /tmp/bulk_hfiles/f; +EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key; + +SET hive.hbase.completebulkload = true; +SET hfile.family.path = /tmp/bulk_hfiles/f; +EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key; diff --git a/hbase-handler/src/test/results/negative/bulk_completebulkload_require_family_path.q.out b/hbase-handler/src/test/results/negative/bulk_completebulkload_require_family_path.q.out new file mode 100644 index 0000000..f0e19ab --- /dev/null +++ b/hbase-handler/src/test/results/negative/bulk_completebulkload_require_family_path.q.out @@ -0,0 +1,18 @@ +PREHOOK: query: -- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk +PREHOOK: type: DROPTABLE +POSTHOOK: query: -- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string') +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@hbase_bulk +FAILED: RuntimeException Please set hfile.family.path to target location for HFiles diff --git a/hbase-handler/src/test/results/negative/bulk_generatehfiles_require_family_path.q.out b/hbase-handler/src/test/results/negative/bulk_generatehfiles_require_family_path.q.out new file mode 100644 index 0000000..f0e19ab --- /dev/null +++ b/hbase-handler/src/test/results/negative/bulk_generatehfiles_require_family_path.q.out @@ -0,0 +1,18 @@ +PREHOOK: query: -- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk +PREHOOK: type: DROPTABLE +POSTHOOK: query: -- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string') +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@hbase_bulk +FAILED: RuntimeException Please set hfile.family.path to target location for HFiles diff --git a/hbase-handler/src/test/results/positive/hbase_storage_handler_bulk.q.out b/hbase-handler/src/test/results/positive/hbase_storage_handler_bulk.q.out new file 
mode 100644 index 0000000..860582a --- /dev/null +++ b/hbase-handler/src/test/results/positive/hbase_storage_handler_bulk.q.out @@ -0,0 +1,79 @@ +PREHOOK: query: -- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk +PREHOOK: type: DROPTABLE +POSTHOOK: query: -- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string') +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@hbase_bulk +PREHOOK: query: INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@hbase_bulk +POSTHOOK: query: INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@hbase_bulk +PREHOOK: query: EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + Stage-2 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string) + Reduce Operator Tree: + Extract + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: UDFToInteger(_col0) (type: int), _col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat + output format: org.apache.hadoop.hive.hbase.HiveHFileOutputFormat + serde: org.apache.hadoop.hive.hbase.HBaseSerDe + name: default.hbase_bulk + + Stage: Stage-0 + Move Operator + HBase completeBulkLoad: + table: + input format: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat + output format: org.apache.hadoop.hive.hbase.HiveHFileOutputFormat + serde: org.apache.hadoop.hive.hbase.HBaseSerDe + name: default.hbase_bulk + + Stage: Stage-2 + Stats-Aggr Operator + diff --git a/ql/pom.xml b/ql/pom.xml index 53d0b9e..8003e47 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -331,7 +331,17 @@ org.apache.hadoop hadoop-core ${hadoop-20S.version} - true + true + + + org.apache.hbase + hbase-client + ${hbase.hadoop1.version} + + + org.apache.hbase + hbase-server + ${hbase.hadoop1.version} @@ -373,6 +383,16 @@ ${hadoop-23.version} true + + org.apache.hbase + hbase-client + 
${hbase.hadoop2.version} + + + org.apache.hbase + hbase-server + ${hbase.hadoop2.version} + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index ed7787d..fe5512f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -27,13 +27,20 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; @@ -57,6 +64,7 @@ import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; +import org.apache.hadoop.hive.ql.plan.HBaseCompleteBulkLoadDesc; import org.apache.hadoop.hive.ql.plan.LoadFileDesc; import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; @@ -195,6 +203,20 @@ private void releaseLocks(LoadTableDesc ltd) throws HiveException { } } + private void completeBulkLoad(Path sourcePath, String targetTable, Configuration conf) throws Exception { + LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(conf); + HConnection conn = null; + HTable table = null; + try { + conn = HConnectionManager.createConnection(conf); + table = (HTable) conn.getTable(targetTable); + loadIncrementalHFiles.doBulkLoad(sourcePath, table); + } finally { + if (table != null) table.close(); + if (conn != null) conn.close(); + } + } + @Override public int execute(DriverContext driverContext) { @@ -428,6 +450,41 @@ public int execute(DriverContext driverContext) { releaseLocks(tbd); } + // for HFiles + HBaseCompleteBulkLoadDesc cbld = work.getCompleteBulkLoadWork(); + if (cbld != null) { + // lookup hfile.family.path. 
Duplicated from HiveHFileOutputFormat#getFamilyPath + Configuration conf = driverContext.getCtx().getConf(); + Properties tableProps = cbld.getTable().getProperties(); + Path columnFamilyPath = new Path(conf.get("hfile.family.path", tableProps.getProperty("hfile.family.path"))); + Path sourcePath = columnFamilyPath.getParent(); + // TODO: assert hfile.family.path is a directory of HFiles + assert sourcePath.getFileSystem(driverContext.getCtx().getConf()).isDirectory(sourcePath) : sourcePath + " is not a directory."; + + String tableName = tableProps.getProperty("hbase.table.name" /* HBaseSerDe#HBASE_TABLE_NAME */); + conf = HBaseConfiguration.create(conf); + console.printInfo("Registering HFiles with RegionServers: " + sourcePath + " => " + tableName); + completeBulkLoad(sourcePath, tableName, conf); + + // after bulkload, all hfiles should be gone + FileSystem fs = columnFamilyPath.getFileSystem(conf); + FileStatus[] files = fs.listStatus(columnFamilyPath); + if (files == null || files.length == 0) { + // bulkload succeeded. Clean up empty column family directory. + fs.delete(columnFamilyPath, true); + } else { + // bulkload failed. report abandoned files. + long totalSize = 0; + for (FileStatus f : files) { + totalSize += f.getLen(); + } + String msg = "Failed to bulkload all HFiles in " + columnFamilyPath + ". Roughly " + + StringUtils.humanReadableInt(totalSize) + "bytes abandoned."; + console.printError("HFiles remain; registration failed!", msg); + return 1; + } + } + return 0; } catch (Exception e) { console.printError("Failed with exception " + e.getMessage(), "\n" diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java index 0169077..a79b8de 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java @@ -95,6 +95,9 @@ public int execute(DriverContext driverContext) { if (work.getLoadFileDesc() != null) { workComponentsPresent++; } + if (work.getCompleteBulkLoadDesc() != null) { + workComponentsPresent++; + } assert (workComponentsPresent == 1); @@ -102,8 +105,10 @@ public int execute(DriverContext driverContext) { try { if (work.getLoadTableDesc() != null) { tableName = work.getLoadTableDesc().getTable().getTableName(); - } else if (work.getTableSpecs() != null){ + } else if (work.getTableSpecs() != null) { tableName = work.getTableSpecs().tableName; + } else if (work.getCompleteBulkLoadDesc() != null) { + tableName = work.getCompleteBulkLoadDesc().getTable().getTableName(); } else { tableName = work.getLoadFileDesc().getDestinationCreateTable(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index d2aa220..233c21f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1229,7 +1229,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, // 2. 
Constructing a conditional task consisting of a move task and a map reduce task // MoveWork dummyMv = new MoveWork(null, null, null, - new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false); + new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), null, false); MapWork cplan; Serializable work; @@ -1379,6 +1379,8 @@ public static void addStatsTask(FileSinkOperator nd, MoveTask mvTask, statsWork = new StatsWork(mvWork.getLoadTableWork()); } else if (mvWork.getLoadFileWork() != null) { statsWork = new StatsWork(mvWork.getLoadFileWork()); + } else if (mvWork.getCompleteBulkLoadWork() != null) { + statsWork = new StatsWork(mvWork.getCompleteBulkLoadWork()); } assert statsWork != null : "Error when genereting StatsTask"; @@ -1589,6 +1591,8 @@ public static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) { srcDir = mvWork.getLoadFileWork().getSourcePath(); } else if (mvWork.getLoadTableWork() != null) { srcDir = mvWork.getLoadTableWork().getSourcePath(); + } else if (mvWork.getCompleteBulkLoadWork() != null) { + srcDir = mvWork.getCompleteBulkLoadWork().getSourcePath(); } if ((srcDir != null) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 1f539ef..4e7a064 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -887,7 +887,7 @@ private void analyzeTruncateTable(ASTNode ast) throws SemanticException { LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, partSpec == null ? new HashMap() : partSpec); ltd.setLbCtx(lbCtx); - Task moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), + Task moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, null, false), conf); truncateTask.addDependentTask(moveTsk); @@ -1502,7 +1502,7 @@ private void analyzeAlterTablePartMergeFiles(ASTNode tablePartAST, ASTNode ast, LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, partSpec == null ? 
new HashMap() : partSpec); ltd.setLbCtx(lbCtx); - Task moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), + Task moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, null, false), conf); mergeTask.addDependentTask(moveTsk); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index ceb4c8a..044abcf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -290,7 +290,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { Utilities.getTableDesc(table), new TreeMap(), false); Task loadTableTask = TaskFactory.get(new MoveWork(getInputs(), - getOutputs(), loadTableWork, null, false), conf); + getOutputs(), loadTableWork, null, null, false), conf); copyTask.addDependentTask(loadTableTask); rootTasks.add(copyTask); return loadTableTask; @@ -338,7 +338,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { partSpec.getPartSpec(), true); loadTableWork.setInheritTableSpecs(false); Task loadPartTask = TaskFactory.get(new MoveWork( - getInputs(), getOutputs(), loadTableWork, null, false), + getInputs(), getOutputs(), loadTableWork, null, null, false), conf); copyTask.addDependentTask(loadPartTask); addPartTask.addDependentTask(loadPartTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index a22a15f..29e33f7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -261,7 +261,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite); Task childTask = TaskFactory.get(new MoveWork(getInputs(), - getOutputs(), loadTableWork, null, true), conf); + getOutputs(), loadTableWork, null, null, true), conf); if (rTask != null) { rTask.addDependentTask(childTask); } else { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java index 76f5a31..e009f5d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java @@ -124,8 +124,8 @@ public ParseContext getParseContext(ParseContext pCtx, List nameToSplitSample; private List loadTableWork; private List loadFileWork; + private List completeBulkLoadWork; private Context ctx; private HiveConf conf; private HashMap idToTableNameMap; @@ -170,7 +172,7 @@ public ParseContext( HashMap topToTable, HashMap> topToProps, Map fsopToTable, - List loadTableWork, List loadFileWork, + List loadTableWork, List loadFileWork, List completeBulkLoadWork, Context ctx, HashMap idToTableNameMap, int destTableId, UnionProcContext uCtx, List> listMapJoinOpsNoReducer, Map> groupOpToInputTables, @@ -195,6 +197,7 @@ public ParseContext( this.topToProps = topToProps; this.loadFileWork = loadFileWork; this.loadTableWork = loadTableWork; + this.completeBulkLoadWork = completeBulkLoadWork; this.opParseCtx = opParseCtx; this.topOps = topOps; this.topSelOps = topSelOps; @@ -451,6 +454,14 @@ public void setLoadFileWork(List loadFileWork) { this.loadFileWork = loadFileWork; } + public List getCompleteBulkLoadWork() { + return completeBulkLoadWork; + } + + public 
void setCompleteBulkLoadWork(List completeBulkLoadWork) { + this.completeBulkLoadWork = completeBulkLoadWork; + } + public HashMap getIdToTableNameMap() { return idToTableNameMap; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 77388dd..e53b6da 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -135,6 +135,7 @@ import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; import org.apache.hadoop.hive.ql.plan.ForwardDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; +import org.apache.hadoop.hive.ql.plan.HBaseCompleteBulkLoadDesc; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.JoinCondDesc; import org.apache.hadoop.hive.ql.plan.JoinDesc; @@ -204,6 +205,7 @@ private LinkedHashMap, OpParseContext> opParseCtx; private List loadTableWork; private List loadFileWork; + private List completeBulkLoadWork; private Map joinContext; private Map smbMapJoinContext; private final HashMap topToTable; @@ -276,6 +278,7 @@ public SemanticAnalyzer(HiveConf conf) throws SemanticException { topSelOps = new HashMap>(); loadTableWork = new ArrayList(); loadFileWork = new ArrayList(); + completeBulkLoadWork = new ArrayList(); opParseCtx = new LinkedHashMap, OpParseContext>(); joinContext = new HashMap(); smbMapJoinContext = new HashMap(); @@ -303,6 +306,7 @@ protected void reset() { super.reset(); loadTableWork.clear(); loadFileWork.clear(); + completeBulkLoadWork.clear(); topOps.clear(); topSelOps.clear(); destTableId = 1; @@ -327,6 +331,7 @@ public void initParseCtx(ParseContext pctx) { opParseCtx = pctx.getOpParseCtx(); loadTableWork = pctx.getLoadTableWork(); loadFileWork = pctx.getLoadFileWork(); + completeBulkLoadWork = pctx.getCompleteBulkLoadWork(); joinContext = pctx.getJoinContext(); smbMapJoinContext = pctx.getSmbMapJoinContext(); ctx = pctx.getContext(); @@ -344,8 +349,8 @@ public void initParseCtx(ParseContext pctx) { public ParseContext getParseContext() { return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps, - fsopToTable, loadTableWork, - loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, + fsopToTable, loadTableWork, loadFileWork, completeBulkLoadWork, + ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput, @@ -5399,6 +5404,35 @@ private boolean checkHoldDDLTime(QB qb) { return false; } + /** + * Return true when {@code table} is registered with the HBaseStorageHandler, false otherwise. + */ + private boolean isHBaseTable(Table table) { + return table != null + && table.getStorageHandler() != null + && table.getStorageHandler().getClass().getSimpleName().equals("HBaseStorageHandler"); + } + + /** + * Return true when the bulkload operation should be completed -- that is, we need the extra + * move task that calls completeBulkLoad. 
+ * + * Logic duplicated from {@code HBaseStorageHandler#isHBaseCompleteBulkLoad(Configuration)} + */ + private boolean isHBaseCompleteBulkLoad(HiveConf conf) { + return conf.getBoolean("hive.hbase.completebulkload", false); + } + + /** + * Return true when HBaseStorageHandler should generate hfiles instead of operate against the + * online table. This mode is implicitly applied when "hive.hbase.completebulkload" is true. + * + * Logic duplicated from {@code HBaseStorageHandler#isHBaseGenerateHFiles(Configuration)} + */ + private boolean isHBaseGenerateHFiles(HiveConf conf) { + return isHBaseCompleteBulkLoad(conf) || conf.getBoolean("hive.hbase.generatehfiles", false); + } + @SuppressWarnings("nls") private Operator genFileSinkPlan(String dest, QB qb, Operator input) throws SemanticException { @@ -5417,6 +5451,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) SortBucketRSCtx rsCtx = new SortBucketRSCtx(); DynamicPartitionCtx dpCtx = null; LoadTableDesc ltd = null; + HBaseCompleteBulkLoadDesc cbld = null; boolean holdDDLTime = checkHoldDDLTime(qb); ListBucketingCtx lbCtx = null; @@ -5474,15 +5509,15 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) } boolean isNonNativeTable = dest_tab.isNonNative(); - if (isNonNativeTable) { - queryTmpdir = dest_path; - } else { - // if we are on viewfs we don't want to use /tmp as tmp dir since rename from /tmp/.. + if (!isNonNativeTable || isHBaseTable(dest_tab) && isHBaseGenerateHFiles(conf)) { + // if we are on viewfs we don't want to use /tmp as tmp dir since rename from /tmp/.. // to final /user/hive/warehouse/ will fail later, so instead pick tmp dir - // on same namespace as tbl dir. - queryTmpdir = dest_path.toUri().getScheme().equals("viewfs") ? - ctx.getExtTmpPathRelTo(dest_path.getParent().toUri()) : + // on same namespace as tbl dir. + queryTmpdir = dest_path.toUri().getScheme().equals("viewfs") ? + ctx.getExtTmpPathRelTo(dest_path.getParent().toUri()) : ctx.getExternalTmpPath(dest_path.toUri()); + } else { + queryTmpdir = dest_path; } if (dpCtx != null) { // set the root of the temporay path where dynamic partition columns will populate @@ -5517,6 +5552,16 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) loadTableWork.add(ltd); } + // only complete the bulkload when explicitly requested. 
+ if (isHBaseTable(dest_tab) && isHBaseCompleteBulkLoad(conf)) { + /* + * In order for the MoveTask to be added to the plan, cbld.getSourcePath() must match + * finalDirName in GenMapRedUtils#findMoveTask + */ + cbld = new HBaseCompleteBulkLoadDesc(queryTmpdir, table_desc); + completeBulkLoadWork.add(cbld); + } + WriteEntity output = null; // Here only register the whole table for post-exec hook if no DP present @@ -9105,7 +9150,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner, opToPartList, topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps, fsopToTable, - loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, + loadTableWork, loadFileWork, completeBulkLoadWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index b569ed0..6ba8f1a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.plan.CreateTableDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.plan.HBaseCompleteBulkLoadDesc; import org.apache.hadoop.hive.ql.plan.LoadFileDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; @@ -87,6 +88,7 @@ public void compile(final ParseContext pCtx, final List loadTableWork = pCtx.getLoadTableWork(); List loadFileWork = pCtx.getLoadFileWork(); + List completeBulkLoadWork = pCtx.getCompleteBulkLoadWork(); boolean isCStats = qb.isAnalyzeRewrite(); @@ -135,7 +137,7 @@ public void compile(final ParseContext pCtx, final List tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); + Task tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, null, false), conf); mvTask.add(tsk); // Check to see if we are stale'ing any indexes and auto-update them if we want if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) { @@ -188,10 +190,14 @@ public void compile(final ParseContext pCtx, final List inputs, HashSet outputs) { public MoveWork(HashSet inputs, HashSet outputs, final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, - boolean checkFileFormat) { + final HBaseCompleteBulkLoadDesc completeBulkLoadWork, boolean checkFileFormat) { this(inputs, outputs); this.loadTableWork = loadTableWork; this.loadFileWork = loadFileWork; + this.completeBulkLoadWork = completeBulkLoadWork; this.checkFileFormat = checkFileFormat; } @@ -97,6 +99,15 @@ public void setLoadFileWork(final LoadFileDesc loadFileWork) { this.loadFileWork = loadFileWork; } + @Explain(displayName = "HBase completeBulkLoad") + public HBaseCompleteBulkLoadDesc getCompleteBulkLoadWork() { + return completeBulkLoadWork; + } + + public void setCompleteBulkLoadWork(HBaseCompleteBulkLoadDesc completeBulkLoadWork) { + this.completeBulkLoadWork = completeBulkLoadWork; + } + public boolean getCheckFileFormat() { return checkFileFormat; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java index 
66d4d4a..d572d74 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java @@ -34,6 +34,7 @@ private tableSpec tableSpecs; // source table spec -- for TableScanOperator private LoadTableDesc loadTableDesc; // same as MoveWork.loadTableDesc -- for FileSinkOperator private LoadFileDesc loadFileDesc; // same as MoveWork.loadFileDesc -- for FileSinkOperator + private HBaseCompleteBulkLoadDesc completeBulkLoadDesc; private String aggKey; // aggregation key prefix private boolean statsReliable; // are stats completely reliable @@ -70,6 +71,10 @@ public StatsWork(LoadFileDesc loadFileDesc) { this.loadFileDesc = loadFileDesc; } + public StatsWork(HBaseCompleteBulkLoadDesc completeBulkLoadDesc) { + this.completeBulkLoadDesc = completeBulkLoadDesc; + } + public StatsWork(boolean statsReliable) { this.statsReliable = statsReliable; } @@ -86,6 +91,10 @@ public LoadFileDesc getLoadFileDesc() { return loadFileDesc; } + public HBaseCompleteBulkLoadDesc getCompleteBulkLoadDesc() { + return completeBulkLoadDesc; + } + public void setAggKey(String aggK) { aggKey = aggK; }
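
The two new settings compose rather than compete: hive.hbase.generatehfiles switches HBaseStorageHandler from HiveHBaseTableOutputFormat to HiveHFileOutputFormat, and hive.hbase.completebulkload implies HFile generation and additionally schedules the move task that registers the files with the region servers. A minimal standalone sketch of that composition follows; the class name BulkLoadFlagCheck is hypothetical and simply mirrors the static helpers added to HBaseStorageHandler above.

    import org.apache.hadoop.conf.Configuration;

    public class BulkLoadFlagCheck {
      // "completebulkload" implies "generatehfiles": requesting the final registration
      // step also forces the job to write HFiles instead of issuing online Puts.
      static boolean completeBulkLoad(Configuration conf) {
        return conf.getBoolean("hive.hbase.completebulkload", false);
      }

      static boolean generateHFiles(Configuration conf) {
        return completeBulkLoad(conf) || conf.getBoolean("hive.hbase.generatehfiles", false);
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean("hive.hbase.completebulkload", true);
        // Prints "generate HFiles? true" even though hive.hbase.generatehfiles was never set.
        System.out.println("generate HFiles? " + generateHFiles(conf));
      }
    }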
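
The family path is resolved JobConf-first with the table properties as a fallback, which is what lets the .q tests drive it with SET hfile.family.path. A small sketch of that precedence, assuming only the hfile.family.path key from the patch (class name and paths are illustrative):

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;

    public class FamilyPathLookup {
      static final String HFILE_FAMILY_PATH = "hfile.family.path";

      // Mirrors HiveHFileOutputFormat#getFamilyPath: the job configuration wins;
      // the table property is only consulted when the conf carries no value.
      static String familyPath(Configuration jc, Properties tableProps) {
        return jc.get(HFILE_FAMILY_PATH, tableProps.getProperty(HFILE_FAMILY_PATH));
      }

      public static void main(String[] args) {
        Configuration jc = new Configuration();
        Properties tableProps = new Properties();
        tableProps.setProperty(HFILE_FAMILY_PATH, "/warehouse/hfiles/cf");
        jc.set(HFILE_FAMILY_PATH, "/tmp/bulk_hfiles/cf");
        System.out.println(familyPath(jc, tableProps)); // prints /tmp/bulk_hfiles/cf
      }
    }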
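
writePut sorts each family's cells with CellComparator before copying them to KeyValues because HFile writers require cells in ascending key order. A self-contained sketch of that conversion against the HBase 0.96-era client API this patch builds on (no Hive classes involved; the row and values are made up):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellComparator;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.KeyValueUtil;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PutToSortedKeyValues {
      public static void main(String[] args) {
        Put put = new Put(Bytes.toBytes("row-1"));
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("z"), Bytes.toBytes("last"));
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes("first"));

        for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap().entrySet()) {
          // HFiles must be written in key order, so sort each family's cells first.
          Collections.sort(entry.getValue(), new CellComparator());
          for (Cell c : entry.getValue()) {
            KeyValue kv = KeyValueUtil.copyToNewKeyValue(c);
            System.out.println(kv); // qualifier "a" comes out before "z"
          }
        }
      }
    }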
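
The rewritten temp-directory condition in genFileSinkPlan leans on Java operator precedence: && binds tighter than ||, so the query temp directory is chosen for native tables and for HBase tables in HFile mode, while other non-native tables keep writing straight to dest_path. A tiny sketch making the grouping explicit (the booleans stand in for the real checks):

    public class TmpDirDecision {
      // Placeholders for dest_tab.isNonNative(), isHBaseTable(dest_tab) and
      // isHBaseGenerateHFiles(conf) in SemanticAnalyzer#genFileSinkPlan.
      static boolean useQueryTmpDir(boolean isNonNativeTable, boolean isHBaseTable,
          boolean generateHFiles) {
        // Reads as: native table, OR (HBase table AND HFile mode).
        return !isNonNativeTable || isHBaseTable && generateHFiles;
      }

      public static void main(String[] args) {
        System.out.println(useQueryTmpDir(false, false, false)); // native table -> true
        System.out.println(useQueryTmpDir(true, true, true));    // HBase table, HFile mode -> true
        System.out.println(useQueryTmpDir(true, false, false));  // other non-native table -> false
      }
    }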
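
The new MoveTask step hands the staged HFile directory to HBase's LoadIncrementalHFiles, which moves the files into the regions and leaves the column-family directory empty on success -- hence the follow-up check for abandoned files. A stripped-down sketch of that registration call against the same 0.96-era API, with a hypothetical path and table name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class RegisterHFiles {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // doBulkLoad expects the parent of hfile.family.path, i.e. <dir>/<family>/<hfiles>.
        Path sourceDir = new Path("/tmp/bulk_hfiles");
        HConnection conn = HConnectionManager.createConnection(conf);
        HTable table = (HTable) conn.getTable("hbase_bulk");
        try {
          new LoadIncrementalHFiles(conf).doBulkLoad(sourceDir, table);
        } finally {
          table.close();
          conn.close();
        }
      }
    }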