From 1276af23d53b0d7aba6eff4ca4421c9a14d7d5fa Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Mon, 17 Nov 2014 08:36:08 +0100 Subject: [PATCH] HBASE-12459 Use a non-managed Table in mapred.TableOutputFormat --- .../hadoop/hbase/mapred/TableOutputFormat.java | 53 ++++++++++------------ 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java index 5a5f544..dab39a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; @@ -44,8 +46,7 @@ import org.apache.hadoop.util.Progressable; */ @InterfaceAudience.Public @InterfaceStability.Stable -public class TableOutputFormat extends -FileOutputFormat { +public class TableOutputFormat extends FileOutputFormat { /** JobConf parameter that specifies the output table */ public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; @@ -53,55 +54,49 @@ FileOutputFormat { /** * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable) - * and write to an HBase table + * and write to an HBase table. */ - protected static class TableRecordWriter - implements RecordWriter { - private Table m_table; + protected static class TableRecordWriter implements RecordWriter { + private final Connection conn; + private final Table table; /** - * Instantiate a TableRecordWriter with the HBase HClient for writing. - * - * @param table + * Instantiate a TableRecordWriter with the HBase HClient for writing. Assumes control over the + * lifecycle of {@code conn}. */ - public TableRecordWriter(Table table) { - m_table = table; + public TableRecordWriter(Connection conn, TableName tableName) throws IOException { + this.conn = conn; + this.table = conn.getTable(tableName); + ((HTable) this.table).setAutoFlush(false, true); } - public void close(Reporter reporter) - throws IOException { - m_table.close(); + public void close(Reporter reporter) throws IOException { + table.close(); + conn.close(); } - public void write(ImmutableBytesWritable key, - Put value) throws IOException { - m_table.put(new Put(value)); + public void write(ImmutableBytesWritable key, Put value) throws IOException { + table.put(new Put(value)); } } @Override - @SuppressWarnings("unchecked") - public RecordWriter getRecordWriter(FileSystem ignored, - JobConf job, String name, Progressable progress) throws IOException { - - // expecting exactly one path - + public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, + String name, Progressable progress) throws IOException { TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE)); - HTable table = null; + Connection conn = null; try { - table = new HTable(HBaseConfiguration.create(job), tableName); + conn = ConnectionFactory.createConnection(HBaseConfiguration.create(job)); } catch(IOException e) { LOG.error(e); throw e; } - table.setAutoFlush(false, true); - return new TableRecordWriter(table); + return new TableRecordWriter(conn, tableName); } @Override public void checkOutputSpecs(FileSystem ignored, JobConf job) - throws FileAlreadyExistsException, InvalidJobConfException, IOException { - + throws FileAlreadyExistsException, InvalidJobConfException, IOException { String tableName = job.get(OUTPUT_TABLE); if(tableName == null) { throw new IOException("Must specify table name"); -- 1.9.3 (Apple Git-50)