.../mapreduce/MapReduceHFileSplitterJob.java | 17 ++----- .../java/org/apache/hadoop/hbase/CellUtil.java | 7 +-- .../hbase/io/encoding/FastDiffDeltaEncoder.java | 3 +- .../hbase/io/encoding/PrefixKeyDeltaEncoder.java | 3 +- .../apache/hadoop/hbase/util/ByteBufferUtils.java | 23 +++++++++ .../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 20 +++++--- .../org/apache/hadoop/hbase/mapreduce/Import.java | 54 ++++++++++++++++------ .../hbase/mapreduce/KeyValueSerialization.java | 21 +++++---- .../hbase/mapreduce/KeyValueSortReducer.java | 25 +++++----- .../apache/hadoop/hbase/mapreduce/WALPlayer.java | 8 ++-- .../hadoop/hbase/mapreduce/TestWALPlayer.java | 2 +- 11 files changed, 118 insertions(+), 65 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java index 49e8c75..03287e8 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java @@ -24,10 +24,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; @@ -73,20 +74,12 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool { * {@link KeyValueSortReducer} */ static class HFileCellMapper extends - Mapper { + Mapper { @Override - public void map(NullWritable key, KeyValue value, Context context) throws IOException, + public void map(NullWritable key, Cell value, Context context) throws IOException, InterruptedException { // Convert value to KeyValue if subclass - if (!value.getClass().equals(KeyValue.class)) { - value = - new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(), - value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(), - value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(), - value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(), - value.getValueOffset(), value.getValueLength()); - } context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value); } @@ -122,7 +115,7 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool { job.setReducerClass(KeyValueSortReducer.class); Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputValueClass(KeyValue.class); + job.setMapOutputValueClass(ExtendedCell.class); try (Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(tableName); RegionLocator regionLocator = conn.getRegionLocator(tableName)) { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index 4a5023d..42dd5cf 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase; import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY; import static org.apache.hadoop.hbase.Tag.TAG_LENGTH_SIZE; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -1735,7 +1736,7 @@ public final class CellUtil { * @param out * @throws IOException */ - public static void writeFlatKey(Cell cell, DataOutputStream out) throws IOException { + public static void writeFlatKey(Cell cell, DataOutput out) throws IOException { short rowLen = cell.getRowLength(); byte fLen = cell.getFamilyLength(); int qLen = cell.getQualifierLength(); @@ -1815,7 +1816,7 @@ public final class CellUtil { public static void writeRowSkippingBytes(DataOutputStream out, Cell cell, short rlength, int commonPrefix) throws IOException { if (cell instanceof ByteBufferCell) { - ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getRowByteBuffer(), + ByteBufferUtils.copyBufferToStream((DataOutput)out, ((ByteBufferCell) cell).getRowByteBuffer(), ((ByteBufferCell) cell).getRowPosition() + commonPrefix, rlength - commonPrefix); } else { out.write(cell.getRowArray(), cell.getRowOffset() + commonPrefix, rlength - commonPrefix); @@ -1865,7 +1866,7 @@ public final class CellUtil { public static void writeQualifierSkippingBytes(DataOutputStream out, Cell cell, int qlength, int commonPrefix) throws IOException { if (cell instanceof ByteBufferCell) { - ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getQualifierByteBuffer(), + ByteBufferUtils.copyBufferToStream((DataOutput)out, ((ByteBufferCell) cell).getQualifierByteBuffer(), ((ByteBufferCell) cell).getQualifierPosition() + commonPrefix, qlength - commonPrefix); } else { out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonPrefix, diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java index 5d7a379..cf1a1e7 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.io.encoding; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -262,7 +263,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { ByteBufferUtils.putCompressedInt(out, kLength); ByteBufferUtils.putCompressedInt(out, vLength); ByteBufferUtils.putCompressedInt(out, 0); - CellUtil.writeFlatKey(cell, out); + CellUtil.writeFlatKey(cell, (DataOutput)out); // Write the value part CellUtil.writeValue(out, cell, cell.getValueLength()); } else { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java index 842894f..0e3a5e6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.io.encoding; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -59,7 +60,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder { ByteBufferUtils.putCompressedInt(out, klength); ByteBufferUtils.putCompressedInt(out, vlength); ByteBufferUtils.putCompressedInt(out, 0); - CellUtil.writeFlatKey(cell, out); + CellUtil.writeFlatKey(cell, (DataOutput)out); } else { // find a common prefix and skip it int common = CellUtil.findCommonPrefixInFlatKey(cell, state.prevCell, true, true); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index d7f2035..04e3e16 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -20,6 +20,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -193,6 +194,28 @@ public final class ByteBufferUtils { } } + /** + * Copy data from a buffer to an output stream. Does not update the position + * in the buffer. + * @param out the output stream to write bytes to + * @param in the buffer to read bytes from + * @param offset the offset in the buffer (from the buffer's array offset) + * to start copying bytes from + * @param length the number of bytes to copy + */ + public static void copyBufferToStream(DataOutput out, ByteBuffer in, int offset, int length) + throws IOException { + if (out instanceof ByteBufferWriter) { + ((ByteBufferWriter) out).write(in, offset, length); + } else if (in.hasArray()) { + out.write(in.array(), in.arrayOffset() + offset, length); + } else { + for (int i = 0; i < length; ++i) { + out.write(toByte(in, offset + i)); + } + } + } + public static int putLong(OutputStream out, final long value, final int fitInBytes) throws IOException { long tmpValue = value; diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 7fea254..3e920c8 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -43,6 +43,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; @@ -66,7 +67,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFile; @@ -229,13 +229,13 @@ public class HFileOutputFormat2 private final Map writers = new TreeMap<>(Bytes.BYTES_COMPARATOR); private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY; - private final byte[] now = Bytes.toBytes(EnvironmentEdgeManager.currentTime()); + private final long now = EnvironmentEdgeManager.currentTime(); private boolean rollRequested = false; @Override public void write(ImmutableBytesWritable row, V cell) throws IOException { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + Cell kv = cell; // null input == user explicitly wants to flush if (row == null && kv == null) { @@ -244,7 +244,14 @@ public class HFileOutputFormat2 } byte[] rowKey = CellUtil.cloneRow(kv); - long length = kv.getLength(); + int length = 0; + if (kv instanceof ExtendedCell) { + length = ((ExtendedCell) kv).getSerializedSize(true); + } else { + // Is this else really needed? + length = CellUtil.estimatedSerializedSizeOfKey(cell) + cell.getValueLength() + + KeyValue.ROW_OFFSET; + } byte[] family = CellUtil.cloneFamily(kv); byte[] tableNameBytes = null; if (writeMultipleTables) { @@ -333,7 +340,8 @@ public class HFileOutputFormat2 } // we now have the proper WAL writer. full steam ahead - kv.updateLatestStamp(this.now); + // TODO : Currently in SettableTimeStamp but this will also move to ExtendedCell + CellUtil.updateLatestStamp(cell, this.now); wl.writer.append(kv); wl.written += length; @@ -577,7 +585,7 @@ public class HFileOutputFormat2 static void configureIncrementalLoad(Job job, List multiTableInfo, Class> cls) throws IOException { Configuration conf = job.getConfiguration(); job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(KeyValue.class); + job.setOutputValueClass(ExtendedCell.class); job.setOutputFormatClass(cls); if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) { diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java index 18dcf35..0d61c9a 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; @@ -97,10 +98,10 @@ public class Import extends Configured implements Tool { private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; public static class KeyValueWritableComparablePartitioner - extends Partitioner { + extends Partitioner { private static KeyValueWritableComparable[] START_KEYS = null; @Override - public int getPartition(KeyValueWritableComparable key, KeyValue value, + public int getPartition(KeyValueWritableComparable key, Cell value, int numPartitions) { for (int i = 0; i < START_KEYS.length; ++i) { if (key.compareTo(START_KEYS[i]) <= 0) { @@ -115,7 +116,7 @@ public class Import extends Configured implements Tool { public static class KeyValueWritableComparable implements WritableComparable { - private KeyValue kv = null; + private Cell kv = null; static { // register this comparator @@ -126,13 +127,16 @@ public class Import extends Configured implements Tool { public KeyValueWritableComparable() { } - public KeyValueWritableComparable(KeyValue kv) { + public KeyValueWritableComparable(Cell kv) { this.kv = kv; } @Override public void write(DataOutput out) throws IOException { - KeyValue.write(kv, out); + assert kv instanceof ExtendedCell; + out.writeInt(CellUtil.estimatedSerializedSizeOfKey(kv)); + out.writeInt(0); + CellUtil.writeFlatKey(kv, out); } @Override @@ -168,15 +172,15 @@ public class Import extends Configured implements Tool { public static class KeyValueReducer extends - Reducer { + Reducer { protected void reduce( KeyValueWritableComparable row, - Iterable kvs, + Iterable kvs, Reducer.Context context) + Cell, ImmutableBytesWritable, Cell>.Context context) throws java.io.IOException, InterruptedException { int index = 0; - for (KeyValue kv : kvs) { + for (Cell kv : kvs) { context.write(new ImmutableBytesWritable(kv.getRowArray()), kv); if (++index % 100 == 0) context.setStatus("Wrote " + index + " KeyValues, " @@ -186,7 +190,7 @@ public class Import extends Configured implements Tool { } public static class KeyValueSortImporter - extends TableMapper { + extends TableMapper { private Map cfRenameMap; private Filter filter; private static final Log LOG = LogFactory.getLog(KeyValueImporter.class); @@ -213,9 +217,8 @@ public class Import extends Configured implements Tool { kv = filterKv(filter, kv); // skip if we filtered it out if (kv == null) continue; - // TODO get rid of ensureKeyValue - KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)); - context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret); + Cell ret = convertKv(kv, cfRenameMap); + context.write(new KeyValueWritableComparable(ret), ret); } } } catch (InterruptedException e) { @@ -506,6 +509,8 @@ public class Import extends Configured implements Tool { // If there's a rename mapping for this CF, create a new KeyValue byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv)); if(newCfName != null) { + // TODO : Handle Bytebuffer backed KV + if (kv.getTagsLength() == 0) { kv = new KeyValue(kv.getRowArray(), // row buffer kv.getRowOffset(), // row offset kv.getRowLength(), // row length @@ -520,6 +525,25 @@ public class Import extends Configured implements Tool { kv.getValueArray(), // value buffer kv.getValueOffset(), // value offset kv.getValueLength()); // value length + } else { + kv = new KeyValue(kv.getRowArray(), // row buffer + kv.getRowOffset(), // row offset + kv.getRowLength(), // row length + newCfName, // CF buffer + 0, // CF offset + newCfName.length, // CF length + kv.getQualifierArray(), // qualifier buffer + kv.getQualifierOffset(), // qualifier offset + kv.getQualifierLength(), // qualifier length + kv.getTimestamp(), // timestamp + KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type + kv.getValueArray(), // value buffer + kv.getValueOffset(), // value offset + kv.getValueLength(), // value length + kv.getTagsArray(), // tag buffer + kv.getTagsOffset(), // tag offset + kv.getTagsLength()); // tag length + } } } return kv; @@ -631,7 +655,7 @@ public class Import extends Configured implements Tool { Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); job.setMapOutputKeyClass(KeyValueWritableComparable.class); - job.setMapOutputValueClass(KeyValue.class); + job.setMapOutputValueClass(ExtendedCell.class); job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class", KeyValueWritableComparable.KeyValueWritableComparator.class, RawComparator.class); @@ -654,7 +678,7 @@ public class Import extends Configured implements Tool { Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); + job.setMapOutputValueClass(ExtendedCell.class); HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java index 241608b..9d3cb20 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -31,23 +33,23 @@ import org.apache.hadoop.io.serializer.Serialization; import org.apache.hadoop.io.serializer.Serializer; @InterfaceAudience.Public -public class KeyValueSerialization implements Serialization { +public class KeyValueSerialization implements Serialization { @Override public boolean accept(Class c) { - return KeyValue.class.isAssignableFrom(c); + return Cell.class.isAssignableFrom(c); } @Override - public KeyValueDeserializer getDeserializer(Class t) { + public KeyValueDeserializer getDeserializer(Class t) { return new KeyValueDeserializer(); } @Override - public KeyValueSerializer getSerializer(Class c) { + public KeyValueSerializer getSerializer(Class c) { return new KeyValueSerializer(); } - public static class KeyValueDeserializer implements Deserializer { + public static class KeyValueDeserializer implements Deserializer { private DataInputStream dis; @Override @@ -56,7 +58,7 @@ public class KeyValueSerialization implements Serialization { } @Override - public KeyValue deserialize(KeyValue ignore) throws IOException { + public KeyValue deserialize(Cell ignore) throws IOException { // I can't overwrite the passed in KV, not from a proto kv, not just yet. TODO return KeyValueUtil.create(this.dis); } @@ -67,7 +69,7 @@ public class KeyValueSerialization implements Serialization { } } - public static class KeyValueSerializer implements Serializer { + public static class KeyValueSerializer implements Serializer { private DataOutputStream dos; @Override @@ -81,8 +83,9 @@ public class KeyValueSerialization implements Serialization { } @Override - public void serialize(KeyValue kv) throws IOException { - KeyValueUtil.write(kv, this.dos); + public void serialize(Cell kv) throws IOException { + assert kv instanceof ExtendedCell; + ((ExtendedCell) kv).write(this.dos, true); } } } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java index 997e5a8..fdf32b1 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java @@ -18,11 +18,13 @@ */ package org.apache.hadoop.hbase.mapreduce; +import java.io.IOException; import java.util.TreeSet; -import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.mapreduce.Reducer; @@ -35,21 +37,20 @@ import org.apache.hadoop.mapreduce.Reducer; */ @InterfaceAudience.Public public class KeyValueSortReducer - extends Reducer { - protected void reduce(ImmutableBytesWritable row, Iterable kvs, - Reducer.Context context) + extends Reducer { + protected void reduce(ImmutableBytesWritable row, Iterable kvs, + Reducer.Context context) throws java.io.IOException, InterruptedException { - TreeSet map = new TreeSet<>(CellComparator.COMPARATOR); - for (KeyValue kv: kvs) { - try { - map.add(kv.clone()); - } catch (CloneNotSupportedException e) { - throw new java.io.IOException(e); + TreeSet map = new TreeSet<>(CellComparator.COMPARATOR); + for (Cell kv : kvs) { + if (!(kv instanceof ExtendedCell)) { + throw new IOException(new CloneNotSupportedException()); } + map.add(((ExtendedCell) kv).deepClone()); } context.setStatus("Read " + map.getClass()); int index = 0; - for (KeyValue kv: map) { + for (Cell kv: map) { context.write(row, kv); if (++index % 100 == 0) context.setStatus("Wrote " + index); } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index b1e655c..95a6204 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; @@ -97,7 +96,7 @@ public class WALPlayer extends Configured implements Tool { * This one can be used together with {@link KeyValueSortReducer} */ static class WALKeyValueMapper - extends Mapper { + extends Mapper { private byte[] table; @Override @@ -108,11 +107,10 @@ public class WALPlayer extends Configured implements Tool { // skip all other tables if (Bytes.equals(table, key.getTablename().getName())) { for (Cell cell : value.getCells()) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - if (WALEdit.isMetaEditFamily(kv)) { + if (WALEdit.isMetaEditFamily(cell)) { continue; } - context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv); + context.write(new ImmutableBytesWritable(CellUtil.cloneRow(cell)), cell); } } } catch (InterruptedException e) { diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index 427c5cc..24e3f51 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -168,7 +168,7 @@ public class TestWALPlayer { WALKey key = mock(WALKey.class); when(key.getTablename()).thenReturn(TableName.valueOf("table")); @SuppressWarnings("unchecked") - Mapper.Context context = mock(Context.class); + Mapper.Context context = mock(Context.class); when(context.getConfiguration()).thenReturn(configuration); WALEdit value = mock(WALEdit.class);