diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java index 52b8e45..c8ec171 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java @@ -29,6 +29,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; @@ -79,28 +81,17 @@ implements Configurable { /** The configuration. */ private Configuration conf = null; - private HTable table; + private Table table; + private Connection connection; /** * Writes the reducer output to an HBase table. * * @param The type of the key. */ - protected static class TableRecordWriter + protected class TableRecordWriter extends RecordWriter { - /** The table to write to. */ - private Table table; - - /** - * Instantiate a TableRecordWriter with the HBase HClient for writing. - * - * @param table The table to write to. - */ - public TableRecordWriter(Table table) { - this.table = table; - } - /** * Closes the writer, in this case flush table commits. * @@ -112,6 +103,7 @@ implements Configurable { public void close(TaskAttemptContext context) throws IOException { table.close(); + connection.close(); } /** @@ -125,8 +117,8 @@ implements Configurable { @Override public void write(KEY key, Mutation value) throws IOException { - if (value instanceof Put) this.table.put(new Put((Put)value)); - else if (value instanceof Delete) this.table.delete(new Delete((Delete)value)); + if (value instanceof Put) table.put(new Put((Put)value)); + else if (value instanceof Delete) table.delete(new Delete((Delete)value)); else throw new IOException("Pass a Delete or a Put"); } } @@ -144,7 +136,7 @@ implements Configurable { public RecordWriter getRecordWriter( TaskAttemptContext context) throws IOException, InterruptedException { - return new TableRecordWriter(this.table); + return new TableRecordWriter(); } /** @@ -205,8 +197,9 @@ implements Configurable { if (zkClientPort != 0) { this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); } - this.table = new HTable(this.conf, TableName.valueOf(tableName)); - this.table.setAutoFlush(false, true); + this.connection = ConnectionFactory.createConnection(this.conf); + this.table = connection.getTable(TableName.valueOf(tableName)); + ((HTable) this.table).setAutoFlush(false, true); LOG.info("Created table instance for " + tableName); } catch(IOException e) { LOG.error(e);