diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
index 0cf1acc..b6582f8 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
@@ -50,6 +50,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Progressable;
 
@@ -115,6 +116,7 @@ public RecordWriter getHiveRecordWriter(
           job.getConfiguration(), progressable);
 
     final Path outputdir = FileOutputFormat.getOutputPath(tac);
+    final Path taskAttemptOutputdir = FileOutputCommitter.getTaskAttemptPath(tac, outputdir);
     final org.apache.hadoop.mapreduce.RecordWriter<
       ImmutableBytesWritable, KeyValue> fileWriter = getFileWriter(tac);
 
@@ -148,7 +150,7 @@ public void close(boolean abort) throws IOException {
           // location specified by the user.
           FileSystem fs = outputdir.getFileSystem(jc);
           fs.mkdirs(columnFamilyPath);
-          Path srcDir = outputdir;
+          Path srcDir = taskAttemptOutputdir;
           for (;;) {
             FileStatus [] files = fs.listStatus(srcDir, FileUtils.STAGING_DIR_PATH_FILTER);
             if ((files == null) || (files.length == 0)) {
@@ -161,6 +163,11 @@ public void close(boolean abort) throws IOException {
             if (srcDir.getName().equals(columnFamilyName)) {
               break;
             }
+            if (files[0].isFile()) {
+              throw new IOException("No family directories found in " + taskAttemptOutputdir + ". "
+                  + "The last component in hfile path should match column family name "
+                  + columnFamilyName);
+            }
           }
           for (FileStatus regionFile : fs.listStatus(srcDir, FileUtils.STAGING_DIR_PATH_FILTER)) {
             fs.rename(
@@ -171,8 +178,8 @@ public void close(boolean abort) throws IOException {
           }
           // Hive actually wants a file as task output (not a directory), so
          // replace the empty directory with an empty file to keep it happy.
-          fs.delete(outputdir, true);
-          fs.createNewFile(outputdir);
+          fs.delete(taskAttemptOutputdir, true);
+          fs.createNewFile(taskAttemptOutputdir);
         } catch (InterruptedException ex) {
           throw new IOException(ex);
         }
diff --git a/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q b/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q
index f03da63..85581ec 100644
--- a/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q
+++ b/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q
@@ -10,6 +10,9 @@ tblproperties ('hbase.table.name' = 'positive_hbase_handler_bulk');
 
 set hive.hbase.generatehfiles=true;
 set hfile.family.path=/tmp/hb_target/cf;
+set mapreduce.input.fileinputformat.split.maxsize=200;
+set mapreduce.input.fileinputformat.split.minsize=200;
+set mapred.reduce.tasks=2;
 
 -- this should produce three files in /tmp/hb_target/cf
 insert overwrite table hb_target select distinct key, value from src cluster by key;