.../mapreduce/MapReduceHFileSplitterJob.java | 33 +-- .../java/org/apache/hadoop/hbase/CellUtil.java | 7 +- .../hbase/io/encoding/FastDiffDeltaEncoder.java | 3 +- .../hbase/io/encoding/PrefixKeyDeltaEncoder.java | 3 +- .../apache/hadoop/hbase/util/ByteBufferUtils.java | 23 ++ .../hadoop/hbase/mapreduce/CellSerialization.java | 92 +++++++ .../hadoop/hbase/mapreduce/CellSortReducer.java | 59 +++++ .../apache/hadoop/hbase/mapreduce/CopyTable.java | 2 +- .../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 49 ++-- .../org/apache/hadoop/hbase/mapreduce/Import.java | 127 +++++----- .../apache/hadoop/hbase/mapreduce/ImportTsv.java | 2 +- .../hbase/mapreduce/KeyValueSerialization.java | 88 ------- .../hbase/mapreduce/KeyValueSortReducer.java | 57 ----- .../hadoop/hbase/mapreduce/PutSortReducer.java | 2 +- .../hadoop/hbase/mapreduce/TableMapReduceUtil.java | 2 +- .../hadoop/hbase/mapreduce/TextSortReducer.java | 2 +- .../apache/hadoop/hbase/mapreduce/WALPlayer.java | 18 +- .../apache/hadoop/hbase/util/MapReduceCell.java | 268 +++++++++++++++++++++ .../hbase/mapreduce/TestHFileOutputFormat2.java | 6 +- .../hadoop/hbase/mapreduce/TestImportExport.java | 4 +- .../hadoop/hbase/mapreduce/TestWALPlayer.java | 2 +- 21 files changed, 578 insertions(+), 271 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java index 49e8c75..edcfd8a 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java @@ -24,10 +24,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; @@ -37,9 +37,10 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileInputFormat; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; -import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; +import org.apache.hadoop.hbase.mapreduce.CellSortReducer; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.MapReduceCell; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; @@ -70,24 +71,16 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool { /** * A mapper that just writes out cells. 
This one can be used together with - * {@link KeyValueSortReducer} + * {@link CellSortReducer} */ static class HFileCellMapper extends - Mapper { + Mapper { @Override - public void map(NullWritable key, KeyValue value, Context context) throws IOException, - InterruptedException { - // Convert value to KeyValue if subclass - if (!value.getClass().equals(KeyValue.class)) { - value = - new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(), - value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(), - value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(), - value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(), - value.getValueOffset(), value.getValueLength()); - } - context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value); + public void map(NullWritable key, Cell value, Context context) + throws IOException, InterruptedException { + context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), + new MapReduceCell(((ExtendedCell) value))); } @Override @@ -119,14 +112,14 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool { LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs); TableName tableName = TableName.valueOf(tabName); job.setMapperClass(HFileCellMapper.class); - job.setReducerClass(KeyValueSortReducer.class); + job.setReducerClass(CellSortReducer.class); Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputValueClass(KeyValue.class); + job.setMapOutputValueClass(MapReduceCell.class); try (Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(tableName); RegionLocator regionLocator = conn.getRegionLocator(tableName)) { - HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); + HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); } LOG.debug("success configuring load incremental job"); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index d1a72b7..78b6cf0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase; import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY; import static org.apache.hadoop.hbase.Tag.TAG_LENGTH_SIZE; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -1716,7 +1717,7 @@ public final class CellUtil { * @param out * @throws IOException */ - public static void writeFlatKey(Cell cell, DataOutputStream out) throws IOException { + public static void writeFlatKey(Cell cell, DataOutput out) throws IOException { short rowLen = cell.getRowLength(); byte fLen = cell.getFamilyLength(); int qLen = cell.getQualifierLength(); @@ -1796,7 +1797,7 @@ public final class CellUtil { public static void writeRowSkippingBytes(DataOutputStream out, Cell cell, short rlength, int commonPrefix) throws IOException { if (cell instanceof ByteBufferCell) { - ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getRowByteBuffer(), + ByteBufferUtils.copyBufferToStream((DataOutput)out, ((ByteBufferCell) cell).getRowByteBuffer(), ((ByteBufferCell) cell).getRowPosition() + commonPrefix, rlength - commonPrefix); } else { out.write(cell.getRowArray(), 
cell.getRowOffset() + commonPrefix, rlength - commonPrefix); @@ -1846,7 +1847,7 @@ public final class CellUtil { public static void writeQualifierSkippingBytes(DataOutputStream out, Cell cell, int qlength, int commonPrefix) throws IOException { if (cell instanceof ByteBufferCell) { - ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getQualifierByteBuffer(), + ByteBufferUtils.copyBufferToStream((DataOutput)out, ((ByteBufferCell) cell).getQualifierByteBuffer(), ((ByteBufferCell) cell).getQualifierPosition() + commonPrefix, qlength - commonPrefix); } else { out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonPrefix, diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java index 5d7a379..cf1a1e7 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.io.encoding; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -262,7 +263,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { ByteBufferUtils.putCompressedInt(out, kLength); ByteBufferUtils.putCompressedInt(out, vLength); ByteBufferUtils.putCompressedInt(out, 0); - CellUtil.writeFlatKey(cell, out); + CellUtil.writeFlatKey(cell, (DataOutput)out); // Write the value part CellUtil.writeValue(out, cell, cell.getValueLength()); } else { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java index 842894f..0e3a5e6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.io.encoding; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -59,7 +60,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder { ByteBufferUtils.putCompressedInt(out, klength); ByteBufferUtils.putCompressedInt(out, vlength); ByteBufferUtils.putCompressedInt(out, 0); - CellUtil.writeFlatKey(cell, out); + CellUtil.writeFlatKey(cell, (DataOutput)out); } else { // find a common prefix and skip it int common = CellUtil.findCommonPrefixInFlatKey(cell, state.prevCell, true, true); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index ed6ee09..dee45a6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -20,6 +20,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -193,6 +194,28 @@ public final class ByteBufferUtils { } } + /** + * Copy data from a buffer to an output stream. 
Does not update the position + * in the buffer. + * @param out the output stream to write bytes to + * @param in the buffer to read bytes from + * @param offset the offset in the buffer (from the buffer's array offset) + * to start copying bytes from + * @param length the number of bytes to copy + */ + public static void copyBufferToStream(DataOutput out, ByteBuffer in, int offset, int length) + throws IOException { + if (out instanceof ByteBufferWriter) { + ((ByteBufferWriter) out).write(in, offset, length); + } else if (in.hasArray()) { + out.write(in.array(), in.arrayOffset() + offset, length); + } else { + for (int i = 0; i < length; ++i) { + out.write(toByte(in, offset + i)); + } + } + } + public static int putLong(OutputStream out, final long value, final int fitInBytes) throws IOException { long tmpValue = value; diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSerialization.java new file mode 100644 index 0000000..1910ab3 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSerialization.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.Serializer; + +@InterfaceAudience.Public +public class CellSerialization implements Serialization { + @Override + public boolean accept(Class c) { + return Cell.class.isAssignableFrom(c); + } + + @Override + public CellDeserializer getDeserializer(Class t) { + return new CellDeserializer(); + } + + @Override + public CellSerializer getSerializer(Class c) { + return new CellSerializer(); + } + + public static class CellDeserializer implements Deserializer { + private DataInputStream dis; + + @Override + public void close() throws IOException { + this.dis.close(); + } + + @Override + public KeyValue deserialize(Cell ignore) throws IOException { + // I can't overwrite the passed in KV, not from a proto kv, not just yet. 
TODO + return KeyValueUtil.create(this.dis); + } + + @Override + public void open(InputStream is) throws IOException { + this.dis = new DataInputStream(is); + } + } + + public static class CellSerializer implements Serializer { + private DataOutputStream dos; + + @Override + public void close() throws IOException { + this.dos.close(); + } + + @Override + public void open(OutputStream os) throws IOException { + this.dos = new DataOutputStream(os); + } + + @Override + public void serialize(Cell kv) throws IOException { + assert kv instanceof ExtendedCell; + dos.writeInt(((ExtendedCell) kv).getSerializedSize(true)); + ((ExtendedCell) kv).write(this.dos, true); + } + } +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSortReducer.java new file mode 100644 index 0000000..df06f77 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSortReducer.java @@ -0,0 +1,59 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.MapReduceCell; +import org.apache.hadoop.mapreduce.Reducer; + +/** + * Emits sorted KeyValues. + * Reads in all KeyValues from passed Iterator, sorts them, then emits + * KeyValues in sorted order. If lots of columns per row, it will use lots of + * memory sorting. 
+ * @see HFileOutputFormat2 + */ +@InterfaceAudience.Public +public class CellSortReducer + extends Reducer { + protected void reduce(ImmutableBytesWritable row, Iterable kvs, + Reducer.Context context) + throws java.io.IOException, InterruptedException { + TreeSet map = new TreeSet<>(CellComparator.COMPARATOR); + for (Cell kv : kvs) { + if (!(kv instanceof ExtendedCell)) { + throw new IOException(new CloneNotSupportedException()); + } + map.add(((ExtendedCell) kv).deepClone()); + } + context.setStatus("Read " + map.getClass()); + int index = 0; + for (Cell kv: map) { + context.write(row, new MapReduceCell(((ExtendedCell) kv))); + if (++index % 100 == 0) context.setStatus("Wrote " + index); + } + } +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java index 513beb4..0cbf30c 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java @@ -139,7 +139,7 @@ public class CopyTable extends Configured implements Tool { job.setNumReduceTasks(0); if (bulkload) { - TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null, + TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.CellImporter.class, null, null, job); // We need to split the inputs by destination tables so that output of Map can be bulk-loaded. diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index ca6b1e9..a975a69 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -43,8 +43,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; @@ -52,10 +57,8 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; @@ -65,15 +68,14 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; -import 
org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.MapReduceCell; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; @@ -86,8 +88,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; - /** * Writes HFiles. Passed Cells must arrive in order. * Writes current time as the sequence id for the file. Sets the major compacted @@ -229,14 +229,13 @@ public class HFileOutputFormat2 private final Map writers = new TreeMap<>(Bytes.BYTES_COMPARATOR); private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY; - private final byte[] now = Bytes.toBytes(EnvironmentEdgeManager.currentTime()); + private final long now = EnvironmentEdgeManager.currentTime(); private boolean rollRequested = false; @Override public void write(ImmutableBytesWritable row, V cell) throws IOException { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - + Cell kv = cell; // null input == user explicitly wants to flush if (row == null && kv == null) { rollWriters(); @@ -244,7 +243,14 @@ public class HFileOutputFormat2 } byte[] rowKey = CellUtil.cloneRow(kv); - long length = kv.getLength(); + int length = 0; + if (kv instanceof ExtendedCell) { + length = ((ExtendedCell) kv).getSerializedSize(true); + } else { + // Is this else really needed? + length = CellUtil.estimatedSerializedSizeOfKey(cell) + cell.getValueLength() + + KeyValue.ROW_OFFSET; + } byte[] family = CellUtil.cloneFamily(kv); byte[] tableNameBytes = null; if (writeMultipleTables) { @@ -333,7 +339,8 @@ public class HFileOutputFormat2 } // we now have the proper WAL writer. full steam ahead - kv.updateLatestStamp(this.now); + // TODO : Currently in SettableTimeStamp but this will also move to ExtendedCell + CellUtil.updateLatestStamp(cell, this.now); wl.writer.append(kv); wl.written += length; @@ -574,10 +581,11 @@ public class HFileOutputFormat2 configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class); } - static void configureIncrementalLoad(Job job, List multiTableInfo, Class> cls) throws IOException { + static void configureIncrementalLoad(Job job, List multiTableInfo, + Class> cls) throws IOException { Configuration conf = job.getConfiguration(); job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(KeyValue.class); + job.setOutputValueClass(MapReduceCell.class); job.setOutputFormatClass(cls); if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) { @@ -591,8 +599,9 @@ public class HFileOutputFormat2 // Based on the configured map output class, set the correct reducer to properly // sort the incoming values. // TODO it would be nice to pick one or the other of these formats. 
- if (KeyValue.class.equals(job.getMapOutputValueClass())) { - job.setReducerClass(KeyValueSortReducer.class); + if (KeyValue.class.equals(job.getMapOutputValueClass()) + || MapReduceCell.class.equals(job.getMapOutputValueClass())) { + job.setReducerClass(CellSortReducer.class); } else if (Put.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(PutSortReducer.class); } else if (Text.class.equals(job.getMapOutputValueClass())) { @@ -603,7 +612,7 @@ public class HFileOutputFormat2 conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); + CellSerialization.class.getName()); if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { LOG.info("bulkload locality sensitive enabled"); @@ -651,7 +660,7 @@ public class HFileOutputFormat2 Configuration conf = job.getConfiguration(); job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(KeyValue.class); + job.setOutputValueClass(MapReduceCell.class); job.setOutputFormatClass(HFileOutputFormat2.class); ArrayList singleTableDescriptor = new ArrayList<>(1); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java index 18dcf35..cd6e4a0 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; @@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.MapReduceCell; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.RawComparator; @@ -96,11 +98,11 @@ public class Import extends Configured implements Tool { private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; - public static class KeyValueWritableComparablePartitioner - extends Partitioner { - private static KeyValueWritableComparable[] START_KEYS = null; + public static class CellWritableComparablePartitioner + extends Partitioner { + private static CellWritableComparable[] START_KEYS = null; @Override - public int getPartition(KeyValueWritableComparable key, KeyValue value, + public int getPartition(CellWritableComparable key, Cell value, int numPartitions) { for (int i = 0; i < START_KEYS.length; ++i) { if (key.compareTo(START_KEYS[i]) <= 0) { @@ -112,27 +114,30 @@ public class Import extends Configured implements Tool { } - public static class KeyValueWritableComparable - implements WritableComparable { + public static class CellWritableComparable + implements WritableComparable { - private KeyValue kv = null; + private Cell kv = null; static { // register this comparator - WritableComparator.define(KeyValueWritableComparable.class, - new KeyValueWritableComparator()); + WritableComparator.define(CellWritableComparable.class, + new 
CellWritableComparator()); } - public KeyValueWritableComparable() { + public CellWritableComparable() { } - public KeyValueWritableComparable(KeyValue kv) { + public CellWritableComparable(Cell kv) { this.kv = kv; } @Override public void write(DataOutput out) throws IOException { - KeyValue.write(kv, out); + assert kv instanceof ExtendedCell; + out.writeInt(CellUtil.estimatedSerializedSizeOfKey(kv)); + out.writeInt(0); + CellUtil.writeFlatKey(kv, out); } @Override @@ -143,18 +148,18 @@ public class Import extends Configured implements Tool { @Override @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS", justification="This is wrong, yes, but we should be purging Writables, not fixing them") - public int compareTo(KeyValueWritableComparable o) { - return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable)o).kv); + public int compareTo(CellWritableComparable o) { + return CellComparator.COMPARATOR.compare(this.kv, ((CellWritableComparable)o).kv); } - public static class KeyValueWritableComparator extends WritableComparator { + public static class CellWritableComparator extends WritableComparator { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { - KeyValueWritableComparable kv1 = new KeyValueWritableComparable(); + CellWritableComparable kv1 = new CellWritableComparable(); kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1))); - KeyValueWritableComparable kv2 = new KeyValueWritableComparable(); + CellWritableComparable kv2 = new CellWritableComparable(); kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2))); return compare(kv1, kv2); } catch (IOException e) { @@ -166,18 +171,19 @@ public class Import extends Configured implements Tool { } - public static class KeyValueReducer + public static class CellReducer extends - Reducer { + Reducer { protected void reduce( - KeyValueWritableComparable row, - Iterable kvs, - Reducer.Context context) + CellWritableComparable row, + Iterable kvs, + Reducer.Context context) throws java.io.IOException, InterruptedException { int index = 0; - for (KeyValue kv : kvs) { - context.write(new ImmutableBytesWritable(kv.getRowArray()), kv); + for (Cell kv : kvs) { + context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), + new MapReduceCell((ExtendedCell) kv)); if (++index % 100 == 0) context.setStatus("Wrote " + index + " KeyValues, " + "and the rowkey whose is being wrote is " + Bytes.toString(kv.getRowArray())); @@ -185,11 +191,11 @@ public class Import extends Configured implements Tool { } } - public static class KeyValueSortImporter - extends TableMapper { + public static class CellSortImporter + extends TableMapper { private Map cfRenameMap; private Filter filter; - private static final Log LOG = LogFactory.getLog(KeyValueImporter.class); + private static final Log LOG = LogFactory.getLog(CellImporter.class); /** * @param row The current table row key. 
@@ -213,9 +219,8 @@ public class Import extends Configured implements Tool { kv = filterKv(filter, kv); // skip if we filtered it out if (kv == null) continue; - // TODO get rid of ensureKeyValue - KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)); - context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret); + Cell ret = convertKv(kv, cfRenameMap); + context.write(new CellWritableComparable(ret), ret); } } } catch (InterruptedException e) { @@ -236,13 +241,13 @@ public class Import extends Configured implements Tool { if (startKeys.length != reduceNum) { throw new IOException("Region split after job initialization"); } - KeyValueWritableComparable[] startKeyWraps = - new KeyValueWritableComparable[startKeys.length - 1]; + CellWritableComparable[] startKeyWraps = + new CellWritableComparable[startKeys.length - 1]; for (int i = 1; i < startKeys.length; ++i) { startKeyWraps[i - 1] = - new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i])); + new CellWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i])); } - KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps; + CellWritableComparablePartitioner.START_KEYS = startKeyWraps; } } } @@ -252,10 +257,10 @@ public class Import extends Configured implements Tool { */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS", justification="Writables are going away and this has been this way forever") - public static class KeyValueImporter extends TableMapper { + public static class CellImporter extends TableMapper { private Map cfRenameMap; private Filter filter; - private static final Log LOG = LogFactory.getLog(KeyValueImporter.class); + private static final Log LOG = LogFactory.getLog(CellImporter.class); /** * @param row The current table row key. 
@@ -505,21 +510,21 @@ public class Import extends Configured implements Tool { if(cfRenameMap != null) { // If there's a rename mapping for this CF, create a new KeyValue byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv)); - if(newCfName != null) { - kv = new KeyValue(kv.getRowArray(), // row buffer - kv.getRowOffset(), // row offset - kv.getRowLength(), // row length - newCfName, // CF buffer - 0, // CF offset - newCfName.length, // CF length - kv.getQualifierArray(), // qualifier buffer - kv.getQualifierOffset(), // qualifier offset - kv.getQualifierLength(), // qualifier length - kv.getTimestamp(), // timestamp - KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type - kv.getValueArray(), // value buffer - kv.getValueOffset(), // value offset - kv.getValueLength()); // value length + if (newCfName != null) { + kv = new KeyValue(kv.getRowArray(), // row buffer + kv.getRowOffset(), // row offset + kv.getRowLength(), // row length + newCfName, // CF buffer + 0, // CF offset + newCfName.length, // CF length + kv.getQualifierArray(), // qualifier buffer + kv.getQualifierOffset(), // qualifier offset + kv.getQualifierLength(), // qualifier length + kv.getTimestamp(), // timestamp + KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type + kv.getValueArray(), // value buffer + kv.getValueOffset(), // value offset + kv.getValueLength()); // value length } } return kv; @@ -626,35 +631,35 @@ public class Import extends Configured implements Tool { Table table = conn.getTable(tableName); RegionLocator regionLocator = conn.getRegionLocator(tableName)) { HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); - job.setMapperClass(KeyValueSortImporter.class); - job.setReducerClass(KeyValueReducer.class); + job.setMapperClass(CellSortImporter.class); + job.setReducerClass(CellReducer.class); Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputKeyClass(KeyValueWritableComparable.class); - job.setMapOutputValueClass(KeyValue.class); + job.setMapOutputKeyClass(CellWritableComparable.class); + job.setMapOutputValueClass(MapReduceCell.class); job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class", - KeyValueWritableComparable.KeyValueWritableComparator.class, + CellWritableComparable.CellWritableComparator.class, RawComparator.class); Path partitionsPath = new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration())); FileSystem fs = FileSystem.get(job.getConfiguration()); fs.deleteOnExit(partitionsPath); - job.setPartitionerClass(KeyValueWritableComparablePartitioner.class); + job.setPartitionerClass(CellWritableComparablePartitioner.class); job.setNumReduceTasks(regionLocator.getStartKeys().length); TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class); } } else if (hfileOutPath != null) { LOG.info("writing to hfiles for bulk load."); - job.setMapperClass(KeyValueImporter.class); + job.setMapperClass(CellImporter.class); try (Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(tableName); RegionLocator regionLocator = conn.getRegionLocator(tableName)){ - job.setReducerClass(KeyValueSortReducer.class); + job.setReducerClass(CellSortReducer.class); Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); + 
job.setMapOutputValueClass(MapReduceCell.class); HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java index 44b856c..184eb4a 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -584,7 +584,7 @@ public class ImportTsv extends Configured implements Tool { job.getConfiguration().setStrings("io.serializations", job.getConfiguration().get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); + CellSerialization.class.getName()); } TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java deleted file mode 100644 index 241608b..0000000 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.Serialization; -import org.apache.hadoop.io.serializer.Serializer; - -@InterfaceAudience.Public -public class KeyValueSerialization implements Serialization { - @Override - public boolean accept(Class c) { - return KeyValue.class.isAssignableFrom(c); - } - - @Override - public KeyValueDeserializer getDeserializer(Class t) { - return new KeyValueDeserializer(); - } - - @Override - public KeyValueSerializer getSerializer(Class c) { - return new KeyValueSerializer(); - } - - public static class KeyValueDeserializer implements Deserializer { - private DataInputStream dis; - - @Override - public void close() throws IOException { - this.dis.close(); - } - - @Override - public KeyValue deserialize(KeyValue ignore) throws IOException { - // I can't overwrite the passed in KV, not from a proto kv, not just yet. TODO - return KeyValueUtil.create(this.dis); - } - - @Override - public void open(InputStream is) throws IOException { - this.dis = new DataInputStream(is); - } - } - - public static class KeyValueSerializer implements Serializer { - private DataOutputStream dos; - - @Override - public void close() throws IOException { - this.dos.close(); - } - - @Override - public void open(OutputStream os) throws IOException { - this.dos = new DataOutputStream(os); - } - - @Override - public void serialize(KeyValue kv) throws IOException { - KeyValueUtil.write(kv, this.dos); - } - } -} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java deleted file mode 100644 index 997e5a8..0000000 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.util.TreeSet; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapreduce.Reducer; - -/** - * Emits sorted KeyValues. - * Reads in all KeyValues from passed Iterator, sorts them, then emits - * KeyValues in sorted order. 
If lots of columns per row, it will use lots of - * memory sorting. - * @see HFileOutputFormat2 - */ -@InterfaceAudience.Public -public class KeyValueSortReducer - extends Reducer { - protected void reduce(ImmutableBytesWritable row, Iterable kvs, - Reducer.Context context) - throws java.io.IOException, InterruptedException { - TreeSet map = new TreeSet<>(CellComparator.COMPARATOR); - for (KeyValue kv: kvs) { - try { - map.add(kv.clone()); - } catch (CloneNotSupportedException e) { - throw new java.io.IOException(e); - } - } - context.setStatus("Read " + map.getClass()); - int index = 0; - for (KeyValue kv: map) { - context.write(row, kv); - if (++index % 100 == 0) context.setStatus("Wrote " + index); - } - } -} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java index 17ab9cb..69bc7fa 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java @@ -48,7 +48,7 @@ import org.apache.hadoop.util.StringUtils; * Puts in sorted order. If lots of columns per row, it will use lots of * memory sorting. * @see HFileOutputFormat2 - * @see KeyValueSortReducer + * @see CellSortReducer */ @InterfaceAudience.Public public class PutSortReducer extends diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index 5517c9c..40c18a9 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -207,7 +207,7 @@ public class TableMapReduceUtil { conf.set(TableInputFormat.SCAN, convertScanToString(scan)); conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); + CellSerialization.class.getName()); if (addDependencyJars) { addDependencyJars(job); } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java index 30cd461..1bd3850 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java @@ -45,7 +45,7 @@ import org.apache.hadoop.util.StringUtils; /** * Emits Sorted KeyValues. Parse the passed text and creates KeyValues. Sorts them before emit. 
* @see HFileOutputFormat2 - * @see KeyValueSortReducer + * @see CellSortReducer * @see PutSortReducer */ @InterfaceAudience.Public diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index b1e655c..e5bfd56 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -30,9 +30,8 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; @@ -46,6 +45,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.MapReduceCell; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; @@ -94,10 +94,10 @@ public class WALPlayer extends Configured implements Tool { /** * A mapper that just writes out KeyValues. - * This one can be used together with {@link KeyValueSortReducer} + * This one can be used together with {@link CellSortReducer} */ static class WALKeyValueMapper - extends Mapper { + extends Mapper { private byte[] table; @Override @@ -108,11 +108,11 @@ public class WALPlayer extends Configured implements Tool { // skip all other tables if (Bytes.equals(table, key.getTablename().getName())) { for (Cell cell : value.getCells()) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - if (WALEdit.isMetaEditFamily(kv)) { + if (WALEdit.isMetaEditFamily(cell)) { continue; } - context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv); + context.write(new ImmutableBytesWritable(CellUtil.cloneRow(cell)), + new MapReduceCell((ExtendedCell) cell)); } } } catch (InterruptedException e) { @@ -300,10 +300,10 @@ public class WALPlayer extends Configured implements Tool { } TableName tableName = TableName.valueOf(tables[0]); job.setMapperClass(WALKeyValueMapper.class); - job.setReducerClass(KeyValueSortReducer.class); + job.setReducerClass(CellSortReducer.class); Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputValueClass(KeyValue.class); + job.setMapOutputValueClass(MapReduceCell.class); try (Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(tableName); RegionLocator regionLocator = conn.getRegionLocator(tableName)) { diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceCell.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceCell.java new file mode 100644 index 0000000..4b0555b --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceCell.java @@ -0,0 +1,268 @@ +package org.apache.hadoop.hbase.util; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.ByteBufferCell; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A wrapper for an extended cell to be used with mapreduce + * as the output value class for mappers/reducers. + */ +@InterfaceAudience.Private +public class MapReduceCell extends ByteBufferCell implements ExtendedCell { + + private final ExtendedCell cell; + + public MapReduceCell(ExtendedCell cell) { + this.cell = cell; + } + + @Override + public byte[] getRowArray() { + return this.cell.getRowArray(); + } + + @Override + public int getRowOffset() { + return this.cell.getRowOffset(); + } + + @Override + public short getRowLength() { + return this.cell.getRowLength(); + } + + @Override + public byte[] getFamilyArray() { + return this.cell.getFamilyArray(); + } + + @Override + public int getFamilyOffset() { + return this.cell.getFamilyOffset(); + } + + @Override + public byte getFamilyLength() { + return this.cell.getFamilyLength(); + } + + @Override + public byte[] getQualifierArray() { + return this.cell.getQualifierArray(); + } + + @Override + public int getQualifierOffset() { + return this.cell.getQualifierOffset(); + } + + @Override + public int getQualifierLength() { + return this.cell.getQualifierLength(); + } + + @Override + public long getTimestamp() { + return this.cell.getTimestamp(); + } + + @Override + public byte getTypeByte() { + return this.cell.getTypeByte(); + } + + @Override + public long getSequenceId() { + return this.cell.getSequenceId(); + } + + @Override + public byte[] getValueArray() { + return this.cell.getValueArray(); + } + + @Override + public int getValueOffset() { + return this.cell.getValueOffset(); + } + + @Override + public int getValueLength() { + return this.cell.getValueLength(); + } + + @Override + public byte[] getTagsArray() { + return this.cell.getTagsArray(); + } + + @Override + public int getTagsOffset() { + return this.cell.getTagsOffset(); + } + + @Override + public int getTagsLength() { + return this.cell.getTagsLength(); + } + + @Override + public void setSequenceId(long seqId) throws IOException { + this.cell.setSequenceId(seqId); + } + + @Override + public void setTimestamp(long ts) throws IOException { + this.cell.setTimestamp(ts); + + } + + @Override + public void setTimestamp(byte[] ts, int tsOffset) throws IOException { + this.cell.setTimestamp(ts, tsOffset); + } + + @Override + public long heapSize() { + return this.cell.heapSize(); + } + + @Override + public int write(OutputStream out, boolean withTags) throws IOException { + return this.cell.write(out, withTags); + } + + @Override + public int 
getSerializedSize(boolean withTags) { + return this.cell.getSerializedSize(withTags); + } + + @Override + public void write(ByteBuffer buf, int offset) { + this.cell.write(buf, offset); + } + + @Override + public Cell deepClone() { + return this.cell.deepClone(); + } + + @Override + public ByteBuffer getRowByteBuffer() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getRowByteBuffer(); + } else { + return ByteBuffer.wrap(CellUtil.cloneRow(this.cell)); + } + } + + @Override + public int getRowPosition() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getRowPosition(); + } else { + return 0; + } + } + + @Override + public ByteBuffer getFamilyByteBuffer() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getFamilyByteBuffer(); + } else { + return ByteBuffer.wrap(CellUtil.cloneFamily(this.cell)); + } + } + + @Override + public int getFamilyPosition() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getFamilyPosition(); + } else { + return 0; + } + } + + @Override + public ByteBuffer getQualifierByteBuffer() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getQualifierByteBuffer(); + } else { + return ByteBuffer.wrap(CellUtil.cloneQualifier(this.cell)); + } + } + + @Override + public int getQualifierPosition() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getQualifierPosition(); + } else { + return 0; + } + } + + @Override + public ByteBuffer getValueByteBuffer() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getValueByteBuffer(); + } else { + return ByteBuffer.wrap(CellUtil.cloneValue(this.cell)); + } + } + + @Override + public int getValuePosition() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getValuePosition(); + } else { + return 0; + } + } + + @Override + public ByteBuffer getTagsByteBuffer() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getTagsByteBuffer(); + } else { + return ByteBuffer.wrap(CellUtil.cloneTags(this.cell)); + } + } + + @Override + public int getTagsPosition() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getTagsPosition(); + } else { + return 0; + } + } + + @Override + public String toString() { + return this.cell.toString(); + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index cbff2de..cbba508 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -440,12 +440,12 @@ public class TestHFileOutputFormat2 { // Set start and end rows for partitioner. 
SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); - job.setReducerClass(KeyValueSortReducer.class); + job.setReducerClass(CellSortReducer.class); job.setOutputFormatClass(HFileOutputFormat2.class); job.setNumReduceTasks(4); job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); + CellSerialization.class.getName()); FileOutputFormat.setOutputPath(job, testDir); assertTrue(job.waitForCompletion(false)); @@ -764,7 +764,7 @@ public class TestHFileOutputFormat2 { job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); + CellSerialization.class.getName()); setupRandomGeneratorMapper(job, putSortReducer); if (tableInfo.size() > 1) { MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java index 60d88bc..2d88b42 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java @@ -63,7 +63,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter; +import org.apache.hadoop.hbase.mapreduce.Import.CellImporter; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.wal.WAL; @@ -664,7 +664,7 @@ public class TestImportExport { @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testKeyValueImporter() throws Throwable { - KeyValueImporter importer = new KeyValueImporter(); + CellImporter importer = new CellImporter(); Configuration configuration = new Configuration(); Context ctx = mock(Context.class); when(ctx.getConfiguration()).thenReturn(configuration); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index 427c5cc..24e3f51 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -168,7 +168,7 @@ public class TestWALPlayer { WALKey key = mock(WALKey.class); when(key.getTablename()).thenReturn(TableName.valueOf("table")); @SuppressWarnings("unchecked") - Mapper.Context context = mock(Context.class); + Mapper.Context context = mock(Context.class); when(context.getConfiguration()).thenReturn(configuration); WALEdit value = mock(WALEdit.class);
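
For anyone wiring a bulk-load job on top of this patch, the following is a minimal sketch of the Cell-based job configuration that replaces the old KeyValue-based one, following the setup used by MapReduceHFileSplitterJob, WALPlayer, and Import in the diff above. The class name CellBulkLoadJobSketch, the method signature, and the assumption that a mapper emitting (ImmutableBytesWritable, MapReduceCell) pairs is supplied elsewhere are illustrative, not part of the patch; only the HBase calls that appear in the diff are relied on.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.MapReduceCell;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CellBulkLoadJobSketch {
  /**
   * Configures a job that writes HFiles for bulk load using the Cell-based
   * classes introduced by this patch. The caller is expected to set a mapper
   * that emits (ImmutableBytesWritable, MapReduceCell) pairs.
   */
  public static Job createJob(Configuration conf, String tableNameStr, String hfileOutPath)
      throws IOException {
    Job job = Job.getInstance(conf, "cell-bulkload-sketch");
    TableName tableName = TableName.valueOf(tableNameStr);
    // Mappers emit the MapReduceCell wrapper instead of KeyValue.
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(MapReduceCell.class);
    // CellSortReducer replaces KeyValueSortReducer on the reduce side.
    job.setReducerClass(CellSortReducer.class);
    FileOutputFormat.setOutputPath(job, new Path(hfileOutPath));
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
      // Also registers CellSerialization (in place of KeyValueSerialization)
      // under "io.serializations" and sets the output value class to MapReduceCell.
      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    }
    return job;
  }
}

Note that the patched configureIncrementalLoad() still accepts KeyValue as a map output value class and routes both KeyValue and MapReduceCell to CellSortReducer, so existing jobs that emit KeyValue keep working.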