 .../mapreduce/MapReduceHFileSplitterJob.java | 33 +--
 .../java/org/apache/hadoop/hbase/CellUtil.java | 114 +++++++-
 .../hbase/io/encoding/FastDiffDeltaEncoder.java | 3 +-
 .../hbase/io/encoding/PrefixKeyDeltaEncoder.java | 3 +-
 .../apache/hadoop/hbase/util/ByteBufferUtils.java | 23 ++
 .../java/org/apache/hadoop/hbase/TestCellUtil.java | 117 ++++++++
 .../hadoop/hbase/mapreduce/CellSerialization.java | 96 +++++++
 .../hadoop/hbase/mapreduce/CellSortReducer.java | 60 ++++
 .../apache/hadoop/hbase/mapreduce/CopyTable.java | 2 +-
 .../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 26 +-
 .../org/apache/hadoop/hbase/mapreduce/Import.java | 303 +++++++++++++++++----
 .../apache/hadoop/hbase/mapreduce/ImportTsv.java | 2 +-
 .../hbase/mapreduce/KeyValueSerialization.java | 8 +-
 .../hbase/mapreduce/KeyValueSortReducer.java | 3 +
 .../hadoop/hbase/mapreduce/PutSortReducer.java | 2 +-
 .../hadoop/hbase/mapreduce/TableMapReduceUtil.java | 2 +-
 .../hadoop/hbase/mapreduce/TextSortReducer.java | 2 +-
 .../apache/hadoop/hbase/mapreduce/WALPlayer.java | 54 +++-
 .../apache/hadoop/hbase/util/MapReduceCell.java | 270 ++++++++++++++++++
 .../hbase/mapreduce/TestHFileOutputFormat2.java | 6 +-
 .../hadoop/hbase/mapreduce/TestImportExport.java | 9 +-
 .../hadoop/hbase/mapreduce/TestWALPlayer.java | 11 +-
 22 files changed, 1037 insertions(+), 112 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
index 97ece3d..51a6b1d 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
@@ -24,22 +24,21 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
 import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.MapReduceCell;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -47,6 +46,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * A tool to split HFiles into new region boundaries as a MapReduce job.
  * The tool generates HFiles
@@ -70,24 +70,15 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {
 
   /**
    * A mapper that just writes out cells. This one can be used together with
-   * {@link KeyValueSortReducer}
+   * {@link CellSortReducer}
    */
   static class HFileCellMapper extends
-      Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+      Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {
 
     @Override
-    public void map(NullWritable key, KeyValue value, Context context) throws IOException,
-        InterruptedException {
-      // Convert value to KeyValue if subclass
-      if (!value.getClass().equals(KeyValue.class)) {
-        value =
-            new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
-                value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
-                value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
-                value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
-                value.getValueOffset(), value.getValueLength());
-      }
-      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
+    public void map(NullWritable key, Cell value, Context context)
+        throws IOException, InterruptedException {
+      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), new MapReduceCell(value));
     }
 
     @Override
@@ -119,14 +110,14 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {
       LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
       TableName tableName = TableName.valueOf(tabName);
       job.setMapperClass(HFileCellMapper.class);
-      job.setReducerClass(KeyValueSortReducer.class);
+      job.setReducerClass(CellSortReducer.class);
       Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
-      job.setMapOutputValueClass(KeyValue.class);
+      job.setMapOutputValueClass(MapReduceCell.class);
       try (Connection conn = ConnectionFactory.createConnection(conf);
           Table table = conn.getTable(tableName);
           RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
-        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
       }
       LOG.debug("success configuring load incremental job");
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index a3029f8..44c2cde 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
 import static org.apache.hadoop.hbase.Tag.TAG_LENGTH_SIZE;
 
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -1417,9 +1418,12 @@ public final class CellUtil {
   }
 
   /**
-   * Estimate based on keyvalue's serialization format.
+   * Estimate based on keyvalue's serialization format in the RPC layer. Note that there is an extra
+   * SIZEOF_INT added to the size here that indicates the actual length of the cell for cases where
+   * cells are serialized in a contiguous format (For eg in RPCs).
    * @param cell
-   * @return Estimate of the cell size in bytes.
+   * @return Estimate of the cell size in bytes plus an extra SIZEOF_INT indicating the
+   *         actual cell length.
*/ public static int estimatedSerializedSizeOf(final Cell cell) { if (cell instanceof ExtendedCell) { @@ -1714,8 +1718,10 @@ public final class CellUtil { * timestamp><1 byte type> * @param cell * @param out + * @deprecated Use {@link #writeFlatKey(Cell, DataOutput)} * @throws IOException */ + @Deprecated public static void writeFlatKey(Cell cell, DataOutputStream out) throws IOException { short rowLen = cell.getRowLength(); byte fLen = cell.getFamilyLength(); @@ -1724,6 +1730,43 @@ public final class CellUtil { // component of cell if (cell instanceof ByteBufferCell) { out.writeShort(rowLen); + ByteBufferUtils.copyBufferToStream((DataOutput) out, + ((ByteBufferCell) cell).getRowByteBuffer(), ((ByteBufferCell) cell).getRowPosition(), + rowLen); + out.writeByte(fLen); + ByteBufferUtils.copyBufferToStream((DataOutput) out, + ((ByteBufferCell) cell).getFamilyByteBuffer(), ((ByteBufferCell) cell).getFamilyPosition(), + fLen); + ByteBufferUtils.copyBufferToStream((DataOutput) out, + ((ByteBufferCell) cell).getQualifierByteBuffer(), + ((ByteBufferCell) cell).getQualifierPosition(), qLen); + } else { + out.writeShort(rowLen); + out.write(cell.getRowArray(), cell.getRowOffset(), rowLen); + out.writeByte(fLen); + out.write(cell.getFamilyArray(), cell.getFamilyOffset(), fLen); + out.write(cell.getQualifierArray(), cell.getQualifierOffset(), qLen); + } + out.writeLong(cell.getTimestamp()); + out.writeByte(cell.getTypeByte()); + } + + /** + * Writes the Cell's key part as it would have serialized in a KeyValue. The format is <2 bytes + * rk len><rk><1 byte cf len><cf><qualifier><8 bytes + * timestamp><1 byte type> + * @param cell + * @param out + * @throws IOException + */ + public static void writeFlatKey(Cell cell, DataOutput out) throws IOException { + short rowLen = cell.getRowLength(); + byte fLen = cell.getFamilyLength(); + int qLen = cell.getQualifierLength(); + // Using just one if/else loop instead of every time checking before writing every + // component of cell + if (cell instanceof ByteBufferCell) { + out.writeShort(rowLen); ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getRowByteBuffer(), ((ByteBufferCell) cell).getRowPosition(), rowLen); out.writeByte(fLen); @@ -1742,6 +1785,69 @@ public final class CellUtil { out.writeByte(cell.getTypeByte()); } + /** + * Deep clones the given cell if the cell supports deep cloning + * @param cell the cell to be cloned + * @return the cloned cell + * @throws CloneNotSupportedException + */ + public static Cell deepClone(Cell cell) throws CloneNotSupportedException { + if (cell instanceof ExtendedCell) { + return ((ExtendedCell) cell).deepClone(); + } + throw new CloneNotSupportedException(); + } + + /** + * Writes the cell to the given OutputStream + * @param cell the cell to be written + * @param out the outputstream + * @param withTags if tags are to be written or not + * @return the total bytes written + * @throws IOException + */ + public static int writeCell(Cell cell, OutputStream out, boolean withTags) throws IOException { + if (cell instanceof ExtendedCell) { + return ((ExtendedCell) cell).write(out, withTags); + } else { + ByteBufferUtils.putInt(out, CellUtil.estimatedSerializedSizeOfKey(cell)); + ByteBufferUtils.putInt(out, cell.getValueLength()); + CellUtil.writeFlatKey(cell, out); + CellUtil.writeValue(out, cell, cell.getValueLength()); + int tagsLength = cell.getTagsLength(); + if (withTags) { + byte[] len = new byte[Bytes.SIZEOF_SHORT]; + Bytes.putAsShort(len, 0, tagsLength); + out.write(len); + if (tagsLength > 0) { + 
CellUtil.writeTags(out, cell, tagsLength); + } + } + int lenWritten = (2 * Bytes.SIZEOF_INT) + CellUtil.estimatedSerializedSizeOfKey(cell) + + cell.getValueLength(); + if (withTags) { + lenWritten += Bytes.SIZEOF_SHORT + tagsLength; + } + return lenWritten; + } + } + + /** + * Writes a cell to the buffer at the given offset + * @param cell the cell to be written + * @param buf the buffer to which the cell has to be wrriten + * @param offset the offset at which the cell should be written + */ + public static void writeCellToBuffer(Cell cell, ByteBuffer buf, int offset) { + if (cell instanceof ExtendedCell) { + ((ExtendedCell) cell).write(buf, offset); + } else { + // Using the KVUtil + byte[] bytes = KeyValueUtil.copyToNewByteArray(cell); + ByteBufferUtils.copyFromArrayToBuffer(buf, offset, bytes, 0, bytes.length); + } + } + public static int writeFlatKey(Cell cell, OutputStream out) throws IOException { short rowLen = cell.getRowLength(); byte fLen = cell.getFamilyLength(); @@ -1796,7 +1902,7 @@ public final class CellUtil { public static void writeRowSkippingBytes(DataOutputStream out, Cell cell, short rlength, int commonPrefix) throws IOException { if (cell instanceof ByteBufferCell) { - ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getRowByteBuffer(), + ByteBufferUtils.copyBufferToStream((DataOutput)out, ((ByteBufferCell) cell).getRowByteBuffer(), ((ByteBufferCell) cell).getRowPosition() + commonPrefix, rlength - commonPrefix); } else { out.write(cell.getRowArray(), cell.getRowOffset() + commonPrefix, rlength - commonPrefix); @@ -1846,7 +1952,7 @@ public final class CellUtil { public static void writeQualifierSkippingBytes(DataOutputStream out, Cell cell, int qlength, int commonPrefix) throws IOException { if (cell instanceof ByteBufferCell) { - ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getQualifierByteBuffer(), + ByteBufferUtils.copyBufferToStream((DataOutput)out, ((ByteBufferCell) cell).getQualifierByteBuffer(), ((ByteBufferCell) cell).getQualifierPosition() + commonPrefix, qlength - commonPrefix); } else { out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonPrefix, diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java index ac81049..03cf768 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.io.encoding; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -262,7 +263,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { ByteBufferUtils.putCompressedInt(out, kLength); ByteBufferUtils.putCompressedInt(out, vLength); ByteBufferUtils.putCompressedInt(out, 0); - CellUtil.writeFlatKey(cell, out); + CellUtil.writeFlatKey(cell, (DataOutput)out); // Write the value part CellUtil.writeValue(out, cell, cell.getValueLength()); } else { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java index 6f529db..8edb305 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java +++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.io.encoding; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -59,7 +60,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder { ByteBufferUtils.putCompressedInt(out, klength); ByteBufferUtils.putCompressedInt(out, vlength); ByteBufferUtils.putCompressedInt(out, 0); - CellUtil.writeFlatKey(cell, out); + CellUtil.writeFlatKey(cell, (DataOutput)out); } else { // find a common prefix and skip it int common = CellUtil.findCommonPrefixInFlatKey(cell, state.prevCell, true, true); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index 7ef578d..3fc1a7b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -20,6 +20,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -193,6 +194,28 @@ public final class ByteBufferUtils { } } + /** + * Copy data from a buffer to an output stream. Does not update the position + * in the buffer. + * @param out the output stream to write bytes to + * @param in the buffer to read bytes from + * @param offset the offset in the buffer (from the buffer's array offset) + * to start copying bytes from + * @param length the number of bytes to copy + */ + public static void copyBufferToStream(DataOutput out, ByteBuffer in, int offset, int length) + throws IOException { + if (out instanceof ByteBufferWriter) { + ((ByteBufferWriter) out).write(in, offset, length); + } else if (in.hasArray()) { + out.write(in.array(), in.arrayOffset() + offset, length); + } else { + for (int i = 0; i < length; ++i) { + out.write(toByte(in, offset + i)); + } + } + } + public static int putLong(OutputStream out, final long value, final int fitInBytes) throws IOException { long tmpValue = value; diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java index ac9fc45..3bd1b66 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -488,4 +490,119 @@ public class TestCellUtil { bbCell = new ByteBufferKeyValue(buffer, 0, buffer.remaining()); assertEquals(bd, CellUtil.getValueAsBigDecimal(bbCell)); } + + @Test + public void testWriteCell() throws IOException { + byte[] r = Bytes.toBytes("row1"); + byte[] f = Bytes.toBytes("cf1"); + byte[] q1 = Bytes.toBytes("qual1"); + byte[] q2 = Bytes.toBytes("qual2"); + byte[] v = Bytes.toBytes("val1"); + byte[] tags = Bytes.toBytes("tag1"); + KeyValue kv = new KeyValue(r, f, q1, 0, q1.length, 1234L, Type.Put, v, 0, v.length, tags); 
+ NonExtendedCell nonExtCell = new NonExtendedCell(kv); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + int writeCell = CellUtil.writeCell(nonExtCell, os, true); + byte[] byteArray = os.toByteArray(); + KeyValue res = new KeyValue(byteArray); + assertTrue(CellUtil.equals(kv, res)); + } + + private static class NonExtendedCell implements Cell { + private KeyValue kv; + + public NonExtendedCell(KeyValue kv) { + this.kv = kv; + } + + @Override + public byte[] getRowArray() { + return this.kv.getRowArray(); + } + + @Override + public int getRowOffset() { + return this.kv.getRowOffset(); + } + + @Override + public short getRowLength() { + return this.kv.getRowLength(); + } + + @Override + public byte[] getFamilyArray() { + return this.kv.getFamilyArray(); + } + + @Override + public int getFamilyOffset() { + return this.kv.getFamilyOffset(); + } + + @Override + public byte getFamilyLength() { + return this.kv.getFamilyLength(); + } + + @Override + public byte[] getQualifierArray() { + return this.kv.getQualifierArray(); + } + + @Override + public int getQualifierOffset() { + return this.kv.getQualifierOffset(); + } + + @Override + public int getQualifierLength() { + return this.kv.getQualifierLength(); + } + + @Override + public long getTimestamp() { + return this.kv.getTimestamp(); + } + + @Override + public byte getTypeByte() { + return this.kv.getTypeByte(); + } + + @Override + public long getSequenceId() { + return this.kv.getSequenceId(); + } + + @Override + public byte[] getValueArray() { + return this.kv.getValueArray(); + } + + @Override + public int getValueOffset() { + return this.kv.getValueOffset(); + } + + @Override + public int getValueLength() { + return this.kv.getValueLength(); + } + + @Override + public byte[] getTagsArray() { + return this.kv.getTagsArray(); + } + + @Override + public int getTagsOffset() { + return this.kv.getTagsOffset(); + } + + @Override + public int getTagsLength() { + return this.kv.getTagsLength(); + } + } } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSerialization.java new file mode 100644 index 0000000..6f4419e --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSerialization.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.Serializer; + +/** + * Use to specify the type of serialization for the mappers and reducers + */ +@InterfaceAudience.Public +public class CellSerialization implements Serialization { + @Override + public boolean accept(Class c) { + return Cell.class.isAssignableFrom(c); + } + + @Override + public CellDeserializer getDeserializer(Class t) { + return new CellDeserializer(); + } + + @Override + public CellSerializer getSerializer(Class c) { + return new CellSerializer(); + } + + public static class CellDeserializer implements Deserializer { + private DataInputStream dis; + + @Override + public void close() throws IOException { + this.dis.close(); + } + + @Override + public KeyValue deserialize(Cell ignore) throws IOException { + // I can't overwrite the passed in KV, not from a proto kv, not just yet. TODO + return KeyValueUtil.create(this.dis); + } + + @Override + public void open(InputStream is) throws IOException { + this.dis = new DataInputStream(is); + } + } + + public static class CellSerializer implements Serializer { + private DataOutputStream dos; + + @Override + public void close() throws IOException { + this.dos.close(); + } + + @Override + public void open(OutputStream os) throws IOException { + this.dos = new DataOutputStream(os); + } + + @Override + public void serialize(Cell kv) throws IOException { + dos.writeInt(CellUtil.estimatedSerializedSizeOf(kv) - Bytes.SIZEOF_INT); + CellUtil.writeCell(kv, dos, true); + } + } +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSortReducer.java new file mode 100644 index 0000000..c33ee15 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSortReducer.java @@ -0,0 +1,60 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.MapReduceCell; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Emits sorted Cells. + * Reads in all Cells from passed Iterator, sorts them, then emits + * Cells in sorted order. If lots of columns per row, it will use lots of + * memory sorting. + * @see HFileOutputFormat2 + */ +@InterfaceAudience.Public +public class CellSortReducer + extends Reducer { + protected void reduce(ImmutableBytesWritable row, Iterable kvs, + Reducer.Context context) + throws java.io.IOException, InterruptedException { + TreeSet map = new TreeSet<>(CellComparator.COMPARATOR); + for (Cell kv : kvs) { + try { + map.add(CellUtil.deepClone(kv)); + } catch (CloneNotSupportedException e) { + throw new IOException(e); + } + } + context.setStatus("Read " + map.getClass()); + int index = 0; + for (Cell kv: map) { + context.write(row, new MapReduceCell(kv)); + if (++index % 100 == 0) context.setStatus("Wrote " + index); + } + } +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java index 679d991..81af165 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java @@ -139,7 +139,7 @@ public class CopyTable extends Configured implements Tool { job.setNumReduceTasks(0); if (bulkload) { - TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null, + TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.CellImporter.class, null, null, job); // We need to split the inputs by destination tables so that output of Map can be bulk-loaded. 
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index e757742..20b2d42 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.MapReduceCell; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; @@ -90,7 +91,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; import org.apache.yetus.audience.InterfaceAudience; - import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; /** @@ -233,14 +233,13 @@ public class HFileOutputFormat2 private final Map writers = new TreeMap<>(Bytes.BYTES_COMPARATOR); private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY; - private final byte[] now = Bytes.toBytes(EnvironmentEdgeManager.currentTime()); + private final long now = EnvironmentEdgeManager.currentTime(); private boolean rollRequested = false; @Override public void write(ImmutableBytesWritable row, V cell) throws IOException { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - + Cell kv = cell; // null input == user explicitly wants to flush if (row == null && kv == null) { rollWriters(); @@ -248,7 +247,7 @@ public class HFileOutputFormat2 } byte[] rowKey = CellUtil.cloneRow(kv); - long length = kv.getLength(); + int length = (CellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT; byte[] family = CellUtil.cloneFamily(kv); byte[] tableNameBytes = null; if (writeMultipleTables) { @@ -337,7 +336,8 @@ public class HFileOutputFormat2 } // we now have the proper WAL writer. full steam ahead - kv.updateLatestStamp(this.now); + // TODO : Currently in SettableTimeStamp but this will also move to ExtendedCell + CellUtil.updateLatestStamp(cell, this.now); wl.writer.append(kv); wl.written += length; @@ -578,10 +578,11 @@ public class HFileOutputFormat2 configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class); } - static void configureIncrementalLoad(Job job, List multiTableInfo, Class> cls) throws IOException { + static void configureIncrementalLoad(Job job, List multiTableInfo, + Class> cls) throws IOException { Configuration conf = job.getConfiguration(); job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(KeyValue.class); + job.setOutputValueClass(MapReduceCell.class); job.setOutputFormatClass(cls); if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) { @@ -595,8 +596,9 @@ public class HFileOutputFormat2 // Based on the configured map output class, set the correct reducer to properly // sort the incoming values. // TODO it would be nice to pick one or the other of these formats. 
- if (KeyValue.class.equals(job.getMapOutputValueClass())) { - job.setReducerClass(KeyValueSortReducer.class); + if (KeyValue.class.equals(job.getMapOutputValueClass()) + || MapReduceCell.class.equals(job.getMapOutputValueClass())) { + job.setReducerClass(CellSortReducer.class); } else if (Put.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(PutSortReducer.class); } else if (Text.class.equals(job.getMapOutputValueClass())) { @@ -607,7 +609,7 @@ public class HFileOutputFormat2 conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); + CellSerialization.class.getName()); if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { LOG.info("bulkload locality sensitive enabled"); @@ -655,7 +657,7 @@ public class HFileOutputFormat2 Configuration conf = job.getConfiguration(); job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(KeyValue.class); + job.setOutputValueClass(MapReduceCell.class); job.setOutputFormatClass(HFileOutputFormat2.class); ArrayList singleTableDescriptor = new ArrayList<>(1); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java index 28962bb..7af7738 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.MapReduceCell; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.RawComparator; @@ -96,12 +97,33 @@ public class Import extends Configured implements Tool { private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + public static class CellWritableComparablePartitioner + extends Partitioner { + private static CellWritableComparable[] START_KEYS = null; + @Override + public int getPartition(CellWritableComparable key, Cell value, + int numPartitions) { + for (int i = 0; i < START_KEYS.length; ++i) { + if (key.compareTo(START_KEYS[i]) <= 0) { + return i; + } + } + return START_KEYS.length; + } + + } + + /** + * @deprecated Use {@link CellWritableComparablePartitioner}. 
Will be removed + * from 3.0 onwards + */ + @Deprecated public static class KeyValueWritableComparablePartitioner extends Partitioner { private static KeyValueWritableComparable[] START_KEYS = null; + @Override - public int getPartition(KeyValueWritableComparable key, KeyValue value, - int numPartitions) { + public int getPartition(KeyValueWritableComparable key, KeyValue value, int numPartitions) { for (int i = 0; i < START_KEYS.length; ++i) { if (key.compareTo(START_KEYS[i]) <= 0) { return i; @@ -109,7 +131,6 @@ public class Import extends Configured implements Tool { } return START_KEYS.length; } - } public static class KeyValueWritableComparable @@ -119,8 +140,7 @@ public class Import extends Configured implements Tool { static { // register this comparator - WritableComparator.define(KeyValueWritableComparable.class, - new KeyValueWritableComparator()); + WritableComparator.define(KeyValueWritableComparable.class, new KeyValueWritableComparator()); } public KeyValueWritableComparable() { @@ -141,10 +161,10 @@ public class Import extends Configured implements Tool { } @Override - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS", - justification="This is wrong, yes, but we should be purging Writables, not fixing them") + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS", + justification = "This is wrong, yes, but we should be purging Writables, not fixing them") public int compareTo(KeyValueWritableComparable o) { - return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable)o).kv); + return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable) o).kv); } public static class KeyValueWritableComparator extends WritableComparator { @@ -166,18 +186,93 @@ public class Import extends Configured implements Tool { } + public static class CellWritableComparable + implements WritableComparable { + + private Cell kv = null; + + static { + // register this comparator + WritableComparator.define(CellWritableComparable.class, + new CellWritableComparator()); + } + + public CellWritableComparable() { + } + + public CellWritableComparable(Cell kv) { + this.kv = kv; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(CellUtil.estimatedSerializedSizeOfKey(kv)); + out.writeInt(0); + CellUtil.writeFlatKey(kv, out); + } + + @Override + public void readFields(DataInput in) throws IOException { + kv = KeyValue.create(in); + } + + @Override + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS", + justification="This is wrong, yes, but we should be purging Writables, not fixing them") + public int compareTo(CellWritableComparable o) { + return CellComparator.COMPARATOR.compare(this.kv, ((CellWritableComparable)o).kv); + } + + public static class CellWritableComparator extends WritableComparator { + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + try { + CellWritableComparable kv1 = new CellWritableComparable(); + kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1))); + CellWritableComparable kv2 = new CellWritableComparable(); + kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2))); + return compare(kv1, kv2); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + } + + } + + /** + * @deprecated Use {@link CellReducer}. 
Will be removed from 3.0 onwards + */ + @Deprecated public static class KeyValueReducer - extends - Reducer { - protected void reduce( - KeyValueWritableComparable row, - Iterable kvs, - Reducer.Context context) + extends Reducer { + protected void reduce(KeyValueWritableComparable row, Iterable kvs, + Reducer.Context context) throws java.io.IOException, InterruptedException { int index = 0; for (KeyValue kv : kvs) { context.write(new ImmutableBytesWritable(kv.getRowArray()), kv); + if (++index % 100 == 0) context.setStatus("Wrote " + index + " KeyValues, " + + "and the rowkey whose is being wrote is " + Bytes.toString(kv.getRowArray())); + } + } + } + + public static class CellReducer + extends + Reducer { + protected void reduce( + CellWritableComparable row, + Iterable kvs, + Reducer.Context context) + throws java.io.IOException, InterruptedException { + int index = 0; + for (Cell kv : kvs) { + context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), + new MapReduceCell(kv)); if (++index % 100 == 0) context.setStatus("Wrote " + index + " KeyValues, " + "and the rowkey whose is being wrote is " + Bytes.toString(kv.getRowArray())); @@ -185,13 +280,123 @@ public class Import extends Configured implements Tool { } } + /** + * @deprecated Use {@link CellSortImporter}. Will be removed from 3.0 onwards + */ + @Deprecated public static class KeyValueSortImporter extends TableMapper { private Map cfRenameMap; private Filter filter; + private static final Log LOG = LogFactory.getLog(KeyValueSortImporter.class); + + /** + * @param row The current table row key. + * @param value The columns. + * @param context The current context. + * @throws IOException When something is broken with the data. + */ + @Override + public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException { + try { + if (LOG.isTraceEnabled()) { + LOG.trace( + "Considering the row." 
+ Bytes.toString(row.get(), row.getOffset(), row.getLength())); + } + if (filter == null || !filter.filterRowKey( + CellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))) { + for (Cell kv : value.rawCells()) { + kv = filterKv(filter, kv); + // skip if we filtered it out + if (kv == null) continue; + // TODO get rid of ensureKeyValue + KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)); + context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void setup(Context context) throws IOException { + cfRenameMap = createCfRenameMap(context.getConfiguration()); + filter = instantiateFilter(context.getConfiguration()); + int reduceNum = context.getNumReduceTasks(); + Configuration conf = context.getConfiguration(); + TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME)); + try (Connection conn = ConnectionFactory.createConnection(conf); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { + byte[][] startKeys = regionLocator.getStartKeys(); + if (startKeys.length != reduceNum) { + throw new IOException("Region split after job initialization"); + } + KeyValueWritableComparable[] startKeyWraps = + new KeyValueWritableComparable[startKeys.length - 1]; + for (int i = 1; i < startKeys.length; ++i) { + startKeyWraps[i - 1] = + new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i])); + } + KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps; + } + } + } + + /** + * A mapper that just writes out KeyValues. + * @deprecated Use {@link CellImporter}. Will be removed from 3.0 onwards + */ + @Deprecated + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS", + justification = "Writables are going away and this has been this way forever") + public static class KeyValueImporter extends TableMapper { + private Map cfRenameMap; + private Filter filter; private static final Log LOG = LogFactory.getLog(KeyValueImporter.class); /** + * @param row The current table row key. + * @param value The columns. + * @param context The current context. + * @throws IOException When something is broken with the data. + */ + @Override + public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException { + try { + if (LOG.isTraceEnabled()) { + LOG.trace( + "Considering the row." + Bytes.toString(row.get(), row.getOffset(), row.getLength())); + } + if (filter == null || !filter.filterRowKey( + CellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))) { + for (Cell kv : value.rawCells()) { + kv = filterKv(filter, kv); + // skip if we filtered it out + if (kv == null) continue; + // TODO get rid of ensureKeyValue + context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap))); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void setup(Context context) { + cfRenameMap = createCfRenameMap(context.getConfiguration()); + filter = instantiateFilter(context.getConfiguration()); + } + } + + public static class CellSortImporter + extends TableMapper { + private Map cfRenameMap; + private Filter filter; + private static final Log LOG = LogFactory.getLog(CellImporter.class); + + /** * @param row The current table row key. * @param value The columns. * @param context The current context. 
@@ -213,9 +418,8 @@ public class Import extends Configured implements Tool { kv = filterKv(filter, kv); // skip if we filtered it out if (kv == null) continue; - // TODO get rid of ensureKeyValue - KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)); - context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret); + Cell ret = convertKv(kv, cfRenameMap); + context.write(new CellWritableComparable(ret), ret); } } } catch (InterruptedException e) { @@ -236,13 +440,13 @@ public class Import extends Configured implements Tool { if (startKeys.length != reduceNum) { throw new IOException("Region split after job initialization"); } - KeyValueWritableComparable[] startKeyWraps = - new KeyValueWritableComparable[startKeys.length - 1]; + CellWritableComparable[] startKeyWraps = + new CellWritableComparable[startKeys.length - 1]; for (int i = 1; i < startKeys.length; ++i) { startKeyWraps[i - 1] = - new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i])); + new CellWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i])); } - KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps; + CellWritableComparablePartitioner.START_KEYS = startKeyWraps; } } } @@ -252,10 +456,10 @@ public class Import extends Configured implements Tool { */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS", justification="Writables are going away and this has been this way forever") - public static class KeyValueImporter extends TableMapper { + public static class CellImporter extends TableMapper { private Map cfRenameMap; private Filter filter; - private static final Log LOG = LogFactory.getLog(KeyValueImporter.class); + private static final Log LOG = LogFactory.getLog(CellImporter.class); /** * @param row The current table row key. 
@@ -279,8 +483,7 @@ public class Import extends Configured implements Tool { kv = filterKv(filter, kv); // skip if we filtered it out if (kv == null) continue; - // TODO get rid of ensureKeyValue - context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap))); + context.write(row, new MapReduceCell(convertKv(kv, cfRenameMap))); } } } catch (InterruptedException e) { @@ -505,21 +708,21 @@ public class Import extends Configured implements Tool { if(cfRenameMap != null) { // If there's a rename mapping for this CF, create a new KeyValue byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv)); - if(newCfName != null) { - kv = new KeyValue(kv.getRowArray(), // row buffer - kv.getRowOffset(), // row offset - kv.getRowLength(), // row length - newCfName, // CF buffer - 0, // CF offset - newCfName.length, // CF length - kv.getQualifierArray(), // qualifier buffer - kv.getQualifierOffset(), // qualifier offset - kv.getQualifierLength(), // qualifier length - kv.getTimestamp(), // timestamp - KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type - kv.getValueArray(), // value buffer - kv.getValueOffset(), // value offset - kv.getValueLength()); // value length + if (newCfName != null) { + kv = new KeyValue(kv.getRowArray(), // row buffer + kv.getRowOffset(), // row offset + kv.getRowLength(), // row length + newCfName, // CF buffer + 0, // CF offset + newCfName.length, // CF length + kv.getQualifierArray(), // qualifier buffer + kv.getQualifierOffset(), // qualifier offset + kv.getQualifierLength(), // qualifier length + kv.getTimestamp(), // timestamp + KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type + kv.getValueArray(), // value buffer + kv.getValueOffset(), // value offset + kv.getValueLength()); // value length } } return kv; @@ -626,35 +829,35 @@ public class Import extends Configured implements Tool { Table table = conn.getTable(tableName); RegionLocator regionLocator = conn.getRegionLocator(tableName)) { HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); - job.setMapperClass(KeyValueSortImporter.class); - job.setReducerClass(KeyValueReducer.class); + job.setMapperClass(CellSortImporter.class); + job.setReducerClass(CellReducer.class); Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputKeyClass(KeyValueWritableComparable.class); - job.setMapOutputValueClass(KeyValue.class); + job.setMapOutputKeyClass(CellWritableComparable.class); + job.setMapOutputValueClass(MapReduceCell.class); job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class", - KeyValueWritableComparable.KeyValueWritableComparator.class, + CellWritableComparable.CellWritableComparator.class, RawComparator.class); Path partitionsPath = new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration())); FileSystem fs = FileSystem.get(job.getConfiguration()); fs.deleteOnExit(partitionsPath); - job.setPartitionerClass(KeyValueWritableComparablePartitioner.class); + job.setPartitionerClass(CellWritableComparablePartitioner.class); job.setNumReduceTasks(regionLocator.getStartKeys().length); TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class); } } else if (hfileOutPath != null) { LOG.info("writing to hfiles for bulk load."); - job.setMapperClass(KeyValueImporter.class); + job.setMapperClass(CellImporter.class); try (Connection conn = ConnectionFactory.createConnection(conf); Table table = 
conn.getTable(tableName); RegionLocator regionLocator = conn.getRegionLocator(tableName)){ - job.setReducerClass(KeyValueSortReducer.class); + job.setReducerClass(CellSortReducer.class); Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); + job.setMapOutputValueClass(MapReduceCell.class); HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java index 4a1dea8..d672803 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -584,7 +584,7 @@ public class ImportTsv extends Configured implements Tool { job.getConfiguration().setStrings("io.serializations", job.getConfiguration().get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); + CellSerialization.class.getName()); } TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java index d0cc00e..3207712 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java @@ -29,7 +29,13 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.io.serializer.Deserializer; import org.apache.hadoop.io.serializer.Serialization; import org.apache.hadoop.io.serializer.Serializer; - +/** + * Use to specify the type of serialization for the mappers + * and reducers + * @deprecated Use {@link CellSerialization}. Will be + * removed from 3.0 onwards + */ +@Deprecated @InterfaceAudience.Public public class KeyValueSerialization implements Serialization { @Override diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java index 824f23d..3ba5198 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java @@ -32,7 +32,10 @@ import org.apache.hadoop.mapreduce.Reducer; * KeyValues in sorted order. If lots of columns per row, it will use lots of * memory sorting. * @see HFileOutputFormat2 + * @deprecated Use {@link CellSortReducer}. 
Will be removed from + * 3.0 onwards */ +@Deprecated @InterfaceAudience.Public public class KeyValueSortReducer extends Reducer { diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java index 49902f4..bb935c3 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java @@ -48,7 +48,7 @@ import org.apache.hadoop.util.StringUtils; * Puts in sorted order. If lots of columns per row, it will use lots of * memory sorting. * @see HFileOutputFormat2 - * @see KeyValueSortReducer + * @see CellSortReducer */ @InterfaceAudience.Public public class PutSortReducer extends diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index 4dcd048..0e813a9 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -207,7 +207,7 @@ public class TableMapReduceUtil { conf.set(TableInputFormat.SCAN, convertScanToString(scan)); conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); + CellSerialization.class.getName()); if (addDependencyJars) { addDependencyJars(job); } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java index 14da314..2aaa4eb 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java @@ -45,7 +45,7 @@ import org.apache.hadoop.util.StringUtils; /** * Emits Sorted KeyValues. Parse the passed text and creates KeyValues. Sorts them before emit. 
* @see HFileOutputFormat2 - * @see KeyValueSortReducer + * @see CellSortReducer * @see PutSortReducer */ @InterfaceAudience.Public diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index e1d8d28..02c4640 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; @@ -44,8 +43,9 @@ import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; -import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.MapReduceCell; +import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; @@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.yetus.audience.InterfaceAudience; /** * A tool to replay WAL files as a M/R job. @@ -95,7 +96,9 @@ public class WALPlayer extends Configured implements Tool { /** * A mapper that just writes out KeyValues. * This one can be used together with {@link KeyValueSortReducer} + * @deprecated Use {@link WALCellMapper}. Will be removed from 3.0 onwards */ + @Deprecated static class WALKeyValueMapper extends Mapper { private byte[] table; @@ -133,6 +136,47 @@ public class WALPlayer extends Configured implements Tool { } } + /** + * A mapper that just writes out Cells. 
+ * This one can be used together with {@link CellSortReducer} + */ + static class WALCellMapper + extends Mapper { + private byte[] table; + + @Override + public void map(WALKey key, WALEdit value, + Context context) + throws IOException { + try { + // skip all other tables + if (Bytes.equals(table, key.getTablename().getName())) { + for (Cell cell : value.getCells()) { + if (WALEdit.isMetaEditFamily(cell)) { + continue; + } + context.write(new ImmutableBytesWritable(CellUtil.cloneRow(cell)), + new MapReduceCell(cell)); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void setup(Context context) throws IOException { + // only a single table is supported when HFiles are generated with HFileOutputFormat + String[] tables = context.getConfiguration().getStrings(TABLES_KEY); + if (tables == null || tables.length != 1) { + // this can only happen when WALMapper is used directly by a class other than WALPlayer + throw new IOException("Exactly one table must be specified for bulk HFile case."); + } + table = Bytes.toBytes(tables[0]); + + } + + } /** * A mapper that writes out {@link Mutation} to be directly applied to @@ -299,11 +343,11 @@ public class WALPlayer extends Configured implements Tool { throw new IOException("Exactly one table must be specified for the bulk export option"); } TableName tableName = TableName.valueOf(tables[0]); - job.setMapperClass(WALKeyValueMapper.class); - job.setReducerClass(KeyValueSortReducer.class); + job.setMapperClass(WALCellMapper.class); + job.setReducerClass(CellSortReducer.class); Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputValueClass(KeyValue.class); + job.setMapOutputValueClass(MapReduceCell.class); try (Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(tableName); RegionLocator regionLocator = conn.getRegionLocator(tableName)) { diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceCell.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceCell.java new file mode 100644 index 0000000..c0f74a5 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceCell.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.ByteBufferCell; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A wrapper for a cell to be used with mapreduce, as the output value class for mappers/reducers. + */ +@InterfaceAudience.Private +public class MapReduceCell extends ByteBufferCell implements ExtendedCell { + + private final Cell cell; + + public MapReduceCell(Cell cell) { + this.cell = cell; + } + + @Override + public byte[] getRowArray() { + return this.cell.getRowArray(); + } + + @Override + public int getRowOffset() { + return this.cell.getRowOffset(); + } + + @Override + public short getRowLength() { + return this.cell.getRowLength(); + } + + @Override + public byte[] getFamilyArray() { + return this.cell.getFamilyArray(); + } + + @Override + public int getFamilyOffset() { + return this.cell.getFamilyOffset(); + } + + @Override + public byte getFamilyLength() { + return this.cell.getFamilyLength(); + } + + @Override + public byte[] getQualifierArray() { + return this.cell.getQualifierArray(); + } + + @Override + public int getQualifierOffset() { + return this.cell.getQualifierOffset(); + } + + @Override + public int getQualifierLength() { + return this.cell.getQualifierLength(); + } + + @Override + public long getTimestamp() { + return this.cell.getTimestamp(); + } + + @Override + public byte getTypeByte() { + return this.cell.getTypeByte(); + } + + @Override + public long getSequenceId() { + return this.cell.getSequenceId(); + } + + @Override + public byte[] getValueArray() { + return this.cell.getValueArray(); + } + + @Override + public int getValueOffset() { + return this.cell.getValueOffset(); + } + + @Override + public int getValueLength() { + return this.cell.getValueLength(); + } + + @Override + public byte[] getTagsArray() { + return this.cell.getTagsArray(); + } + + @Override + public int getTagsOffset() { + return this.cell.getTagsOffset(); + } + + @Override + public int getTagsLength() { + return this.cell.getTagsLength(); + } + + @Override + public ByteBuffer getRowByteBuffer() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getRowByteBuffer(); + } else { + return ByteBuffer.wrap(CellUtil.cloneRow(this.cell)); + } + } + + @Override + public int getRowPosition() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getRowPosition(); + } else { + return 0; + } + } + + @Override + public ByteBuffer getFamilyByteBuffer() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getFamilyByteBuffer(); + } else { + return ByteBuffer.wrap(CellUtil.cloneFamily(this.cell)); + } + } + + @Override + public int getFamilyPosition() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getFamilyPosition(); + } else { + return 0; + } + } + + @Override + public ByteBuffer getQualifierByteBuffer() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getQualifierByteBuffer(); + } else { + return ByteBuffer.wrap(CellUtil.cloneQualifier(this.cell)); + } + } + + @Override + public int getQualifierPosition() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getQualifierPosition(); + } else { + return 0; + } + } + + @Override + public ByteBuffer getValueByteBuffer() { + 
if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getValueByteBuffer(); + } else { + return ByteBuffer.wrap(CellUtil.cloneValue(this.cell)); + } + } + + @Override + public int getValuePosition() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getValuePosition(); + } else { + return 0; + } + } + + @Override + public ByteBuffer getTagsByteBuffer() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getTagsByteBuffer(); + } else { + return ByteBuffer.wrap(CellUtil.cloneTags(this.cell)); + } + } + + @Override + public int getTagsPosition() { + if (cell instanceof ByteBufferCell) { + return ((ByteBufferCell) this.cell).getTagsPosition(); + } else { + return 0; + } + } + + @Override + public String toString() { + return this.cell.toString(); + } + + @Override + public void setSequenceId(long seqId) throws IOException { + CellUtil.setSequenceId(cell, seqId); + } + + @Override + public void setTimestamp(long ts) throws IOException { + CellUtil.setTimestamp(cell, ts); + } + + @Override + public void setTimestamp(byte[] ts, int tsOffset) throws IOException { + CellUtil.setTimestamp(cell, ts, tsOffset); + } + + @Override + public long heapSize() { + return CellUtil.estimatedHeapSizeOf(cell); + } + + @Override + public int write(OutputStream out, boolean withTags) throws IOException { + return CellUtil.writeCell(cell, out, withTags); + } + + @Override + public int getSerializedSize(boolean withTags) { + return CellUtil.estimatedSerializedSizeOf(cell) - Bytes.SIZEOF_INT; + } + + @Override + public void write(ByteBuffer buf, int offset) { + CellUtil.writeCellToBuffer(cell, buf, offset); + } + + @Override + public ExtendedCell deepClone() { + try { + return (ExtendedCell) CellUtil.deepClone(cell); + } catch (CloneNotSupportedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index 0b5a929..2e523af 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -440,12 +440,12 @@ public class TestHFileOutputFormat2 { // Set start and end rows for partitioner. 
SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); - job.setReducerClass(KeyValueSortReducer.class); + job.setReducerClass(CellSortReducer.class); job.setOutputFormatClass(HFileOutputFormat2.class); job.setNumReduceTasks(4); job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); + CellSerialization.class.getName()); FileOutputFormat.setOutputPath(job, testDir); assertTrue(job.waitForCompletion(false)); @@ -764,7 +764,7 @@ public class TestHFileOutputFormat2 { job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); + CellSerialization.class.getName()); setupRandomGeneratorMapper(job, putSortReducer); if (tableInfo.size() > 1) { MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java index b581e04..e8edf98 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java @@ -63,7 +63,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter; +import org.apache.hadoop.hbase.mapreduce.Import.CellImporter; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WAL; @@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.LauncherSecurityManager; +import org.apache.hadoop.hbase.util.MapReduceCell; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.ToolRunner; @@ -664,7 +665,7 @@ public class TestImportExport { @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testKeyValueImporter() throws Throwable { - KeyValueImporter importer = new KeyValueImporter(); + CellImporter importer = new CellImporter(); Configuration configuration = new Configuration(); Context ctx = mock(Context.class); when(ctx.getConfiguration()).thenReturn(configuration); @@ -674,12 +675,12 @@ public class TestImportExport { @Override public Void answer(InvocationOnMock invocation) throws Throwable { ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0]; - KeyValue key = (KeyValue) invocation.getArguments()[1]; + MapReduceCell key = (MapReduceCell) invocation.getArguments()[1]; assertEquals("Key", Bytes.toString(writer.get())); assertEquals("row", Bytes.toString(CellUtil.cloneRow(key))); return null; } - }).when(ctx).write(any(ImmutableBytesWritable.class), any(KeyValue.class)); + }).when(ctx).write(any(ImmutableBytesWritable.class), 
any(MapReduceCell.class)); importer.setup(ctx); Result value = mock(Result.class); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index eef9cff..d950bcf 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -45,13 +45,14 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper; +import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALCellMapper; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.LauncherSecurityManager; +import org.apache.hadoop.hbase.util.MapReduceCell; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.mapreduce.Mapper; @@ -164,11 +165,11 @@ public class TestWALPlayer { private void testWALKeyValueMapper(final String tableConfigKey) throws Exception { Configuration configuration = new Configuration(); configuration.set(tableConfigKey, "table"); - WALKeyValueMapper mapper = new WALKeyValueMapper(); + WALCellMapper mapper = new WALCellMapper(); WALKey key = mock(WALKey.class); when(key.getTablename()).thenReturn(TableName.valueOf("table")); @SuppressWarnings("unchecked") - Mapper.Context context = mock(Context.class); + Mapper.Context context = mock(Context.class); when(context.getConfiguration()).thenReturn(configuration); WALEdit value = mock(WALEdit.class); @@ -184,12 +185,12 @@ public class TestWALPlayer { @Override public Void answer(InvocationOnMock invocation) throws Throwable { ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0]; - KeyValue key = (KeyValue) invocation.getArguments()[1]; + MapReduceCell key = (MapReduceCell) invocation.getArguments()[1]; assertEquals("row", Bytes.toString(writer.get())); assertEquals("row", Bytes.toString(CellUtil.cloneRow(key))); return null; } - }).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class)); + }).when(context).write(any(ImmutableBytesWritable.class), any(MapReduceCell.class)); mapper.map(key, value, context);
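The pieces exercised above (MapReduceCell as the map output value, CellSortReducer in place of KeyValueSortReducer, and CellSerialization in io.serializations) all belong to the same Cell-based HFile job wiring. The following is a minimal sketch of that wiring and is not part of this patch; the class and method names (CellJobWiringSketch, configureCellBulkLoad) are placeholders, and only classes touched by this change plus the standard Job/TableDescriptor/RegionLocator types are assumed.

// Illustrative sketch only -- not part of the patch. Shows how a job is wired so
// mappers can emit (row, MapReduceCell) pairs that CellSortReducer sorts and
// HFileOutputFormat2 writes out as HFiles.
import java.io.IOException;

import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.MapReduceCell;
import org.apache.hadoop.mapreduce.Job;

public final class CellJobWiringSketch {

  private CellJobWiringSketch() {
  }

  // Placeholder helper: the caller is assumed to have already created the Job and
  // obtained the TableDescriptor and RegionLocator for the target table.
  public static void configureCellBulkLoad(Job job, TableDescriptor tableDescriptor,
      RegionLocator regionLocator) throws IOException {
    // Map output values are MapReduceCell wrappers; the reducer sorts cells per row.
    job.setMapOutputValueClass(MapReduceCell.class);
    job.setReducerClass(CellSortReducer.class);
    // configureIncrementalLoad also takes care of partitioning and of registering
    // the serializations needed for the map output values on the job configuration.
    HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
  }
}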