diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
index 4b0d38f..8691540 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
@@ -363,11 +363,19 @@ public void configureTableJobProperties(
}
/**
+ * Return true when the bulkload operation should be completed -- that is, we need the extra
+ * move task that calls completeBulkLoad.
+ */
+ public static boolean isHBaseCompleteBulkLoad(Configuration conf) {
+ return conf.getBoolean("hive.hbase.completebulkload", false);
+ }
+
+ /**
   * Return true when HBaseStorageHandler should generate hfiles instead of operating against the
   * online table. This mode is implicitly applied when "hive.hbase.completebulkload" is true.
*/
public static boolean isHBaseGenerateHFiles(Configuration conf) {
- return conf.getBoolean("hive.hbase.generatehfiles", false);
+ return isHBaseCompleteBulkLoad(conf) || conf.getBoolean("hive.hbase.generatehfiles", false);
}
/**
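For reviewers, the two flags compose as follows: setting hive.hbase.completebulkload now implies hive.hbase.generatehfiles, so a bulk-load session only needs the one switch plus hfile.family.path. A rough HiveQL sketch of the pre-existing generate-only mode (table name and path are illustrative; the qtests added below use the same shape):

    SET hive.hbase.generatehfiles = true;
    SET hfile.family.path = /tmp/hbase_hfiles/cf;
    INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key;

In this mode the HFiles are left under hfile.family.path and still have to be registered with HBase in a separate step; the completebulkload flag introduced here automates that step via the new MoveTask logic further down.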
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
index 08572a0..0e0e0c5 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
@@ -143,12 +143,19 @@ public void close(boolean abort) throws IOException {
if (abort) {
return;
}
- // Move the hfiles file(s) from the task output directory to the
- // location specified by the user.
+ /*
+ * Move the hfiles file(s) from the task output directory to the
+ * location specified by the user.
+ *
+     * TODO: simplify bulkload by detecting the HBaseStorageHandler scenario, ignoring
+     * hfile.family.path, skipping this move step, and allowing MoveTask to operate directly off
+     * of SemanticAnalyzer's queryTempdir.
+ */
FileSystem fs = outputdir.getFileSystem(jc);
fs.mkdirs(columnFamilyPath);
Path srcDir = outputdir;
for (;;) {
+ LOG.debug("Looking for column family names in " + srcDir);
FileStatus [] files = fs.listStatus(srcDir);
if ((files == null) || (files.length == 0)) {
throw new IOException("No family directories found in " + srcDir);
@@ -162,6 +169,8 @@ public void close(boolean abort) throws IOException {
}
}
for (FileStatus regionFile : fs.listStatus(srcDir)) {
+ LOG.debug("Moving hfile " + regionFile.getPath() + " to new parent directory "
+ + columnFamilyPath);
fs.rename(
regionFile.getPath(),
new Path(
diff --git a/hbase-handler/src/test/queries/negative/bulk_completebulkload_require_family_path.q b/hbase-handler/src/test/queries/negative/bulk_completebulkload_require_family_path.q
new file mode 100644
index 0000000..ef15416
--- /dev/null
+++ b/hbase-handler/src/test/queries/negative/bulk_completebulkload_require_family_path.q
@@ -0,0 +1,10 @@
+-- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk;
+
+CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string');
+
+SET hive.hbase.completebulkload = true;
+INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key;
diff --git a/hbase-handler/src/test/queries/negative/bulk_generatehfiles_require_family_path.q b/hbase-handler/src/test/queries/negative/bulk_generatehfiles_require_family_path.q
new file mode 100644
index 0000000..6844fbc
--- /dev/null
+++ b/hbase-handler/src/test/queries/negative/bulk_generatehfiles_require_family_path.q
@@ -0,0 +1,10 @@
+-- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk;
+
+CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string');
+
+SET hive.hbase.generatehfiles = true;
+INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key;
diff --git a/hbase-handler/src/test/queries/positive/hbase_storage_handler_bulk.q b/hbase-handler/src/test/queries/positive/hbase_storage_handler_bulk.q
new file mode 100644
index 0000000..1540c6f
--- /dev/null
+++ b/hbase-handler/src/test/queries/positive/hbase_storage_handler_bulk.q
@@ -0,0 +1,15 @@
+-- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk;
+
+CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string');
+
+SET hive.hbase.generatehfiles = true;
+SET hfile.family.path = /tmp/bulk_hfiles/f;
+EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key;
+
+SET hive.hbase.completebulkload = true;
+SET hfile.family.path = /tmp/bulk_hfiles/f;
+EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key;
diff --git a/hbase-handler/src/test/results/negative/bulk_completebulkload_require_family_path.q.out b/hbase-handler/src/test/results/negative/bulk_completebulkload_require_family_path.q.out
new file mode 100644
index 0000000..f0abcd5
--- /dev/null
+++ b/hbase-handler/src/test/results/negative/bulk_completebulkload_require_family_path.q.out
@@ -0,0 +1,20 @@
+PREHOOK: query: -- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hbase_bulk
+FAILED: RuntimeException Please set hfile.family.path to target location for HFiles
diff --git a/hbase-handler/src/test/results/negative/bulk_generatehfiles_require_family_path.q.out b/hbase-handler/src/test/results/negative/bulk_generatehfiles_require_family_path.q.out
new file mode 100644
index 0000000..f0abcd5
--- /dev/null
+++ b/hbase-handler/src/test/results/negative/bulk_generatehfiles_require_family_path.q.out
@@ -0,0 +1,20 @@
+PREHOOK: query: -- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hbase_bulk
+FAILED: RuntimeException Please set hfile.family.path to target location for HFiles
diff --git a/hbase-handler/src/test/results/positive/hbase_storage_handler_bulk.q.out b/hbase-handler/src/test/results/positive/hbase_storage_handler_bulk.q.out
new file mode 100644
index 0000000..e70d9b1
--- /dev/null
+++ b/hbase-handler/src/test/results/positive/hbase_storage_handler_bulk.q.out
@@ -0,0 +1,113 @@
+PREHOOK: query: -- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hbase_bulk
+PREHOOK: query: EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col1 (type: string)
+ Reduce Operator Tree:
+ Extract
+ Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: UDFToInteger(_col0) (type: int), _col1 (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat
+ output format: org.apache.hadoop.hive.hbase.HiveHFileOutputFormat
+ serde: org.apache.hadoop.hive.hbase.HBaseSerDe
+ name: default.hbase_bulk
+
+PREHOOK: query: EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+ Stage-2 depends on stages: Stage-0
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col1 (type: string)
+ Reduce Operator Tree:
+ Extract
+ Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: UDFToInteger(_col0) (type: int), _col1 (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat
+ output format: org.apache.hadoop.hive.hbase.HiveHFileOutputFormat
+ serde: org.apache.hadoop.hive.hbase.HBaseSerDe
+ name: default.hbase_bulk
+
+ Stage: Stage-0
+ Move Operator
+ HBase completeBulkLoad:
+ table:
+ input format: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat
+ output format: org.apache.hadoop.hive.hbase.HiveHFileOutputFormat
+ serde: org.apache.hadoop.hive.hbase.HBaseSerDe
+ name: default.hbase_bulk
+
+ Stage: Stage-2
+ Stats-Aggr Operator
+
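The second plan above is the one the new flag produces: Stage-1 writes HFiles through HiveHFileOutputFormat, the Stage-0 Move Operator carries the new "HBase completeBulkLoad" work, and Stage-2 aggregates stats. A session driving it end to end would look roughly like this (names illustrative; the qtest stops at EXPLAIN):

    SET hive.hbase.completebulkload = true;
    SET hfile.family.path = /tmp/bulk_hfiles/f;
    -- CLUSTER BY keeps each reducer's output sorted on the key, which HFile writing requires.
    INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key;

If the load succeeds, MoveTask removes the now-empty hfile.family.path directory; if any HFiles are left behind it reports their total size and fails the task (see the MoveTask change below).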
diff --git a/ql/pom.xml b/ql/pom.xml
index 13c477a..b4ecff6 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -331,7 +331,17 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
       <version>${hadoop-20S.version}</version>
-      <optional>true</optional>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase.hadoop1.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.hadoop1.version}</version>
@@ -381,6 +391,16 @@
       <version>${hadoop-23.version}</version>
       <optional>true</optional>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase.hadoop2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.hadoop2.version}</version>
+    </dependency>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index e1dc911..31fa883 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -20,10 +20,16 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -186,6 +192,20 @@ private void releaseLocks(LoadTableDesc ltd) throws HiveException {
}
}
+ private void completeBulkLoad(Path sourcePath, String targetTable, Configuration conf) throws Exception {
+ LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(conf);
+ HConnection conn = null;
+ HTable table = null;
+ try {
+ conn = HConnectionManager.createConnection(conf);
+ table = (HTable) conn.getTable(targetTable);
+ loadIncrementalHFiles.doBulkLoad(sourcePath, table);
+ } finally {
+ if (table != null) table.close();
+ if (conn != null) conn.close();
+ }
+ }
+
@Override
public int execute(DriverContext driverContext) {
@@ -428,6 +448,41 @@ public int execute(DriverContext driverContext) {
releaseLocks(tbd);
}
+ // for HFiles
+ HBaseCompleteBulkLoadDesc cbld = work.getCompleteBulkLoadWork();
+ if (cbld != null) {
+ // lookup hfile.family.path. Duplicated from HiveHFileOutputFormat#getFamilyPath
+ Configuration conf = driverContext.getCtx().getConf();
+ Properties tableProps = cbld.getTable().getProperties();
+ Path columnFamilyPath = new Path(conf.get("hfile.family.path", tableProps.getProperty("hfile.family.path")));
+ Path sourcePath = columnFamilyPath.getParent();
+ // TODO: assert hfile.family.path is a directory of HFiles
+ assert sourcePath.getFileSystem(driverContext.getCtx().getConf()).isDirectory(sourcePath) : sourcePath + " is not a directory.";
+
+ String tableName = tableProps.getProperty("hbase.table.name" /* HBaseSerDe#HBASE_TABLE_NAME */);
+ conf = HBaseConfiguration.create(conf);
+ console.printInfo("Registering HFiles with RegionServers: " + sourcePath + " => " + tableName);
+ completeBulkLoad(sourcePath, tableName, conf);
+
+ // after bulkload, all hfiles should be gone
+ FileSystem fs = columnFamilyPath.getFileSystem(conf);
+ FileStatus[] files = fs.listStatus(columnFamilyPath);
+ if (files == null || files.length == 0) {
+ // bulkload succeeded. Clean up empty column family directory.
+ fs.delete(columnFamilyPath, true);
+ } else {
+ // bulkload failed. report abandoned files.
+ long totalSize = 0;
+ for (FileStatus f : files) {
+ totalSize += f.getLen();
+ }
+          String msg = "Failed to bulkload all HFiles in " + columnFamilyPath + ". Roughly "
+              + StringUtils.humanReadableInt(totalSize) + " bytes abandoned.";
+ console.printError("HFiles remain; registration failed!", msg);
+ return 1;
+ }
+ }
+
return 0;
} catch (Exception e) {
console.printError("Failed with exception " + e.getMessage(), "\n"
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 6922f89..4006031 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -95,6 +95,9 @@ public int execute(DriverContext driverContext) {
if (work.getLoadFileDesc() != null) {
workComponentsPresent++;
}
+ if (work.getCompleteBulkLoadDesc() != null) {
+ workComponentsPresent++;
+ }
assert (workComponentsPresent == 1);
@@ -102,8 +105,10 @@ public int execute(DriverContext driverContext) {
try {
if (work.getLoadTableDesc() != null) {
tableName = work.getLoadTableDesc().getTable().getTableName();
- } else if (work.getTableSpecs() != null){
+ } else if (work.getTableSpecs() != null) {
tableName = work.getTableSpecs().tableName;
+ } else if (work.getCompleteBulkLoadDesc() != null) {
+ tableName = work.getCompleteBulkLoadDesc().getTable().getTableName();
} else {
tableName = work.getLoadFileDesc().getDestinationCreateTable();
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index f285312..57b17a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1239,7 +1239,8 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
// 2. Constructing a conditional task consisting of a move task and a map reduce task
//
MoveWork dummyMv = new MoveWork(null, null, null,
- new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
+ new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null),
+ null, false, false);
MapWork cplan;
Serializable work;
@@ -1389,6 +1390,8 @@ public static void addStatsTask(FileSinkOperator nd, MoveTask mvTask,
statsWork = new StatsWork(mvWork.getLoadTableWork());
} else if (mvWork.getLoadFileWork() != null) {
statsWork = new StatsWork(mvWork.getLoadFileWork());
+ } else if (mvWork.getCompleteBulkLoadWork() != null) {
+ statsWork = new StatsWork(mvWork.getCompleteBulkLoadWork());
}
assert statsWork != null : "Error when genereting StatsTask";
@@ -1599,6 +1602,8 @@ public static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) {
srcDir = mvWork.getLoadFileWork().getSourcePath();
} else if (mvWork.getLoadTableWork() != null) {
srcDir = mvWork.getLoadTableWork().getSourcePath();
+ } else if (mvWork.getCompleteBulkLoadWork() != null) {
+ srcDir = mvWork.getCompleteBulkLoadWork().getSourcePath();
}
if ((srcDir != null)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 4a0056c..67831bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -966,8 +966,8 @@ private void analyzeTruncateTable(ASTNode ast) throws SemanticException {
LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
        partSpec == null ? new HashMap<String, String>() : partSpec);
ltd.setLbCtx(lbCtx);
- Task moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false),
- conf);
+ Task moveTsk = TaskFactory.get(
+ new MoveWork(null, null, ltd, null, null, false, false), conf);
truncateTask.addDependentTask(moveTsk);
// Recalculate the HDFS stats if auto gather stats is set
@@ -1586,8 +1586,8 @@ private void analyzeAlterTablePartMergeFiles(ASTNode tablePartAST, ASTNode ast,
LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
        partSpec == null ? new HashMap<String, String>() : partSpec);
ltd.setLbCtx(lbCtx);
- Task moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false),
- conf);
+ Task moveTsk = TaskFactory.get(
+ new MoveWork(null, null, ltd, null, null, false, false), conf);
mergeTask.addDependentTask(moveTsk);
if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 71471f4..5bb90a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -282,7 +282,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
        Utilities.getTableDesc(table), new TreeMap<String, String>(),
false);
     Task<?> loadTableTask = TaskFactory.get(new MoveWork(getInputs(),
- getOutputs(), loadTableWork, null, false), conf);
+ getOutputs(), loadTableWork, null, null, false, false), conf);
copyTask.addDependentTask(loadTableTask);
rootTasks.add(copyTask);
return loadTableTask;
@@ -330,7 +330,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
partSpec.getPartSpec(), true);
loadTableWork.setInheritTableSpecs(false);
       Task<?> loadPartTask = TaskFactory.get(new MoveWork(
- getInputs(), getOutputs(), loadTableWork, null, false),
+ getInputs(), getOutputs(), loadTableWork, null, null, false, false),
conf);
copyTask.addDependentTask(loadPartTask);
addPartTask.addDependentTask(loadPartTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index 8bd24d3..50d7674 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -271,7 +271,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);
     Task<? extends Serializable> childTask = TaskFactory.get(new MoveWork(getInputs(),
- getOutputs(), loadTableWork, null, true, isLocal), conf);
+ getOutputs(), loadTableWork, null, null, true, isLocal), conf);
if (rTask != null) {
rTask.addDependentTask(childTask);
} else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
index 76f5a31..e009f5d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
@@ -124,8 +124,8 @@ public ParseContext getParseContext(ParseContext pCtx, List<Task<? extends Serializable>> rootTasks)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
   private Map<String, SplitSample> nameToSplitSample;
   private List<LoadTableDesc> loadTableWork;
   private List<LoadFileDesc> loadFileWork;
+  private List<HBaseCompleteBulkLoadDesc> completeBulkLoadWork;
   private Context ctx;
   private HiveConf conf;
   private HashMap<String, String> idToTableNameMap;
@@ -170,7 +172,7 @@ public ParseContext(
       HashMap<TableScanOperator, Table> topToTable,
       HashMap<TableScanOperator, Map<String, String>> topToProps,
       Map<FileSinkOperator, Table> fsopToTable,
-      List<LoadTableDesc> loadTableWork, List<LoadFileDesc> loadFileWork,
+      List<LoadTableDesc> loadTableWork, List<LoadFileDesc> loadFileWork, List<HBaseCompleteBulkLoadDesc> completeBulkLoadWork,
       Context ctx, HashMap<String, String> idToTableNameMap, int destTableId,
       UnionProcContext uCtx, List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOpsNoReducer,
       Map<GroupByOperator, Set<String>> groupOpToInputTables,
@@ -195,6 +197,7 @@ public ParseContext(
this.topToProps = topToProps;
this.loadFileWork = loadFileWork;
this.loadTableWork = loadTableWork;
+ this.completeBulkLoadWork = completeBulkLoadWork;
this.opParseCtx = opParseCtx;
this.topOps = topOps;
this.topSelOps = topSelOps;
@@ -451,6 +454,14 @@ public void setLoadFileWork(List<LoadFileDesc> loadFileWork) {
this.loadFileWork = loadFileWork;
}
+  public List<HBaseCompleteBulkLoadDesc> getCompleteBulkLoadWork() {
+ return completeBulkLoadWork;
+ }
+
+  public void setCompleteBulkLoadWork(List<HBaseCompleteBulkLoadDesc> completeBulkLoadWork) {
+ this.completeBulkLoadWork = completeBulkLoadWork;
+ }
+
  public HashMap<String, String> getIdToTableNameMap() {
return idToTableNameMap;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 49eb83f..d5d49c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -143,6 +143,7 @@
import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
import org.apache.hadoop.hive.ql.plan.ForwardDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.HBaseCompleteBulkLoadDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
@@ -213,6 +214,7 @@
   private LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
   private List<LoadTableDesc> loadTableWork;
   private List<LoadFileDesc> loadFileWork;
+  private List<HBaseCompleteBulkLoadDesc> completeBulkLoadWork;
   private Map<JoinOperator, QBJoinTree> joinContext;
   private Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
   private final HashMap<TableScanOperator, Table> topToTable;
@@ -285,6 +287,7 @@ public SemanticAnalyzer(HiveConf conf) throws SemanticException {
     topSelOps = new HashMap<String, Operator<? extends OperatorDesc>>();
     loadTableWork = new ArrayList<LoadTableDesc>();
     loadFileWork = new ArrayList<LoadFileDesc>();
+    completeBulkLoadWork = new ArrayList<HBaseCompleteBulkLoadDesc>();
     opParseCtx = new LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>();
     joinContext = new HashMap<JoinOperator, QBJoinTree>();
     smbMapJoinContext = new HashMap<SMBMapJoinOperator, QBJoinTree>();
@@ -312,6 +315,7 @@ protected void reset() {
super.reset();
loadTableWork.clear();
loadFileWork.clear();
+ completeBulkLoadWork.clear();
topOps.clear();
topSelOps.clear();
destTableId = 1;
@@ -336,6 +340,7 @@ public void initParseCtx(ParseContext pctx) {
opParseCtx = pctx.getOpParseCtx();
loadTableWork = pctx.getLoadTableWork();
loadFileWork = pctx.getLoadFileWork();
+ completeBulkLoadWork = pctx.getCompleteBulkLoadWork();
joinContext = pctx.getJoinContext();
smbMapJoinContext = pctx.getSmbMapJoinContext();
ctx = pctx.getContext();
@@ -353,8 +358,8 @@ public void initParseCtx(ParseContext pctx) {
public ParseContext getParseContext() {
return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps,
topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps,
- fsopToTable, loadTableWork,
- loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+ fsopToTable, loadTableWork, loadFileWork, completeBulkLoadWork,
+ ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
opToPartToSkewedPruner, viewAliasToInput,
@@ -5478,6 +5483,35 @@ private boolean checkHoldDDLTime(QB qb) {
return false;
}
+ /**
+ * Return true when {@code table} is registered with the HBaseStorageHandler, false otherwise.
+ */
+ private boolean isHBaseTable(Table table) {
+ return table != null
+ && table.getStorageHandler() != null
+ && table.getStorageHandler().getClass().getSimpleName().equals("HBaseStorageHandler");
+ }
+
+ /**
+ * Return true when the bulkload operation should be completed -- that is, we need the extra
+ * move task that calls completeBulkLoad.
+ *
+ * Logic duplicated from {@code HBaseStorageHandler#isHBaseCompleteBulkLoad(Configuration)}
+ */
+ private boolean isHBaseCompleteBulkLoad(HiveConf conf) {
+ return conf.getBoolean("hive.hbase.completebulkload", false);
+ }
+
+ /**
+   * Return true when HBaseStorageHandler should generate hfiles instead of operating against the
+   * online table. This mode is implicitly applied when "hive.hbase.completebulkload" is true.
+ *
+ * Logic duplicated from {@code HBaseStorageHandler#isHBaseGenerateHFiles(Configuration)}
+ */
+ private boolean isHBaseGenerateHFiles(HiveConf conf) {
+ return isHBaseCompleteBulkLoad(conf) || conf.getBoolean("hive.hbase.generatehfiles", false);
+ }
+
@SuppressWarnings("nls")
private Operator genFileSinkPlan(String dest, QB qb, Operator input)
throws SemanticException {
@@ -5496,6 +5530,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input)
SortBucketRSCtx rsCtx = new SortBucketRSCtx();
DynamicPartitionCtx dpCtx = null;
LoadTableDesc ltd = null;
+ HBaseCompleteBulkLoadDesc cbld = null;
boolean holdDDLTime = checkHoldDDLTime(qb);
ListBucketingCtx lbCtx = null;
@@ -5572,15 +5607,15 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input)
}
boolean isNonNativeTable = dest_tab.isNonNative();
- if (isNonNativeTable) {
- queryTmpdir = dest_path;
- } else {
- // if we are on viewfs we don't want to use /tmp as tmp dir since rename from /tmp/..
+    if (!isNonNativeTable || (isHBaseTable(dest_tab) && isHBaseGenerateHFiles(conf))) {
+ // if we are on viewfs we don't want to use /tmp as tmp dir since rename from /tmp/..
// to final /user/hive/warehouse/ will fail later, so instead pick tmp dir
// on same namespace as tbl dir.
queryTmpdir = dest_path.toUri().getScheme().equals("viewfs") ?
ctx.getExtTmpPathRelTo(dest_path.getParent().toUri()) :
ctx.getExternalTmpPath(dest_path.toUri());
+ } else {
+ queryTmpdir = dest_path;
}
if (dpCtx != null) {
// set the root of the temporay path where dynamic partition columns will populate
@@ -5615,6 +5650,16 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input)
loadTableWork.add(ltd);
}
+ // only complete the bulkload when explicitly requested.
+ if (isHBaseTable(dest_tab) && isHBaseCompleteBulkLoad(conf)) {
+ /*
+ * In order for the MoveTask to be added to the plan, cbld.getSourcePath() must match
+ * finalDirName in GenMapRedUtils#findMoveTask
+ */
+ cbld = new HBaseCompleteBulkLoadDesc(queryTmpdir, table_desc);
+ completeBulkLoadWork.add(cbld);
+ }
+
WriteEntity output = null;
// Here only register the whole table for post-exec hook if no DP present
@@ -9233,7 +9278,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner,
opToPartList, topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext,
topToTable, topToTableProps, fsopToTable,
- loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+ loadTableWork, loadFileWork, completeBulkLoadWork, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
opToPartToSkewedPruner, viewAliasToInput,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index b58a0a3..0798385 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -51,6 +51,7 @@
import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.HBaseCompleteBulkLoadDesc;
import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MoveWork;
@@ -87,6 +88,7 @@ public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks,
     List<LoadTableDesc> loadTableWork = pCtx.getLoadTableWork();
     List<LoadFileDesc> loadFileWork = pCtx.getLoadFileWork();
+    List<HBaseCompleteBulkLoadDesc> completeBulkLoadWork = pCtx.getCompleteBulkLoadWork();
boolean isCStats = qb.isAnalyzeRewrite();
@@ -135,7 +137,7 @@ public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks,
-        Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
+        Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, null, false, false), conf);
mvTask.add(tsk);
// Check to see if we are stale'ing any indexes and auto-update them if we want
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
@@ -188,10 +190,14 @@ public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) {
   public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
- boolean checkFileFormat, boolean srcLocal) {
+ final HBaseCompleteBulkLoadDesc completeBulkLoadWork, boolean checkFileFormat,
+ boolean srcLocal) {
this(inputs, outputs);
this.loadTableWork = loadTableWork;
this.loadFileWork = loadFileWork;
+ this.completeBulkLoadWork = completeBulkLoadWork;
this.checkFileFormat = checkFileFormat;
this.srcLocal = srcLocal;
}
-  public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
- final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
- boolean checkFileFormat) {
- this(inputs, outputs);
- this.loadTableWork = loadTableWork;
- this.loadFileWork = loadFileWork;
- this.checkFileFormat = checkFileFormat;
- }
-
@Explain(displayName = "tables")
public LoadTableDesc getLoadTableWork() {
return loadTableWork;
@@ -108,6 +102,15 @@ public void setLoadFileWork(final LoadFileDesc loadFileWork) {
this.loadFileWork = loadFileWork;
}
+ @Explain(displayName = "HBase completeBulkLoad")
+ public HBaseCompleteBulkLoadDesc getCompleteBulkLoadWork() {
+ return completeBulkLoadWork;
+ }
+
+ public void setCompleteBulkLoadWork(HBaseCompleteBulkLoadDesc completeBulkLoadWork) {
+ this.completeBulkLoadWork = completeBulkLoadWork;
+ }
+
public boolean getCheckFileFormat() {
return checkFileFormat;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
index 66d4d4a..d572d74 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
@@ -34,6 +34,7 @@
private tableSpec tableSpecs; // source table spec -- for TableScanOperator
private LoadTableDesc loadTableDesc; // same as MoveWork.loadTableDesc -- for FileSinkOperator
private LoadFileDesc loadFileDesc; // same as MoveWork.loadFileDesc -- for FileSinkOperator
+ private HBaseCompleteBulkLoadDesc completeBulkLoadDesc;
private String aggKey; // aggregation key prefix
private boolean statsReliable; // are stats completely reliable
@@ -70,6 +71,10 @@ public StatsWork(LoadFileDesc loadFileDesc) {
this.loadFileDesc = loadFileDesc;
}
+ public StatsWork(HBaseCompleteBulkLoadDesc completeBulkLoadDesc) {
+ this.completeBulkLoadDesc = completeBulkLoadDesc;
+ }
+
public StatsWork(boolean statsReliable) {
this.statsReliable = statsReliable;
}
@@ -86,6 +91,10 @@ public LoadFileDesc getLoadFileDesc() {
return loadFileDesc;
}
+ public HBaseCompleteBulkLoadDesc getCompleteBulkLoadDesc() {
+ return completeBulkLoadDesc;
+ }
+
public void setAggKey(String aggK) {
aggKey = aggK;
}