commit 45b3ae266d76a91802a8f5ae6f627cff1bebec59
Author: Alice Fan
Date:   Mon Oct 8 10:21:18 2018 -0700

    HIVE-20678 : HiveHBaseTableOutputFormat should implement HiveOutputFormat to ensure compatibility

diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
index 08576c2219..b344e16c13 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
@@ -19,11 +19,16 @@
 package org.apache.hadoop.hive.hbase;
 
 import java.io.IOException;
+import java.util.Properties;
 
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.Writable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,7 +42,6 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -48,28 +52,12 @@
 
 /**
  * HiveHBaseTableOutputFormat implements HiveOutputFormat for HBase tables.
- * We also need to implement the @deprecated org.apache.hadoop.mapred.OutFormat
- * class to keep it compliant with Hive interfaces.
  */
 public class HiveHBaseTableOutputFormat extends
     TableOutputFormat implements
-    OutputFormat {
+    HiveOutputFormat {
 
   static final Logger LOG = LoggerFactory.getLogger(HiveHBaseTableOutputFormat.class);
-  public static final String HBASE_WAL_ENABLED = "hive.hbase.wal.enabled";
-
-  /**
-   * Update the out table, and output an empty key as the key.
-   *
-   * @param jc the job configuration file
-   * @param finalOutPath the final output table name
-   * @param valueClass the value class
-   * @param isCompressed whether the content is compressed or not
-   * @param tableProperties the table info of the corresponding table
-   * @param progress progress used for status report
-   * @return the RecordWriter for the output file
-   */
-
   @Override
   public void checkOutputSpecs(FileSystem fs, JobConf jc) throws IOException {
@@ -99,7 +87,23 @@ public void checkOutputSpecs(FileSystem fs, JobConf jc) throws IOException {
       JobConf jobConf,
       String name,
       Progressable progressable) throws IOException {
+    return getMyRecordWriter(jobConf);
+  }
 
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    return new TableOutputCommitter();
+  }
+
+  @Override
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(
+      JobConf jobConf, Path finalOutPath, Class valueClass, boolean isCompressed,
+      Properties tableProperties, Progressable progress) throws IOException {
+    return getMyRecordWriter(jobConf);
+  }
+
+  private MyRecordWriter getMyRecordWriter(JobConf jobConf) throws IOException {
     String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
     jobConf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
     final boolean walEnabled = HiveConf.getBoolVar(
@@ -109,14 +113,9 @@ public void checkOutputSpecs(FileSystem fs, JobConf jc) throws IOException {
     return new MyRecordWriter(table, conn, walEnabled);
   }
 
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
-      InterruptedException {
-    return new TableOutputCommitter();
-  }
-
-
-  static private class MyRecordWriter implements org.apache.hadoop.mapred.RecordWriter {
+  private static class MyRecordWriter
+      implements org.apache.hadoop.mapred.RecordWriter,
+      org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
     private final BufferedMutator m_table;
     private final boolean m_walEnabled;
     private final Connection m_connection;
@@ -127,8 +126,7 @@ public MyRecordWriter(BufferedMutator table, Connection connection, boolean walE
       m_connection = connection;
     }
 
-    public void close(Reporter reporter)
-        throws IOException {
+    public void close(Reporter reporter) throws IOException {
       m_table.close();
     }
 
@@ -159,5 +157,15 @@ protected void finalize() throws Throwable {
         super.finalize();
       }
     }
+
+    @Override
+    public void write(Writable w) throws IOException {
+      write(null, w);
+    }
+
+    @Override
+    public void close(boolean abort) throws IOException {
+      close(null);
+    }
   }
 }
diff --git a/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHiveHBaseTableOutputFormat.java b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHiveHBaseTableOutputFormat.java
new file mode 100644
index 0000000000..1bbcadad7e
--- /dev/null
+++ b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHiveHBaseTableOutputFormat.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import static org.junit.Assert.assertTrue;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * This is a simple test to make sure HiveHBaseTableOutputFormat implements HiveOutputFormat for HBase tables.
+ */
+public class TestHiveHBaseTableOutputFormat {
+
+  @Test
+  public void testInstanceOfHiveHBaseTableOutputFormat() {
+    HiveHBaseTableOutputFormat hBaseOutputFormat = Mockito.mock(HiveHBaseTableOutputFormat.class);
+    assertTrue(hBaseOutputFormat instanceof TableOutputFormat);
+    assertTrue(hBaseOutputFormat instanceof HiveOutputFormat);
+  }
+}
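
For context on the compatibility the commit message refers to: org.apache.hadoop.hive.ql.io.HiveOutputFormat extends the mapred OutputFormat interface, so dropping the explicit "implements OutputFormat" clause does not remove the old contract, while the new getHiveRecordWriter() method adds the contract Hive's FileSinkOperator expects. Below is a minimal sketch of that relationship, separate from the patch itself; the class name and main() harness are illustrative only, not part of the commit.

package org.apache.hadoop.hive.hbase;

import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.mapred.OutputFormat;

// Illustrative sketch only, not part of the commit: shows that one instance
// now satisfies the HBase, mapred, and Hive output-format contracts at once.
public class HiveHBaseTableOutputFormatCompatSketch {
  public static void main(String[] args) {
    HiveHBaseTableOutputFormat format = new HiveHBaseTableOutputFormat();

    // HBase side is unchanged: the class still extends TableOutputFormat.
    TableOutputFormat asHBaseFormat = format;

    // New with this patch: Hive's FileSinkOperator can obtain a writer via
    // getHiveRecordWriter() instead of special-casing HBase tables.
    HiveOutputFormat asHiveFormat = format;

    // The old contract is kept implicitly, because HiveOutputFormat extends
    // the mapred OutputFormat interface that was previously implemented directly.
    OutputFormat asMapredFormat = format;

    System.out.println(asHBaseFormat != null && asHiveFormat != null && asMapredFormat != null);
  }
}

Actually exercising getHiveRecordWriter() would additionally require HBaseSerDe.HBASE_TABLE_NAME in the JobConf and a reachable HBase cluster, since getMyRecordWriter() opens a Connection and BufferedMutator, which is why this sketch (and the mock-based unit test above) stops at the interface level.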