diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java index 4b8f62c..24bd146 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java @@ -19,7 +19,12 @@ package org.apache.hadoop.hive.hbase; import java.io.IOException; +import java.util.Properties; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.io.Writable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; @@ -32,10 +37,8 @@ import org.apache.hadoop.hbase.mapreduce.TableOutputCommitter; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.hbase.PutWritable; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; @@ -46,28 +49,12 @@ /** * HiveHBaseTableOutputFormat implements HiveOutputFormat for HBase tables. - * We also need to implement the @deprecated org.apache.hadoop.mapred.OutFormat - * class to keep it compliant with Hive interfaces. */ public class HiveHBaseTableOutputFormat extends TableOutputFormat implements - OutputFormat { + HiveOutputFormat { static final Logger LOG = LoggerFactory.getLogger(HiveHBaseTableOutputFormat.class); - public static final String HBASE_WAL_ENABLED = "hive.hbase.wal.enabled"; - - /** - * Update the out table, and output an empty key as the key. - * - * @param jc the job configuration file - * @param finalOutPath the final output table name - * @param valueClass the value class - * @param isCompressed whether the content is compressed or not - * @param tableProperties the table info of the corresponding table - * @param progress progress used for status report - * @return the RecordWriter for the output file - */ - @Override public void checkOutputSpecs(FileSystem fs, JobConf jc) throws IOException { @@ -98,13 +85,7 @@ public void checkOutputSpecs(FileSystem fs, JobConf jc) throws IOException { String name, Progressable progressable) throws IOException { - String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); - jobConf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName); - final boolean walEnabled = HiveConf.getBoolVar( - jobConf, HiveConf.ConfVars.HIVE_HBASE_WAL_ENABLED); - final HTable table = new HTable(HBaseConfiguration.create(jobConf), hbaseTableName); - table.setAutoFlush(false); - return new MyRecordWriter(table,walEnabled); + return getMyRecordWriter(jobConf); } @Override @@ -113,8 +94,27 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOE return new TableOutputCommitter(); } + @Override + public FileSinkOperator.RecordWriter getHiveRecordWriter( + JobConf jobConf, Path finalOutPath, Class valueClass, boolean isCompressed, + Properties tableProperties, Progressable progress) throws IOException { - static private class MyRecordWriter implements org.apache.hadoop.mapred.RecordWriter { + return getMyRecordWriter(jobConf); + } + + private MyRecordWriter getMyRecordWriter(JobConf jobConf) throws IOException { + String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); + jobConf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName); + final boolean walEnabled = HiveConf.getBoolVar( + jobConf, HiveConf.ConfVars.HIVE_HBASE_WAL_ENABLED); + final HTable table = new HTable(HBaseConfiguration.create(jobConf), hbaseTableName); + table.setAutoFlush(false); + return new MyRecordWriter(table,walEnabled); + } + + static private class MyRecordWriter + implements org.apache.hadoop.mapred.RecordWriter, + org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter { private final HTable m_table; private final boolean m_walEnabled; @@ -154,5 +154,15 @@ protected void finalize() throws Throwable { super.finalize(); } } + + @Override + public void write(Writable w) throws IOException { + write(null, w); + } + + @Override + public void close(boolean abort) throws IOException { + close(null); + } } }