From 27cd6a0600d238ceda0b32019149312aaae6e31b Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Fri, 6 Jun 2014 18:35:29 -0700 Subject: [PATCH 2/2] HIVE-2365 SQL support for bulk load into HBase --- .../hadoop/hive/hbase/HBaseStorageHandler.java | 10 +- .../hadoop/hive/hbase/HiveHFileOutputFormat.java | 13 ++- .../bulk_completebulkload_require_family_path.q | 10 ++ .../bulk_generatehfiles_require_family_path.q | 10 ++ .../queries/positive/hbase_storage_handler_bulk.q | 15 +++ ...bulk_completebulkload_require_family_path.q.out | 20 ++++ .../bulk_generatehfiles_require_family_path.q.out | 20 ++++ .../positive/hbase_storage_handler_bulk.q.out | 109 +++++++++++++++++++++ ql/pom.xml | 22 ++++- .../org/apache/hadoop/hive/ql/exec/MoveTask.java | 55 +++++++++++ .../org/apache/hadoop/hive/ql/exec/StatsTask.java | 7 +- .../hadoop/hive/ql/optimizer/GenMapRedUtils.java | 7 +- .../hadoop/hive/ql/parse/DDLSemanticAnalyzer.java | 8 +- .../hive/ql/parse/ImportSemanticAnalyzer.java | 4 +- .../hadoop/hive/ql/parse/LoadSemanticAnalyzer.java | 2 +- .../apache/hadoop/hive/ql/parse/ParseContext.java | 13 ++- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 59 +++++++++-- .../apache/hadoop/hive/ql/parse/TaskCompiler.java | 12 ++- .../hive/ql/plan/HBaseCompleteBulkLoadDesc.java | 17 ++++ .../org/apache/hadoop/hive/ql/plan/MoveWork.java | 23 +++-- .../org/apache/hadoop/hive/ql/plan/StatsWork.java | 9 ++ 21 files changed, 411 insertions(+), 34 deletions(-) create mode 100644 hbase-handler/src/test/queries/negative/bulk_completebulkload_require_family_path.q create mode 100644 hbase-handler/src/test/queries/negative/bulk_generatehfiles_require_family_path.q create mode 100644 hbase-handler/src/test/queries/positive/hbase_storage_handler_bulk.q create mode 100644 hbase-handler/src/test/results/negative/bulk_completebulkload_require_family_path.q.out create mode 100644 hbase-handler/src/test/results/negative/bulk_generatehfiles_require_family_path.q.out create mode 100644 hbase-handler/src/test/results/positive/hbase_storage_handler_bulk.q.out create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/plan/HBaseCompleteBulkLoadDesc.java diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java index 7b91e1d..c16c5b2 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java @@ -369,11 +369,19 @@ public void configureTableJobProperties( } /** + * Return true when the bulkload operation should be completed -- that is, we need the extra + * move task that calls completeBulkLoad. + */ + public static boolean isHBaseCompleteBulkLoad(Configuration conf) { + return conf.getBoolean("hive.hbase.completebulkload", false); + } + + /** * Return true when HBaseStorageHandler should generate hfiles instead of operate against the * online table. This mode is implicitly applied when "hive.hbase.completebulkload" is true. 
*/ public static boolean isHBaseGenerateHFiles(Configuration conf) { - return conf.getBoolean("hive.hbase.generatehfiles", false); + return isHBaseCompleteBulkLoad(conf) || conf.getBoolean("hive.hbase.generatehfiles", false); } /** diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java index 08572a0..0e0e0c5 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java @@ -143,12 +143,19 @@ public void close(boolean abort) throws IOException { if (abort) { return; } - // Move the hfiles file(s) from the task output directory to the - // location specified by the user. + /* + * Move the hfiles file(s) from the task output directory to the + * location specified by the user. + * + * TODO: simplify bulkload to detecting the HBaseStorageHandler scenario, ignore + * hfile.family.path, skip this move step and allow MoveTask to operate directly off + * of SemanticAnalyzer's queryTempdir. + */ FileSystem fs = outputdir.getFileSystem(jc); fs.mkdirs(columnFamilyPath); Path srcDir = outputdir; for (;;) { + LOG.debug("Looking for column family names in " + srcDir); FileStatus [] files = fs.listStatus(srcDir); if ((files == null) || (files.length == 0)) { throw new IOException("No family directories found in " + srcDir); @@ -162,6 +169,8 @@ public void close(boolean abort) throws IOException { } } for (FileStatus regionFile : fs.listStatus(srcDir)) { + LOG.debug("Moving hfile " + regionFile.getPath() + " to new parent directory " + + columnFamilyPath); fs.rename( regionFile.getPath(), new Path( diff --git a/hbase-handler/src/test/queries/negative/bulk_completebulkload_require_family_path.q b/hbase-handler/src/test/queries/negative/bulk_completebulkload_require_family_path.q new file mode 100644 index 0000000..ef15416 --- /dev/null +++ b/hbase-handler/src/test/queries/negative/bulk_completebulkload_require_family_path.q @@ -0,0 +1,10 @@ +-- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk; + +CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string'); + +SET hive.hbase.completebulkload = true; +INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key; diff --git a/hbase-handler/src/test/queries/negative/bulk_generatehfiles_require_family_path.q b/hbase-handler/src/test/queries/negative/bulk_generatehfiles_require_family_path.q new file mode 100644 index 0000000..6844fbc --- /dev/null +++ b/hbase-handler/src/test/queries/negative/bulk_generatehfiles_require_family_path.q @@ -0,0 +1,10 @@ +-- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk; + +CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string'); + +SET hive.hbase.generatehfiles = true; +INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key; diff --git a/hbase-handler/src/test/queries/positive/hbase_storage_handler_bulk.q b/hbase-handler/src/test/queries/positive/hbase_storage_handler_bulk.q new file mode 100644 index 0000000..1540c6f --- /dev/null +++ b/hbase-handler/src/test/queries/positive/hbase_storage_handler_bulk.q @@ -0,0 +1,15 @@ +-- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk; + +CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 
'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string'); + +SET hive.hbase.generatehfiles = true; +SET hfile.family.path = /tmp/bulk_hfiles/f; +EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key; + +SET hive.hbase.completebulkload = true; +SET hfile.family.path = /tmp/bulk_hfiles/f; +EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key; diff --git a/hbase-handler/src/test/results/negative/bulk_completebulkload_require_family_path.q.out b/hbase-handler/src/test/results/negative/bulk_completebulkload_require_family_path.q.out new file mode 100644 index 0000000..f0abcd5 --- /dev/null +++ b/hbase-handler/src/test/results/negative/bulk_completebulkload_require_family_path.q.out @@ -0,0 +1,20 @@ +PREHOOK: query: -- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk +PREHOOK: type: DROPTABLE +POSTHOOK: query: -- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_bulk +FAILED: RuntimeException Please set hfile.family.path to target location for HFiles diff --git a/hbase-handler/src/test/results/negative/bulk_generatehfiles_require_family_path.q.out b/hbase-handler/src/test/results/negative/bulk_generatehfiles_require_family_path.q.out new file mode 100644 index 0000000..f0abcd5 --- /dev/null +++ b/hbase-handler/src/test/results/negative/bulk_generatehfiles_require_family_path.q.out @@ -0,0 +1,20 @@ +PREHOOK: query: -- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk +PREHOOK: type: DROPTABLE +POSTHOOK: query: -- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_bulk +FAILED: RuntimeException Please set hfile.family.path to target location for HFiles diff --git a/hbase-handler/src/test/results/positive/hbase_storage_handler_bulk.q.out b/hbase-handler/src/test/results/positive/hbase_storage_handler_bulk.q.out new file mode 100644 index 0000000..aadedbd --- /dev/null +++ b/hbase-handler/src/test/results/positive/hbase_storage_handler_bulk.q.out @@ -0,0 +1,109 @@ +PREHOOK: query: -- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk +PREHOOK: type: DROPTABLE +POSTHOOK: query: -- -*- mode:sql -*- + +DROP TABLE IF EXISTS hbase_bulk +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string') +PREHOOK: 
type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_bulk +PREHOOK: query: EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Map Reduce + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: string) + Reduce Operator Tree: + Select Operator + expressions: UDFToInteger(KEY.reducesinkkey0) (type: int), VALUE._col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat + output format: org.apache.hadoop.hive.hbase.HiveHFileOutputFormat + serde: org.apache.hadoop.hive.hbase.HBaseSerDe + name: default.hbase_bulk + +PREHOOK: query: EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + Stage-2 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: string) + Reduce Operator Tree: + Select Operator + expressions: UDFToInteger(KEY.reducesinkkey0) (type: int), VALUE._col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat + output format: org.apache.hadoop.hive.hbase.HiveHFileOutputFormat + serde: org.apache.hadoop.hive.hbase.HBaseSerDe + name: default.hbase_bulk + + Stage: Stage-0 + Move Operator + HBase completeBulkLoad: + table: + input format: 
org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat + output format: org.apache.hadoop.hive.hbase.HiveHFileOutputFormat + serde: org.apache.hadoop.hive.hbase.HBaseSerDe + name: default.hbase_bulk + + Stage: Stage-2 + Stats-Aggr Operator + diff --git a/ql/pom.xml b/ql/pom.xml index 13c477a..b4ecff6 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -331,7 +331,17 @@ org.apache.hadoop hadoop-core ${hadoop-20S.version} - true + true + + + org.apache.hbase + hbase-client + ${hbase.hadoop1.version} + + + org.apache.hbase + hbase-server + ${hbase.hadoop1.version} @@ -381,6 +391,16 @@ ${hadoop-23.version} true + + org.apache.hbase + hbase-client + ${hbase.hadoop2.version} + + + org.apache.hbase + hbase-server + ${hbase.hadoop2.version} + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index e1dc911..31fa883 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -20,10 +20,16 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; @@ -186,6 +192,20 @@ private void releaseLocks(LoadTableDesc ltd) throws HiveException { } } + private void completeBulkLoad(Path sourcePath, String targetTable, Configuration conf) throws Exception { + LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(conf); + HConnection conn = null; + HTable table = null; + try { + conn = HConnectionManager.createConnection(conf); + table = (HTable) conn.getTable(targetTable); + loadIncrementalHFiles.doBulkLoad(sourcePath, table); + } finally { + if (table != null) table.close(); + if (conn != null) conn.close(); + } + } + @Override public int execute(DriverContext driverContext) { @@ -428,6 +448,41 @@ public int execute(DriverContext driverContext) { releaseLocks(tbd); } + // for HFiles + HBaseCompleteBulkLoadDesc cbld = work.getCompleteBulkLoadWork(); + if (cbld != null) { + // lookup hfile.family.path. 
Duplicated from HiveHFileOutputFormat#getFamilyPath + Configuration conf = driverContext.getCtx().getConf(); + Properties tableProps = cbld.getTable().getProperties(); + Path columnFamilyPath = new Path(conf.get("hfile.family.path", tableProps.getProperty("hfile.family.path"))); + Path sourcePath = columnFamilyPath.getParent(); + // TODO: assert hfile.family.path is a directory of HFiles + assert sourcePath.getFileSystem(driverContext.getCtx().getConf()).isDirectory(sourcePath) : sourcePath + " is not a directory."; + + String tableName = tableProps.getProperty("hbase.table.name" /* HBaseSerDe#HBASE_TABLE_NAME */); + conf = HBaseConfiguration.create(conf); + console.printInfo("Registering HFiles with RegionServers: " + sourcePath + " => " + tableName); + completeBulkLoad(sourcePath, tableName, conf); + + // after bulkload, all hfiles should be gone + FileSystem fs = columnFamilyPath.getFileSystem(conf); + FileStatus[] files = fs.listStatus(columnFamilyPath); + if (files == null || files.length == 0) { + // bulkload succeeded. Clean up empty column family directory. + fs.delete(columnFamilyPath, true); + } else { + // bulkload failed. report abandoned files. + long totalSize = 0; + for (FileStatus f : files) { + totalSize += f.getLen(); + } + String msg = "Failed to bulkload all HFiles in " + columnFamilyPath + ". Roughly " + + StringUtils.humanReadableInt(totalSize) + "bytes abandoned."; + console.printError("HFiles remain; registration failed!", msg); + return 1; + } + } + return 0; } catch (Exception e) { console.printError("Failed with exception " + e.getMessage(), "\n" diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java index 6922f89..4006031 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java @@ -95,6 +95,9 @@ public int execute(DriverContext driverContext) { if (work.getLoadFileDesc() != null) { workComponentsPresent++; } + if (work.getCompleteBulkLoadDesc() != null) { + workComponentsPresent++; + } assert (workComponentsPresent == 1); @@ -102,8 +105,10 @@ public int execute(DriverContext driverContext) { try { if (work.getLoadTableDesc() != null) { tableName = work.getLoadTableDesc().getTable().getTableName(); - } else if (work.getTableSpecs() != null){ + } else if (work.getTableSpecs() != null) { tableName = work.getTableSpecs().tableName; + } else if (work.getCompleteBulkLoadDesc() != null) { + tableName = work.getCompleteBulkLoadDesc().getTable().getTableName(); } else { tableName = work.getLoadFileDesc().getDestinationCreateTable(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 77f56c1..aa05417 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1241,7 +1241,8 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, // 2. 
Constructing a conditional task consisting of a move task and a map reduce task // MoveWork dummyMv = new MoveWork(null, null, null, - new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false); + new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), + null, false, false); MapWork cplan; Serializable work; @@ -1391,6 +1392,8 @@ public static void addStatsTask(FileSinkOperator nd, MoveTask mvTask, statsWork = new StatsWork(mvWork.getLoadTableWork()); } else if (mvWork.getLoadFileWork() != null) { statsWork = new StatsWork(mvWork.getLoadFileWork()); + } else if (mvWork.getCompleteBulkLoadWork() != null) { + statsWork = new StatsWork(mvWork.getCompleteBulkLoadWork()); } assert statsWork != null : "Error when genereting StatsTask"; @@ -1601,6 +1604,8 @@ public static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) { srcDir = mvWork.getLoadFileWork().getSourcePath(); } else if (mvWork.getLoadTableWork() != null) { srcDir = mvWork.getLoadTableWork().getSourcePath(); + } else if (mvWork.getCompleteBulkLoadWork() != null) { + srcDir = mvWork.getCompleteBulkLoadWork().getSourcePath(); } if ((srcDir != null) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 0d37fbc..d265c85 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -970,8 +970,8 @@ private void analyzeTruncateTable(ASTNode ast) throws SemanticException { LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, partSpec == null ? new HashMap() : partSpec); ltd.setLbCtx(lbCtx); - Task moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), - conf); + Task moveTsk = TaskFactory.get( + new MoveWork(null, null, ltd, null, null, false, false), conf); truncateTask.addDependentTask(moveTsk); // Recalculate the HDFS stats if auto gather stats is set @@ -1595,8 +1595,8 @@ private void analyzeAlterTablePartMergeFiles(ASTNode tablePartAST, ASTNode ast, LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, partSpec == null ? 
new HashMap() : partSpec); ltd.setLbCtx(lbCtx); - Task moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), - conf); + Task moveTsk = TaskFactory.get( + new MoveWork(null, null, ltd, null, null, false, false), conf); mergeTask.addDependentTask(moveTsk); if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 71471f4..5bb90a8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -282,7 +282,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { Utilities.getTableDesc(table), new TreeMap(), false); Task loadTableTask = TaskFactory.get(new MoveWork(getInputs(), - getOutputs(), loadTableWork, null, false), conf); + getOutputs(), loadTableWork, null, null, false, false), conf); copyTask.addDependentTask(loadTableTask); rootTasks.add(copyTask); return loadTableTask; @@ -330,7 +330,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { partSpec.getPartSpec(), true); loadTableWork.setInheritTableSpecs(false); Task loadPartTask = TaskFactory.get(new MoveWork( - getInputs(), getOutputs(), loadTableWork, null, false), + getInputs(), getOutputs(), loadTableWork, null, null, false, false), conf); copyTask.addDependentTask(loadPartTask); addPartTask.addDependentTask(loadPartTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index 8bd24d3..50d7674 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -271,7 +271,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite); Task childTask = TaskFactory.get(new MoveWork(getInputs(), - getOutputs(), loadTableWork, null, true, isLocal), conf); + getOutputs(), loadTableWork, null, null, true, isLocal), conf); if (rTask != null) { rTask.addDependentTask(childTask); } else { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java index 359bddf..ac40257 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; +import org.apache.hadoop.hive.ql.plan.HBaseCompleteBulkLoadDesc; import org.apache.hadoop.hive.ql.plan.LoadFileDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; @@ -85,6 +86,7 @@ private HashMap nameToSplitSample; private List loadTableWork; private List loadFileWork; + private List completeBulkLoadWork; private Context ctx; private HiveConf conf; private HashMap idToTableNameMap; @@ -170,7 +172,7 @@ public ParseContext( HashMap topToTable, HashMap> topToProps, Map fsopToTable, - List loadTableWork, List loadFileWork, + List loadTableWork, List loadFileWork, List completeBulkLoadWork, Context ctx, HashMap idToTableNameMap, int destTableId, UnionProcContext uCtx, List> listMapJoinOpsNoReducer, Map> 
groupOpToInputTables, @@ -195,6 +197,7 @@ public ParseContext( this.topToProps = topToProps; this.loadFileWork = loadFileWork; this.loadTableWork = loadTableWork; + this.completeBulkLoadWork = completeBulkLoadWork; this.opParseCtx = opParseCtx; this.topOps = topOps; this.topSelOps = topSelOps; @@ -451,6 +454,14 @@ public void setLoadFileWork(List loadFileWork) { this.loadFileWork = loadFileWork; } + public List getCompleteBulkLoadWork() { + return completeBulkLoadWork; + } + + public void setCompleteBulkLoadWork(List completeBulkLoadWork) { + this.completeBulkLoadWork = completeBulkLoadWork; + } + public HashMap getIdToTableNameMap() { return idToTableNameMap; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 1902bfd..de58554 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -143,6 +143,7 @@ import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; import org.apache.hadoop.hive.ql.plan.ForwardDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; +import org.apache.hadoop.hive.ql.plan.HBaseCompleteBulkLoadDesc; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.JoinCondDesc; import org.apache.hadoop.hive.ql.plan.JoinDesc; @@ -213,6 +214,7 @@ private LinkedHashMap, OpParseContext> opParseCtx; private List loadTableWork; private List loadFileWork; + private List completeBulkLoadWork; private Map joinContext; private Map smbMapJoinContext; private final HashMap topToTable; @@ -285,6 +287,7 @@ public SemanticAnalyzer(HiveConf conf) throws SemanticException { topSelOps = new HashMap>(); loadTableWork = new ArrayList(); loadFileWork = new ArrayList(); + completeBulkLoadWork = new ArrayList(); opParseCtx = new LinkedHashMap, OpParseContext>(); joinContext = new HashMap(); smbMapJoinContext = new HashMap(); @@ -312,6 +315,7 @@ protected void reset() { super.reset(); loadTableWork.clear(); loadFileWork.clear(); + completeBulkLoadWork.clear(); topOps.clear(); topSelOps.clear(); destTableId = 1; @@ -336,6 +340,7 @@ public void initParseCtx(ParseContext pctx) { opParseCtx = pctx.getOpParseCtx(); loadTableWork = pctx.getLoadTableWork(); loadFileWork = pctx.getLoadFileWork(); + completeBulkLoadWork = pctx.getCompleteBulkLoadWork(); joinContext = pctx.getJoinContext(); smbMapJoinContext = pctx.getSmbMapJoinContext(); ctx = pctx.getContext(); @@ -353,8 +358,8 @@ public void initParseCtx(ParseContext pctx) { public ParseContext getParseContext() { return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps, - fsopToTable, loadTableWork, - loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, + fsopToTable, loadTableWork, loadFileWork, completeBulkLoadWork, + ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput, @@ -5491,6 +5496,35 @@ private boolean checkHoldDDLTime(QB qb) { return false; } + /** + * Return true when {@code table} is registered with the HBaseStorageHandler, false otherwise. 
+ */ + private boolean isHBaseTable(Table table) { + return table != null + && table.getStorageHandler() != null + && table.getStorageHandler().getClass().getSimpleName().equals("HBaseStorageHandler"); + } + + /** + * Return true when the bulkload operation should be completed -- that is, we need the extra + * move task that calls completeBulkLoad. + * + * Logic duplicated from {@code HBaseStorageHandler#isHBaseCompleteBulkLoad(Configuration)} + */ + private boolean isHBaseCompleteBulkLoad(HiveConf conf) { + return conf.getBoolean("hive.hbase.completebulkload", false); + } + + /** + * Return true when HBaseStorageHandler should generate hfiles instead of operate against the + * online table. This mode is implicitly applied when "hive.hbase.completebulkload" is true. + * + * Logic duplicated from {@code HBaseStorageHandler#isHBaseGenerateHFiles(Configuration)} + */ + private boolean isHBaseGenerateHFiles(HiveConf conf) { + return isHBaseCompleteBulkLoad(conf) || conf.getBoolean("hive.hbase.generatehfiles", false); + } + @SuppressWarnings("nls") private Operator genFileSinkPlan(String dest, QB qb, Operator input) throws SemanticException { @@ -5509,6 +5543,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) SortBucketRSCtx rsCtx = new SortBucketRSCtx(); DynamicPartitionCtx dpCtx = null; LoadTableDesc ltd = null; + HBaseCompleteBulkLoadDesc cbld = null; boolean holdDDLTime = checkHoldDDLTime(qb); ListBucketingCtx lbCtx = null; @@ -5585,15 +5620,15 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) } boolean isNonNativeTable = dest_tab.isNonNative(); - if (isNonNativeTable) { - queryTmpdir = dest_path; - } else { - // if we are on viewfs we don't want to use /tmp as tmp dir since rename from /tmp/.. + if (!isNonNativeTable || isHBaseTable(dest_tab) && isHBaseGenerateHFiles(conf)) { + // if we are on viewfs we don't want to use /tmp as tmp dir since rename from /tmp/.. // to final /user/hive/warehouse/ will fail later, so instead pick tmp dir // on same namespace as tbl dir. queryTmpdir = dest_path.toUri().getScheme().equals("viewfs") ? ctx.getExtTmpPathRelTo(dest_path.getParent().toUri()) : ctx.getExternalTmpPath(dest_path.toUri()); + } else { + queryTmpdir = dest_path; } if (dpCtx != null) { // set the root of the temporay path where dynamic partition columns will populate @@ -5628,6 +5663,16 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) loadTableWork.add(ltd); } + // only complete the bulkload when explicitly requested. 
+ if (isHBaseTable(dest_tab) && isHBaseCompleteBulkLoad(conf)) { + /* + * In order for the MoveTask to be added to the plan, cbld.getSourcePath() must match + * finalDirName in GenMapRedUtils#findMoveTask + */ + cbld = new HBaseCompleteBulkLoadDesc(queryTmpdir, table_desc); + completeBulkLoadWork.add(cbld); + } + WriteEntity output = null; // Here only register the whole table for post-exec hook if no DP present @@ -9371,7 +9416,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner, opToPartList, topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps, fsopToTable, - loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, + loadTableWork, loadFileWork, completeBulkLoadWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 68f1153..8e322c9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.plan.CreateTableDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.plan.HBaseCompleteBulkLoadDesc; import org.apache.hadoop.hive.ql.plan.LoadFileDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; @@ -87,6 +88,7 @@ public void compile(final ParseContext pCtx, final List loadTableWork = pCtx.getLoadTableWork(); List loadFileWork = pCtx.getLoadFileWork(); + List completeBulkLoadWork = pCtx.getCompleteBulkLoadWork(); boolean isCStats = qb.isAnalyzeRewrite(); @@ -135,7 +137,7 @@ public void compile(final ParseContext pCtx, final List tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); + Task tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, null, false, false), conf); mvTask.add(tsk); // Check to see if we are stale'ing any indexes and auto-update them if we want if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) { @@ -188,10 +190,14 @@ public void compile(final ParseContext pCtx, final List inputs, HashSet outputs) { public MoveWork(HashSet inputs, HashSet outputs, final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, - boolean checkFileFormat, boolean srcLocal) { + final HBaseCompleteBulkLoadDesc completeBulkLoadWork, boolean checkFileFormat, + boolean srcLocal) { this(inputs, outputs); this.loadTableWork = loadTableWork; this.loadFileWork = loadFileWork; + this.completeBulkLoadWork = completeBulkLoadWork; this.checkFileFormat = checkFileFormat; this.srcLocal = srcLocal; } - public MoveWork(HashSet inputs, HashSet outputs, - final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, - boolean checkFileFormat) { - this(inputs, outputs); - this.loadTableWork = loadTableWork; - this.loadFileWork = loadFileWork; - this.checkFileFormat = checkFileFormat; - } - @Explain(displayName = "tables") public LoadTableDesc getLoadTableWork() { return loadTableWork; @@ -108,6 +102,15 @@ public void setLoadFileWork(final LoadFileDesc loadFileWork) { this.loadFileWork = loadFileWork; } + @Explain(displayName = "HBase 
completeBulkLoad") + public HBaseCompleteBulkLoadDesc getCompleteBulkLoadWork() { + return completeBulkLoadWork; + } + + public void setCompleteBulkLoadWork(HBaseCompleteBulkLoadDesc completeBulkLoadWork) { + this.completeBulkLoadWork = completeBulkLoadWork; + } + public boolean getCheckFileFormat() { return checkFileFormat; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java index 66d4d4a..d572d74 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java @@ -34,6 +34,7 @@ private tableSpec tableSpecs; // source table spec -- for TableScanOperator private LoadTableDesc loadTableDesc; // same as MoveWork.loadTableDesc -- for FileSinkOperator private LoadFileDesc loadFileDesc; // same as MoveWork.loadFileDesc -- for FileSinkOperator + private HBaseCompleteBulkLoadDesc completeBulkLoadDesc; private String aggKey; // aggregation key prefix private boolean statsReliable; // are stats completely reliable @@ -70,6 +71,10 @@ public StatsWork(LoadFileDesc loadFileDesc) { this.loadFileDesc = loadFileDesc; } + public StatsWork(HBaseCompleteBulkLoadDesc completeBulkLoadDesc) { + this.completeBulkLoadDesc = completeBulkLoadDesc; + } + public StatsWork(boolean statsReliable) { this.statsReliable = statsReliable; } @@ -86,6 +91,10 @@ public LoadFileDesc getLoadFileDesc() { return loadFileDesc; } + public HBaseCompleteBulkLoadDesc getCompleteBulkLoadDesc() { + return completeBulkLoadDesc; + } + public void setAggKey(String aggK) { aggKey = aggK; } -- 1.9.0