diff --git core/src/main/java/org/apache/hadoop/hbase/client/BulkPut.java core/src/main/java/org/apache/hadoop/hbase/client/BulkPut.java
new file mode 100644
index 0000000..1c4cdd5
--- /dev/null
+++ core/src/main/java/org/apache/hadoop/hbase/client/BulkPut.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A 'BulkPut' encapsulates a batch of puts, useful when inserting a
+ * large number of 'Put's.
+ *
+ * Instead of emitting each put as a separate output record, it can make
+ * sense to consolidate the puts and write them serially, saving
+ * per-record serialization and network overhead (at the cost of memory).
+ */
+public class BulkPut implements Writable {
+
+  private List<Put> puts = new ArrayList<Put>();
+
+  public BulkPut() {
+  }
+
+  public BulkPut(final List<Put> puts) {
+    this.puts = puts;
+  }
+
+  public void addPut(final Put put) {
+    this.puts.add(put);
+  }
+
+  public List<Put> allPuts() {
+    return puts;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int size = in.readInt();
+    // Deserialization replaces any puts already held by this instance.
+    this.puts.clear();
+    for (int i = 0; i < size; ++i) {
+      Put put = new Put();
+      put.readFields(in);
+      this.puts.add(put);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(puts.size());
+    for (Put put : puts) {
+      put.write(out);
+    }
+  }
+}
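For illustration only (not part of the patch), here is a minimal sketch of the intended usage: many 'Put's are consolidated into one BulkPut and serialized as a single Writable record, which is where the per-record overhead is saved. The row, family, and qualifier names below are made up for the example.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.client.BulkPut;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BulkPutExample {
  public static void main(String[] args) throws IOException {
    BulkPut bulk = new BulkPut();
    for (int i = 0; i < 1000; i++) {
      Put put = new Put(Bytes.toBytes("row-" + i));
      put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(i));
      bulk.addPut(put);   // consolidate instead of emitting each Put
    }
    // One serialized record instead of 1000.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    bulk.write(new DataOutputStream(baos));
    System.out.println("serialized " + bulk.allPuts().size()
        + " puts into " + baos.size() + " bytes");
  }
}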
diff --git core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index 41fe3f9..6278011 100644
--- core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -25,8 +25,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.BulkPut;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -103,7 +104,10 @@ public class TableOutputFormat extends OutputFormat {
       throws IOException {
       if (value instanceof Put) this.table.put(new Put((Put)value));
       else if (value instanceof Delete) this.table.delete(new Delete((Delete)value));
-      else throw new IOException("Pass a Delete or a Put");
+      else if (value instanceof BulkPut) {
+        this.table.put(((BulkPut) value).allPuts());
+      }
+      else throw new IOException(value.getClass() + " is not a Delete / Put / BulkPut");
     }
   }
 
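As a hedged sketch of how the new TableOutputFormat branch might be exercised end to end, the hypothetical reducer below batches all puts for a key into one BulkPut; the class name, column names, and key/value types are assumptions, not part of the patch. The job would still be wired up as usual (setOutputFormatClass(TableOutputFormat.class) plus the TableOutputFormat.OUTPUT_TABLE property), and each context.write(...) hands the whole batch to HTable.put(List<Put>).

import java.io.IOException;

import org.apache.hadoop.hbase.client.BulkPut;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer: batches all puts for a key into a single BulkPut
// so TableOutputFormat receives one record instead of one per value.
public class BulkPutReducer extends Reducer<Text, Text, Text, Writable> {

  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    BulkPut bulk = new BulkPut();
    int i = 0;
    for (Text value : values) {
      Put put = new Put(Bytes.toBytes(key.toString()));
      // One qualifier per value so the cells do not overwrite each other.
      put.add(Bytes.toBytes("f"), Bytes.toBytes("q" + i++),
          Bytes.toBytes(value.toString()));
      bulk.addPut(put);
    }
    // The new TableOutputFormat branch forwards this to HTable.put(List<Put>).
    context.write(key, bulk);
  }
}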
diff --git core/src/test/java/org/apache/hadoop/hbase/client/TestBulkPut.java core/src/test/java/org/apache/hadoop/hbase/client/TestBulkPut.java
new file mode 100644
index 0000000..2c91643
--- /dev/null
+++ core/src/test/java/org/apache/hadoop/hbase/client/TestBulkPut.java
@@ -0,0 +1,100 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestBulkPut {
+
+  @Test
+  public void testSerialization() throws IOException {
+    String family = "family";
+    String qualifier = "qualifier1";
+    String qualifier2 = "qualifier2";
+    Put putA = new Put(Bytes.toBytes("rowA"));
+    putA.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(1));
+    Put putB = new Put(Bytes.toBytes("rowB"));
+    putB.add(Bytes.toBytes(family), Bytes.toBytes(qualifier2), Bytes.toBytes(2));
+    List<Put> input = new ArrayList<Put>();
+    // The puts must actually be added to the list under test.
+    input.add(putA);
+    input.add(putB);
+    BulkPut bulkInput = new BulkPut(input);
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(baos);
+    bulkInput.write(out);
+
+    BulkPut bulkOutput = new BulkPut();
+    bulkOutput.readFields(new DataInputStream(new ByteArrayInputStream(
+        baos.toByteArray())));
+
+    List<Put> output = bulkOutput.allPuts();
+    // Put instances may not compare equal after a round trip, so compare
+    // sizes and row keys instead of the objects themselves.
+    Assert.assertEquals(input.size(), output.size());
+    for (int i = 0; i < input.size(); ++i) {
+      Assert.assertArrayEquals(input.get(i).getRow(), output.get(i).getRow());
+    }
+  }
+
+  @Test
+  public void testReadFieldsClearsExistingPuts() throws IOException {
+    BulkPut bulkInput = new BulkPut();
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(baos);
+    bulkInput.write(out);
+
+    BulkPut bulkOutput = new BulkPut();
+    String family = "family";
+    String qualifier = "qualifier1";
+    Put putA = new Put(Bytes.toBytes("rowA"));
+    putA.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(1));
+    // Add something to bulkOutput before deserializing into it.
+    bulkOutput.addPut(putA);
+    bulkOutput.readFields(new DataInputStream(new ByteArrayInputStream(
+        baos.toByteArray())));
+
+    List<Put> output = bulkOutput.allPuts();
+    // Deserialization should replace any puts the instance already held.
+    Assert.assertEquals(0, output.size());
+  }
+
+  @Test
+  public void testSerializationOfEmptyList() throws IOException {
+    BulkPut bulkInput = new BulkPut();
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(baos);
+    bulkInput.write(out);
+
+    BulkPut bulkOutput = new BulkPut();
+    bulkOutput.readFields(new DataInputStream(new ByteArrayInputStream(
+        baos.toByteArray())));
+
+    List<Put> output = bulkOutput.allPuts();
+    Assert.assertEquals(0, output.size());
+  }
+}