Index: src/main/java/org/apache/hadoop/hbase/client/Row.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Row.java
+++ src/main/java/org/apache/hadoop/hbase/client/Row.java
@@ -19,10 +19,12 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import org.apache.hadoop.io.WritableComparable;
+
 /**
  * Has a row.
  */
-interface Row {
+public interface Row extends WritableComparable<Row> {
   /**
    * @return The row.
    */
Index: src/main/java/org/apache/hadoop/hbase/client/RowMutation.java
===================================================================
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/client/RowMutation.java
@@ -0,0 +1,143 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Union-type wrapper used in bulk-import MR jobs so that both Puts and
+ * Deletes can be passed through a single map output value class.
+ */
+public class RowMutation implements Row {
+
+  /**
+   * Enum used to serialize the type of the wrapped mutation.
+   */
+  public static enum Type {
+    Put((byte)4),
+    Delete((byte)8);
+
+    private final byte code;
+
+    Type(final byte c) {
+      this.code = c;
+    }
+
+    /**
+     * @return the serialization code for this type
+     */
+    public byte getCode() {
+      return this.code;
+    }
+  }
+
+  /**
+   * Union-style field holding either a Put or a Delete.
+   */
+  private Row row = null;
+
+  /**
+   * Writable constructor only.  DO NOT USE directly.
+   */
+  public RowMutation() {}
+
+  /**
+   * Copy constructor.
+   * @param r the item to copy
+   * @throws IOException if the wrapped mutation is neither a Put nor a Delete
+   */
+  public RowMutation(final RowMutation r)
+  throws IOException {
+    this(r.getInstance());
+  }
+
+  /**
+   * Constructor that sets the inner union-style field.
+   * @param request the Put or Delete to be executed
+   * @throws IOException if the passed parameter is neither a Put nor a Delete
+   */
+  public RowMutation(final WritableComparable request)
+  throws IOException {
+    if (request instanceof Put) {
+      row = new Put((Put)request);
+    } else if (request instanceof Delete) {
+      row = new Delete((Delete)request);
+    } else {
+      throw new IOException("Must pass either a Delete or a Put");
+    }
+  }
+
+  /**
+   * @return the wrapped Put or Delete
+   */
+  public Row getInstance() {
+    return row;
+  }
+
+  @Override
+  public int compareTo(Row o) {
+    return row.compareTo(o);
+  }
+
+  @Override
+  public byte[] getRow() {
+    return row.getRow();
+  }
+
+  @Override
+  public void readFields(DataInput in)
+  throws IOException {
+    byte b = in.readByte();
+
+    if (Type.Put.getCode() == b) {
+      row = new Put();
+    } else if (Type.Delete.getCode() == b) {
+      row = new Delete();
+    } else {
+      throw new IOException("Tried to read an invalid type of serialized object!");
+    }
+
+    row.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out)
+  throws IOException {
+    byte b = 0;
+
+    if (row instanceof Put) {
+      b = Type.Put.getCode();
+    } else if (row instanceof Delete) {
+      b = Type.Delete.getCode();
+    } else {
+      throw new IOException("Tried to write an invalid type of object to serialize!");
+    }
+
+    out.write(b);
+    row.write(out);
+  }
+
+}
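The wire format above is a one-byte type code (0x04 for Put, 0x08 for Delete) followed by the wrapped mutation's own Writable body. A minimal round-trip sketch, not part of the patch; the class name RowMutationRoundTrip and the sample row/column values are illustrative only:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.RowMutation;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RowMutationRoundTrip {
      public static void main(String[] args) throws IOException {
        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes("val"));

        // write() emits Type.Put.getCode() (0x04) first, then the Put body
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        RowMutation out = new RowMutation(put);
        out.write(new DataOutputStream(bytes));

        // readFields() reads the code byte back and instantiates the right type
        RowMutation in = new RowMutation();
        in.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(in.getInstance() instanceof Put);  // prints true
      }
    }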
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -287,6 +288,8 @@
       job.setReducerClass(KeyValueSortReducer.class);
     } else if (Put.class.equals(job.getMapOutputValueClass())) {
       job.setReducerClass(PutSortReducer.class);
+    } else if (RowMutation.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(RowMutationSortReducer.class);
     } else {
       LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
     }
@@ -379,4 +382,4 @@
     // Get rid of the last ampersand
     conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
   }
-}
+}
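With this branch in place, a bulk-load job only has to declare RowMutation as its map output value class and HFileOutputFormat.configureIncrementalLoad() selects RowMutationSortReducer automatically. A minimal driver sketch, assuming a mapper MyRowMutationMapper (hypothetical, not in this patch) that emits ImmutableBytesWritable/RowMutation pairs:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.RowMutation;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class RowMutationBulkLoadJob {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "rowmutation-bulk-load");

        job.setMapperClass(MyRowMutationMapper.class);  // hypothetical mapper
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(RowMutation.class);  // triggers the new branch
        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        // configureIncrementalLoad() wires the total-order partitioner,
        // output format and reducer; with this patch it now recognizes
        // RowMutation alongside KeyValue and Put.
        HTable table = new HTable(conf, args[1]);
        HFileOutputFormat.configureIncrementalLoad(job, table);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }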
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java
===================================================================
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java
@@ -0,0 +1,99 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Emits sorted KeyValues built from both Puts and Deletes.
+ * Reads in all Puts and Deletes from the passed Iterator over RowMutation
+ * elements, sorts them, then emits the KeyValues in sorted order.  If lots
+ * of columns per row, it will use lots of memory sorting.
+ * @see HFileOutputFormat
+ * @see KeyValueSortReducer
+ */
+public class RowMutationSortReducer extends
+    Reducer<ImmutableBytesWritable, RowMutation,
+            ImmutableBytesWritable, KeyValue> {
+
+  @Override
+  protected void reduce(
+      ImmutableBytesWritable row,
+      java.lang.Iterable<RowMutation> requests,
+      Reducer<ImmutableBytesWritable, RowMutation,
+              ImmutableBytesWritable, KeyValue>.Context context)
+  throws java.io.IOException, InterruptedException
+  {
+    // although reduce() is called per-row, handle pathological case
+    long threshold = context.getConfiguration().getLong(
+        "rowmutationsortreducer.row.threshold", 2L * (1<<30));
+    Iterator<RowMutation> iter = requests.iterator();
+    while (iter.hasNext()) {
+      TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+      long curSize = 0;
+      // stop at the end or the RAM threshold
+      while (iter.hasNext() && curSize < threshold) {
+        Row r = iter.next().getInstance();
+        Map<byte[], List<KeyValue>> familyMap;
+        if (r instanceof Put) {
+          familyMap = ((Put) r).getFamilyMap();
+        } else if (r instanceof Delete) {
+          familyMap = ((Delete) r).getFamilyMap();
+        } else {
+          familyMap = null;
+        }
+
+        if (null != familyMap) {
+          for (List<KeyValue> kvs : familyMap.values()) {
+            for (KeyValue kv : kvs) {
+              map.add(kv);
+              curSize += kv.getValueLength();
+            }
+          }
+        }
+      }
+      context.setStatus("Read " + map.size() + " entries of " + map.getClass()
+          + "(" + StringUtils.humanReadableInt(curSize) + ")");
+      int index = 0;
+      for (KeyValue kv : map) {
+        context.write(row, kv);
+        if (++index % 100 == 0)
+          context.setStatus("Wrote " + index);
+      }
+
+      // if we have more entries to process
+      if (iter.hasNext()) {
+        // force flush because we cannot guarantee intra-row sorted order
+        context.write(null, null);
+      }
+    }
+  }
+}
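The reducer buffers KeyValues for a row until a byte threshold (2 GB by default, measured by summed value lengths), then writes a null/null marker so the HFileOutputFormat RecordWriter rolls to a fresh file, since sorted order cannot be guaranteed across batches. The threshold is read from the job configuration, so memory-constrained reducers can lower it. A sketch, with the property name taken from the code above and the 256 MB figure purely illustrative:

    // Before job submission: flush each sort batch after ~256 MB of
    // buffered values instead of the 2 GB default.
    job.getConfiguration().setLong(
        "rowmutationsortreducer.row.threshold", 256L * 1024 * 1024);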
Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -1,4 +1,4 @@
-/** 
+/**
  * Copyright 2009 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -46,6 +46,10 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
@@ -51,6 +55,8 @@
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.RowMutation;
+
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
@@ -120,7 +126,6 @@
         ImmutableBytesWritable,KeyValue>.Context context)
         throws java.io.IOException ,InterruptedException
     {
-
       byte keyBytes[] = new byte[keyLength];
       byte valBytes[] = new byte[valLength];
@@ -144,7 +149,7 @@
       }
     }
   }
-  
+
   @Before
   public void cleanupDir() throws IOException {
     util.cleanupTestDir();
@@ -149,8 +154,7 @@
   public void cleanupDir() throws IOException {
     util.cleanupTestDir();
   }
-
-  
+
   private void setupRandomGeneratorMapper(Job job) {
     job.setInputFormatClass(NMapInputFormat.class);
    job.setMapperClass(RandomKVGeneratingMapper.class);
@@ -158,6 +162,88 @@
     job.setMapOutputValueClass(KeyValue.class);
   }
 
+  static class RowSorterMapper
+      extends Mapper<NullWritable, NullWritable,
+                     ImmutableBytesWritable, RowMutation> {
+    @Override
+    protected void map(
+        NullWritable n1, NullWritable n2,
+        Mapper<NullWritable, NullWritable,
+               ImmutableBytesWritable, RowMutation>.Context context)
+    throws IOException, InterruptedException
+    {
+      byte[] row = Bytes.toBytes("row1");
+
+      // need one for every task...
+      byte[] key = Bytes.toBytes("key");
+
+      byte[] col1 = Bytes.toBytes("col1");
+      byte[] col2 = Bytes.toBytes("col2");
+      byte[] col3 = Bytes.toBytes("col3");
+
+      Row put1 = new Put(row).add(TestHFileOutputFormat.FAMILIES[0], col1, 10, Bytes.toBytes("val10"));
+      Row put2 = new Put(row).add(TestHFileOutputFormat.FAMILIES[0], col2, 11, Bytes.toBytes("val11"));
+
+      Row put3 = new Put(row).add(TestHFileOutputFormat.FAMILIES[1], col1, 20, Bytes.toBytes("val20"));
+      Row put4 = new Put(row).add(TestHFileOutputFormat.FAMILIES[1], col2, 21, Bytes.toBytes("val21"));
+
+      Row put5 = new Put(row).add(TestHFileOutputFormat.FAMILIES[1], col3, 30, Bytes.toBytes("val30"));
+      Row put6 = new Put(row).add(TestHFileOutputFormat.FAMILIES[1], col3, 31, Bytes.toBytes("val31"));
+      Row put7 = new Put(row).add(TestHFileOutputFormat.FAMILIES[1], col3, 32, Bytes.toBytes("val32"));
+
+      Row del1 = new Delete(row).deleteColumn(TestHFileOutputFormat.FAMILIES[1], col2, 21);
+      Row del2 = new Delete(row).deleteFamily(TestHFileOutputFormat.FAMILIES[0]);
+      Row del3 = new Delete(row).deleteColumns(TestHFileOutputFormat.FAMILIES[1], col3);
+
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put1));
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put2));
+
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put3));
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put4));
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put5));
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put6));
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put7));
+
+      context.write(new ImmutableBytesWritable(key), new RowMutation(del1));
+      context.write(new ImmutableBytesWritable(key), new RowMutation(del2));
+      context.write(new ImmutableBytesWritable(key), new RowMutation(del3));
+    }
+  }
+
+  /**
+   * Test for union-style MR jobs that run both Put and Delete requests.
+   * @throws Exception on job, sorting, IO or fs errors
+   */
+  @Test
+  public void testRowSortReducer()
+  throws Exception {
+    Configuration conf = new Configuration(this.util.getConfiguration());
+    conf.setInt("io.sort.mb", 20);
+
+    Path dir = HBaseTestingUtility.getTestDir("testRowSortReducer");
+
+    try {
+      Job job = new Job(conf);
+
+      job.setInputFormatClass(NMapInputFormat.class);
+      job.setOutputFormatClass(HFileOutputFormat.class);
+
+      job.setMapperClass(RowSorterMapper.class); // local
+      job.setReducerClass(RowMutationSortReducer.class);
+
+      job.setOutputKeyClass(ImmutableBytesWritable.class);
+      job.setOutputValueClass(KeyValue.class);
+
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(RowMutation.class);
+
+      FileOutputFormat.setOutputPath(job, dir);
+
+      assertTrue(job.waitForCompletion(false));
+    } finally {
+//      dir.getFileSystem(conf).delete(dir, true);
+    }
+  }
+
   /**
    * Test that {@link HFileOutputFormat} RecordWriter amends timestamps if
    * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
@@ -363,8 +449,6 @@
     }
   }
-
-
   private void runIncrementalPELoad(
       Configuration conf, HTable table, Path outDir)
   throws Exception {
@@ -609,4 +693,4 @@
       "usage: TestHFileOutputFormat newtable | incremental");
     }
   }
-}
+}
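The test stops once the HFiles are written; in a real job the output directory still has to be handed to the cluster. A sketch of that follow-up step, assuming the dir and conf variables from the test above and a hypothetical target table name; LoadIncrementalHFiles ships with HBase of this era, but check the exact signature against your version:

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    // Move the generated HFiles into the regions of the target table.
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(dir, new HTable(conf, "mytable"));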