diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
index 79eae20..f7562af 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
@@ -106,6 +106,23 @@ public class Delete extends Mutation implements Comparable {
    * @param rowArray We make a local copy of this passed in row.
    * @param rowOffset
    * @param rowLength
+   */
+  public Delete(final byte [] rowArray, final int rowOffset, final int rowLength) {
+    this(rowArray, rowOffset, rowLength, HConstants.LATEST_TIMESTAMP);
+  }
+
+  /**
+   * Create a Delete operation for the specified row and timestamp.
+   *
+   * If no further operations are done, this will delete all columns in all
+   * families of the specified row with a timestamp less than or equal to the
+   * specified timestamp.
+   *
+   * This timestamp is ONLY used for a delete row operation. If specifying
+   * families or columns, you must specify each timestamp individually.
+   * @param rowArray We make a local copy of this passed in row.
+   * @param rowOffset
+   * @param rowLength
    * @param ts maximum version timestamp (only for delete row)
    */
   public Delete(final byte [] rowArray, final int rowOffset, final int rowLength, long ts) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
index be028c6..1e388a3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
@@ -70,6 +70,17 @@ public class Put extends Mutation implements HeapSize, Comparable {
    * @param rowLength
    * @param ts
    */
+  public Put(byte [] rowArray, int rowOffset, int rowLength) {
+    this(rowArray, rowOffset, rowLength, HConstants.LATEST_TIMESTAMP);
+  }
+
+  /**
+   * We make a copy of the passed in row key to keep local.
+   * @param rowArray
+   * @param rowOffset
+   * @param rowLength
+   * @param ts
+   */
   public Put(byte [] rowArray, int rowOffset, int rowLength, long ts) {
     checkRow(rowArray, rowOffset, rowLength);
     this.row = Bytes.copy(rowArray, rowOffset, rowLength);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 13b55a1..a08f375 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -33,10 +33,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
@@ -47,49 +46,61 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
-import com.google.protobuf.TextFormat;
 
 /**
  * Utility to help ipc'ing.
  */
 class IPCUtil {
   public static final Log LOG = LogFactory.getLog(IPCUtil.class);
-  private final int cellBlockBuildingInitialBufferSize;
   /**
    * How much we think the decompressor will expand the original compressed content.
    */
   private final int cellBlockDecompressionMultiplier;
+  private final int cellBlockBuildingInitialBufferSize;
   private final Configuration conf;
 
   IPCUtil(final Configuration conf) {
     super();
     this.conf = conf;
-    this.cellBlockBuildingInitialBufferSize =
-      conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024);
     this.cellBlockDecompressionMultiplier =
       conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
+    // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in
+    // #buildCellBlock.
+    this.cellBlockBuildingInitialBufferSize =
+      ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
   }
 
   /**
-   * Build a cell block using passed in codec
+   * Puts CellScanner Cells into a cell block using passed in codec and/or
+   * compressor.
    * @param codec
    * @param compressor
-   * @Param cells
-   * @return Null or byte buffer filled with passed-in Cells encoded using passed in
-   * codec; the returned buffer has been flipped and is ready for
-   * reading. Use limit to find total size.
+   * @param cellScanner
+   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
+   * passed in codec and/or compressor; the returned buffer has been
+   * flipped and is ready for reading. Use limit to find total size.
    * @throws IOException
    */
   @SuppressWarnings("resource")
   ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
-    final CellScanner cells)
+    final CellScanner cellScanner)
   throws IOException {
-    if (cells == null) return null;
-    // TOOD: Reuse buffers?
-    // Presizing doesn't work because can't tell what size will be when serialized.
-    // BBOS will resize itself.
-    ByteBufferOutputStream baos =
-      new ByteBufferOutputStream(this.cellBlockBuildingInitialBufferSize);
+    if (cellScanner == null) return null;
+    int bufferSize = this.cellBlockBuildingInitialBufferSize;
+    if (cellScanner instanceof HeapSize) {
+      long longSize = ((HeapSize)cellScanner).heapSize();
+      // Just make sure we don't have a size bigger than an int.
+      if (longSize > Integer.MAX_VALUE) {
+        throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
+      }
+      bufferSize = ClassSize.align((int)longSize);
+    } // TODO: Else, get estimate on size of buffer rather than have the buffer resize.
+    // See TestIPCUtil main for experiment where we spin through the Cells getting estimate of
+    // total size before creating the buffer. It costs some small percentage. If we are usually
+    // within the estimated buffer size, then the cost is not worth it. If we are often well
+    // outside the guesstimated buffer size, the processing can be done in half the time if we
+    // go w/ the estimated size rather than let the buffer resize.
+ ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize); OutputStream os = baos; Compressor poolCompressor = null; try { @@ -99,8 +110,8 @@ class IPCUtil { os = compressor.createOutputStream(os, poolCompressor); } Codec.Encoder encoder = codec.getEncoder(os); - while (cells.advance()) { - encoder.write(cells.current()); + while (cellScanner.advance()) { + encoder.write(cellScanner.current()); } encoder.flush(); } finally { @@ -108,9 +119,9 @@ class IPCUtil { if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor); } if (LOG.isTraceEnabled()) { - if (this.cellBlockBuildingInitialBufferSize < baos.size()) { - LOG.trace("Buffer grew from " + this.cellBlockBuildingInitialBufferSize + - " to " + baos.size()); + if (bufferSize < baos.size()) { + LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() + + "; up hbase.ipc.cellblock.building.initial.buffersize?"); } } return baos.getByteBuffer(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index bb25111..6b7d62a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -1277,4 +1277,4 @@ public final class RequestConverter { } return builder.build(); } -} +} \ No newline at end of file diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java index 47bee96..1af3810 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java @@ -23,21 +23,28 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import org.apache.commons.lang.time.StopWatch; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodec; +import org.apache.hadoop.hbase.io.SizedCellScanner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mortbay.log.Log; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.log4j.Level; @Category(SmallTests.class) public class TestIPCUtil { @@ -49,33 +56,137 @@ public class TestIPCUtil { @Test public void testBuildCellBlock() throws IOException { - doBuildCellBlockUndoCellBlock(new KeyValueCodec(), null); - doBuildCellBlockUndoCellBlock(new KeyValueCodec(), new DefaultCodec()); - doBuildCellBlockUndoCellBlock(new KeyValueCodec(), new GzipCodec()); + doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), null); + doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new DefaultCodec()); + doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new GzipCodec()); } - void doBuildCellBlockUndoCellBlock(final Codec codec, final 
CompressionCodec compressor) + static void doBuildCellBlockUndoCellBlock(final IPCUtil util, + final Codec codec, final CompressionCodec compressor) throws IOException { - final int count = 10; - Cell [] cells = getCells(count); - ByteBuffer bb = this.util.buildCellBlock(codec, compressor, - CellUtil.createCellScanner(Arrays.asList(cells).iterator())); - CellScanner scanner = - this.util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit()); + doBuildCellBlockUndoCellBlock(util, codec, compressor, 10, 1, false); + } + + static void doBuildCellBlockUndoCellBlock(final IPCUtil util, final Codec codec, + final CompressionCodec compressor, final int count, final int size, final boolean sized) + throws IOException { + Cell [] cells = getCells(count, size); + CellScanner cellScanner = sized? getSizedCellScanner(cells): + CellUtil.createCellScanner(Arrays.asList(cells).iterator()); + ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner); + cellScanner = util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit()); int i = 0; - while (scanner.advance()) { + while (cellScanner.advance()) { i++; } assertEquals(count, i); } + static CellScanner getSizedCellScanner(final Cell [] cells) { + int size = -1; + for (Cell cell: cells) { + size += CellUtil.estimatedSizeOf(cell); + } + final int totalSize = ClassSize.align(size); + final CellScanner cellScanner = CellUtil.createCellScanner(cells); + return new SizedCellScanner() { + @Override + public long heapSize() { + return totalSize; + } + + @Override + public Cell current() { + return cellScanner.current(); + } + + @Override + public boolean advance() throws IOException { + return cellScanner.advance(); + } + }; + } + static Cell [] getCells(final int howMany) { + return getCells(howMany, 1024); + } + + static Cell [] getCells(final int howMany, final int valueSize) { Cell [] cells = new Cell[howMany]; + byte [] value = new byte[valueSize]; for (int i = 0; i < howMany; i++) { byte [] index = Bytes.toBytes(i); - KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, index); + KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, value); cells[i] = kv; } return cells; } + + private static final String COUNT = "--count="; + private static final String SIZE = "--size="; + + /** + * Prints usage and then exits w/ passed errCode + * @param errCode + */ + private static void usage(final int errCode) { + System.out.println("Usage: IPCUtil [options]"); + System.out.println("Micro-benchmarking how changed sizes and counts work with buffer resizing"); + System.out.println(" --count Count of Cells"); + System.out.println(" --size Size of Cell values"); + System.out.println("Example: IPCUtil --count=1024 --size=1024"); + System.exit(errCode); + } + + private static void timerTests(final IPCUtil util, final int count, final int size, + final Codec codec, final CompressionCodec compressor) + throws IOException { + final int cycles = 1000; + StopWatch timer = new StopWatch(); + timer.start(); + for (int i = 0; i < cycles; i++) { + timerTest(util, timer, count, size, codec, compressor, false); + } + timer.stop(); + Log.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + false + + ", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + "ms"); + timer.reset(); + timer.start(); + for (int i = 0; i < cycles; i++) { + timerTest(util, timer, count, size, codec, compressor, true); + } + timer.stop(); + Log.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + true + + ", count=" + 
count + ", size=" + size + ", + took=" + timer.getTime() + "ms"); + } + + private static void timerTest(final IPCUtil util, final StopWatch timer, final int count, + final int size, final Codec codec, final CompressionCodec compressor, final boolean sized) + throws IOException { + doBuildCellBlockUndoCellBlock(util, codec, compressor, count, size, sized); + } + + /** + * For running a few tests of methods herein. + * @param args + * @throws IOException + */ + public static void main(String[] args) throws IOException { + int count = 1024; + int size = 10240; + for (String arg: args) { + if (arg.startsWith(COUNT)) { + count = Integer.parseInt(arg.replace(COUNT, "")); + } else if (arg.startsWith(SIZE)) { + size = Integer.parseInt(arg.replace(SIZE, "")); + } else { + usage(1); + } + } + IPCUtil util = new IPCUtil(HBaseConfiguration.create()); + ((Log4JLogger)IPCUtil.LOG).getLogger().setLevel(Level.ALL); + timerTests(util, count, size, new KeyValueCodec(), null); + timerTests(util, count, size, new KeyValueCodec(), new DefaultCodec()); + timerTests(util, count, size, new KeyValueCodec(), new GzipCodec()); + } } \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index c677a0d..230d48a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -28,6 +28,7 @@ import java.util.NavigableMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.util.ByteRange; +import org.apache.hadoop.hbase.util.Bytes; /** * Utility methods helpful slinging {@link Cell} instances. @@ -242,4 +243,44 @@ public final class CellUtil { } }; } -} + + /** + * @param left + * @param right + * @return True if the rows in left and right Cells match + */ + public static boolean matchingRow(final Cell left, final Cell right) { + return Bytes.equals(left.getRowArray(), left.getRowOffset(), left.getRowLength(), + right.getRowArray(), right.getRowOffset(), right.getRowLength()); + } + + /** + * @return True if a delete type, a {@link KeyValue.Type#Delete} or + * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn} + * KeyValue type. + */ + public static boolean isDelete(final Cell cell) { + return KeyValue.isDelete(cell.getTypeByte()); + } + + /** + * @param cell + * @return Estimate of the cell size in bytes. + */ + public static int estimatedSizeOf(final Cell cell) { + // If a KeyValue, we can give a good estimate of size. + if (cell instanceof KeyValue) { + return ((KeyValue)cell).getLength() + Bytes.SIZEOF_INT; + } + // TODO: Should we add to Cell a sizeOf? Would it help? Does it make sense if Cell is + // prefix encoded or compressed? + return cell.getRowLength() + cell.getFamilyLength() + + cell.getQualifierLength() + + cell.getValueLength() + + // Use the KeyValue's infrastructure size presuming that another implementation would have + // same basic cost. + KeyValue.KEY_INFRASTRUCTURE_SIZE + + // Serialization is probably preceded by a length (it is in the KeyValueCodec at least). 
+ Bytes.SIZEOF_INT; + } +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index a589cce..e32163f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -37,7 +35,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java index 10ff57c..23b172d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java @@ -46,5 +46,4 @@ public interface HeapSize { * count of payload and hosting object sizings. */ public long heapSize(); - -} +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/SizedCellScanner.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/SizedCellScanner.java new file mode 100644 index 0000000..0206e05 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/SizedCellScanner.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import org.apache.hadoop.hbase.CellScanner; + +/** + * A CellScanner that knows its size in memory in bytes. + * Used playing the CellScanner into an in-memory buffer; knowing the size ahead of time saves + * on background buffer resizings. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface SizedCellScanner extends CellScanner, HeapSize {} \ No newline at end of file diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java index da5d121..a3bd954 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java @@ -9894,6 +9894,10 @@ public final class AdminProtos { java.util.List getKeyValueBytesList(); int getKeyValueBytesCount(); com.google.protobuf.ByteString getKeyValueBytes(int index); + + // optional int32 associatedCellCount = 3; + boolean hasAssociatedCellCount(); + int getAssociatedCellCount(); } public static final class WALEntry extends com.google.protobuf.GeneratedMessage @@ -9951,9 +9955,20 @@ public final class AdminProtos { return keyValueBytes_.get(index); } + // optional int32 associatedCellCount = 3; + public static final int ASSOCIATEDCELLCOUNT_FIELD_NUMBER = 3; + private int associatedCellCount_; + public boolean hasAssociatedCellCount() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public int getAssociatedCellCount() { + return associatedCellCount_; + } + private void initFields() { key_ = org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.getDefaultInstance(); keyValueBytes_ = java.util.Collections.emptyList();; + associatedCellCount_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -9981,6 +9996,9 @@ public final class AdminProtos { for (int i = 0; i < keyValueBytes_.size(); i++) { output.writeBytes(2, keyValueBytes_.get(i)); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeInt32(3, associatedCellCount_); + } getUnknownFields().writeTo(output); } @@ -10003,6 +10021,10 @@ public final class AdminProtos { size += dataSize; size += 1 * getKeyValueBytesList().size(); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(3, associatedCellCount_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -10033,6 +10055,11 @@ public final class AdminProtos { } result = result && getKeyValueBytesList() .equals(other.getKeyValueBytesList()); + result = result && (hasAssociatedCellCount() == other.hasAssociatedCellCount()); + if (hasAssociatedCellCount()) { + result = result && (getAssociatedCellCount() + == other.getAssociatedCellCount()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -10050,6 +10077,10 @@ public final class AdminProtos { hash = (37 * hash) + KEYVALUEBYTES_FIELD_NUMBER; hash = (53 * hash) + getKeyValueBytesList().hashCode(); } + if (hasAssociatedCellCount()) { + hash = (37 * hash) + ASSOCIATEDCELLCOUNT_FIELD_NUMBER; + hash = (53 * hash) + getAssociatedCellCount(); + } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -10175,6 +10206,8 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000001); keyValueBytes_ = java.util.Collections.emptyList();; bitField0_ = (bitField0_ & ~0x00000002); + associatedCellCount_ = 0; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -10226,6 +10259,10 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000002); } result.keyValueBytes_ = keyValueBytes_; + if (((from_bitField0_ & 
0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000002; + } + result.associatedCellCount_ = associatedCellCount_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -10255,6 +10292,9 @@ public final class AdminProtos { } onChanged(); } + if (other.hasAssociatedCellCount()) { + setAssociatedCellCount(other.getAssociatedCellCount()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -10308,6 +10348,11 @@ public final class AdminProtos { keyValueBytes_.add(input.readBytes()); break; } + case 24: { + bitField0_ |= 0x00000004; + associatedCellCount_ = input.readInt32(); + break; + } } } } @@ -10455,6 +10500,27 @@ public final class AdminProtos { return this; } + // optional int32 associatedCellCount = 3; + private int associatedCellCount_ ; + public boolean hasAssociatedCellCount() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public int getAssociatedCellCount() { + return associatedCellCount_; + } + public Builder setAssociatedCellCount(int value) { + bitField0_ |= 0x00000004; + associatedCellCount_ = value; + onChanged(); + return this; + } + public Builder clearAssociatedCellCount() { + bitField0_ = (bitField0_ & ~0x00000004); + associatedCellCount_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:WALEntry) } @@ -15359,41 +15425,41 @@ public final class AdminProtos { "gionsRequest\022!\n\007regionA\030\001 \002(\0132\020.RegionSp" + "ecifier\022!\n\007regionB\030\002 \002(\0132\020.RegionSpecifi" + "er\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024MergeRegi" + - "onsResponse\"7\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.W", - "ALKey\022\025\n\rkeyValueBytes\030\002 \003(\014\"4\n\030Replicat" + - "eWALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEnt" + - "ry\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollW" + - "ALWriterRequest\".\n\025RollWALWriterResponse" + - "\022\025\n\rregionToFlush\030\001 \003(\014\"#\n\021StopServerReq" + - "uest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespon" + - "se\"\026\n\024GetServerInfoRequest\"@\n\nServerInfo" + - "\022\037\n\nserverName\030\001 \002(\0132\013.ServerName\022\021\n\tweb" + - "uiPort\030\002 \001(\r\"8\n\025GetServerInfoResponse\022\037\n" + - "\nserverInfo\030\001 \002(\0132\013.ServerInfo2\337\006\n\014Admin", - "Service\022>\n\rgetRegionInfo\022\025.GetRegionInfo" + - "Request\032\026.GetRegionInfoResponse\022;\n\014getSt" + - "oreFile\022\024.GetStoreFileRequest\032\025.GetStore" + - "FileResponse\022D\n\017getOnlineRegion\022\027.GetOnl" + - "ineRegionRequest\032\030.GetOnlineRegionRespon" + - "se\0225\n\nopenRegion\022\022.OpenRegionRequest\032\023.O" + - "penRegionResponse\0228\n\013closeRegion\022\023.Close" + - "RegionRequest\032\024.CloseRegionResponse\0228\n\013f" + - "lushRegion\022\023.FlushRegionRequest\032\024.FlushR" + - "egionResponse\0228\n\013splitRegion\022\023.SplitRegi", - "onRequest\032\024.SplitRegionResponse\022>\n\rcompa" + - "ctRegion\022\025.CompactRegionRequest\032\026.Compac" + - "tRegionResponse\022;\n\014mergeRegions\022\024.MergeR" + - "egionsRequest\032\025.MergeRegionsResponse\022J\n\021" + - "replicateWALEntry\022\031.ReplicateWALEntryReq" + - "uest\032\032.ReplicateWALEntryResponse\022\'\n\006repl" + - "ay\022\r.MultiRequest\032\016.MultiResponse\022>\n\rrol" + - "lWALWriter\022\025.RollWALWriterRequest\032\026.Roll" + - "WALWriterResponse\022>\n\rgetServerInfo\022\025.Get" + - "ServerInfoRequest\032\026.GetServerInfoRespons", - 
"e\0225\n\nstopServer\022\022.StopServerRequest\032\023.St" + - "opServerResponseBA\n*org.apache.hadoop.hb" + - "ase.protobuf.generatedB\013AdminProtosH\001\210\001\001" + - "\240\001\001" + "onsResponse\"T\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.W", + "ALKey\022\025\n\rkeyValueBytes\030\002 \003(\014\022\033\n\023associat" + + "edCellCount\030\003 \001(\005\"4\n\030ReplicateWALEntryRe" + + "quest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031Repli" + + "cateWALEntryResponse\"\026\n\024RollWALWriterReq" + + "uest\".\n\025RollWALWriterResponse\022\025\n\rregionT" + + "oFlush\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006rea" + + "son\030\001 \002(\t\"\024\n\022StopServerResponse\"\026\n\024GetSe" + + "rverInfoRequest\"@\n\nServerInfo\022\037\n\nserverN" + + "ame\030\001 \002(\0132\013.ServerName\022\021\n\twebuiPort\030\002 \001(" + + "\r\"8\n\025GetServerInfoResponse\022\037\n\nserverInfo", + "\030\001 \002(\0132\013.ServerInfo2\337\006\n\014AdminService\022>\n\r" + + "getRegionInfo\022\025.GetRegionInfoRequest\032\026.G" + + "etRegionInfoResponse\022;\n\014getStoreFile\022\024.G" + + "etStoreFileRequest\032\025.GetStoreFileRespons" + + "e\022D\n\017getOnlineRegion\022\027.GetOnlineRegionRe" + + "quest\032\030.GetOnlineRegionResponse\0225\n\nopenR" + + "egion\022\022.OpenRegionRequest\032\023.OpenRegionRe" + + "sponse\0228\n\013closeRegion\022\023.CloseRegionReque" + + "st\032\024.CloseRegionResponse\0228\n\013flushRegion\022" + + "\023.FlushRegionRequest\032\024.FlushRegionRespon", + "se\0228\n\013splitRegion\022\023.SplitRegionRequest\032\024" + + ".SplitRegionResponse\022>\n\rcompactRegion\022\025." + + "CompactRegionRequest\032\026.CompactRegionResp" + + "onse\022;\n\014mergeRegions\022\024.MergeRegionsReque" + + "st\032\025.MergeRegionsResponse\022J\n\021replicateWA" + + "LEntry\022\031.ReplicateWALEntryRequest\032\032.Repl" + + "icateWALEntryResponse\022\'\n\006replay\022\r.MultiR" + + "equest\032\016.MultiResponse\022>\n\rrollWALWriter\022" + + "\025.RollWALWriterRequest\032\026.RollWALWriterRe" + + "sponse\022>\n\rgetServerInfo\022\025.GetServerInfoR", + "equest\032\026.GetServerInfoResponse\0225\n\nstopSe" + + "rver\022\022.StopServerRequest\032\023.StopServerRes" + + "ponseBA\n*org.apache.hadoop.hbase.protobu" + + "f.generatedB\013AdminProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -15557,7 +15623,7 @@ public final class AdminProtos { internal_static_WALEntry_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_WALEntry_descriptor, - new java.lang.String[] { "Key", "KeyValueBytes", }, + new java.lang.String[] { "Key", "KeyValueBytes", "AssociatedCellCount", }, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.class, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.Builder.class); internal_static_ReplicateWALEntryRequest_descriptor = diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto index 733eadf..b3c23af 100644 --- a/hbase-protocol/src/main/protobuf/Admin.proto +++ b/hbase-protocol/src/main/protobuf/Admin.proto @@ -161,14 +161,18 @@ message MergeRegionsResponse { // Protocol buffer version of WAL for replication message WALEntry { required WALKey key = 1; + // Following may be null if the KVs/Cells are carried along the side in a cellblock (See + // RPC 
for more on cellblocks). If Cells/KVs are in a cellblock, this next field is null + // and associatedCellCount has count of Cells associated w/ this WALEntry repeated bytes keyValueBytes = 2; + // If Cell data is carried alongside in a cellblock, this is count of Cells in the cellblock. + optional int32 associatedCellCount = 3; } /** * Replicates the given entries. The guarantee is that the given entries * will be durable on the slave cluster if this method returns without - * any exception. - * hbase.replication has to be set to true for this to work. + * any exception. hbase.replication has to be set to true for this to work. */ message ReplicateWALEntryRequest { repeated WALEntry entry = 1; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index 29f0154..0bcd535 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -20,26 +20,32 @@ package org.apache.hadoop.hbase.protobuf; -import com.google.protobuf.ByteString; -import com.google.protobuf.ServiceException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.SizedCellScanner; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.UUID; +import com.google.protobuf.ByteString; +import com.google.protobuf.ServiceException; public class ReplicationProtbufUtil { /** @@ -81,10 +87,11 @@ public class ReplicationProtbufUtil { */ public static void replicateWALEntry(final AdminService.BlockingInterface admin, final HLog.Entry[] entries) throws IOException { - AdminProtos.ReplicateWALEntryRequest request = + Pair p = buildReplicateWALEntryRequest(entries); try { - admin.replicateWALEntry(null, request); + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond()); + admin.replicateWALEntry(controller, p.getFirst()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -94,10 +101,14 @@ public class ReplicationProtbufUtil { * Create a new ReplicateWALEntryRequest from a list of HLog entries * * @param entries the HLog entries to be replicated - * @return a ReplicateWALEntryRequest + * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values + * found. 
*/ - public static AdminProtos.ReplicateWALEntryRequest + public static Pair buildReplicateWALEntryRequest(final HLog.Entry[] entries) { + // Accumulate all the KVs seen in here. + List> allkvs = new ArrayList>(entries.length); + int size = 0; WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder(); AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder(); AdminProtos.ReplicateWALEntryRequest.Builder builder = @@ -128,13 +139,55 @@ public class ReplicationProtbufUtil { keyBuilder.addScopes(scopeBuilder.build()); } } - List keyValues = edit.getKeyValues(); - for (KeyValue value: keyValues) { - entryBuilder.addKeyValueBytes(ByteString.copyFrom( - value.getBuffer(), value.getOffset(), value.getLength())); + List kvs = edit.getKeyValues(); + // Add up the size. It is used later serializing out the kvs. + for (KeyValue kv: kvs) { + size += kv.getLength(); } + // Collect up the kvs + allkvs.add(kvs); + // Write out how many kvs associated with this entry. + entryBuilder.setAssociatedCellCount(kvs.size()); builder.addEntry(entryBuilder.build()); } - return builder.build(); + return new Pair(builder.build(), + getCellScanner(allkvs, size)); + } + + /** + * @param cells + * @return cells packaged as a CellScanner + */ + static CellScanner getCellScanner(final List> cells, final int size) { + return new SizedCellScanner() { + private final Iterator> entries = cells.iterator(); + private Iterator currentIterator = null; + private Cell currentCell; + + @Override + public Cell current() { + return this.currentCell; + } + + @Override + public boolean advance() { + if (this.currentIterator == null) { + if (!this.entries.hasNext()) return false; + this.currentIterator = this.entries.next().iterator(); + } + if (this.currentIterator.hasNext()) { + this.currentCell = this.currentIterator.next(); + return true; + } + this.currentCell = null; + this.currentIterator = null; + return advance(); + } + + @Override + public long heapSize() { + return size; + } + }; } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 334754a..ffe88a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2214,8 +2214,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa conf, server, fs, logDir, oldLogDir); server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; - } - else { + } else { server.replicationSourceHandler = (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, fs, logDir, oldLogDir); @@ -3710,15 +3709,14 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa @Override @QosPriority(priority=HConstants.REPLICATION_QOS) public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, - final ReplicateWALEntryRequest request) throws ServiceException { + final ReplicateWALEntryRequest request) + throws ServiceException { try { if (replicationSinkHandler != null) { checkOpen(); requestCount.increment(); - HLog.Entry[] entries = ReplicationProtbufUtil.toHLogEntries(request.getEntryList()); - if (entries != null && entries.length > 0) { - replicationSinkHandler.replicateLogEntries(entries); - } + 
this.replicationSinkHandler.replicateLogEntries(request.getEntryList(), + ((PayloadCarryingRpcController)controller).cellScanner()); } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java index 754cff0..28573bd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java @@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; /** * A sink for a replication stream has to expose this service. @@ -30,11 +32,11 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; */ @InterfaceAudience.Private public interface ReplicationSinkService extends ReplicationService { - - /** + /** * Carry on the list of log entries down to the sink - * @param entries list of entries to replicate + * @param entries list of WALEntries to replicate + * @param cells Cells that the WALEntries refer to (if cells is non-null) * @throws IOException */ - public void replicateLogEntries(HLog.Entry[] entries) throws IOException; -} + public void replicateLogEntries(List entries, CellScanner cells) throws IOException; +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index b908323..edc5c6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -18,8 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @@ -30,10 +28,9 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; */ @InterfaceAudience.Private public interface ReplicationSourceService extends ReplicationService { - /** * Returns a WALObserver for the service. This is needed to * observe log rolls and log archival events. 
    */
   public WALActionsListener getWALActionsListener();
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 7b84ccd..387f44d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.Executors;
@@ -33,13 +34,14 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -163,11 +165,14 @@ public class Replication implements WALActionsListener,
   /**
    * Carry on the list of log entries down to the sink
    * @param entries list of entries to replicate
+   * @param cells The data -- the cells -- that the entries describe (the entries
+   *     do not contain the Cells we are replicating; they are passed here on the side in this
+   *     CellScanner).
* @throws IOException */ - public void replicateLogEntries(HLog.Entry[] entries) throws IOException { + public void replicateLogEntries(List entries, CellScanner cells) throws IOException { if (this.replication) { - this.replicationSink.replicateEntries(entries); + this.replicationSink.replicateEntries(entries, cells); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 67d921c..0e98b21 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -34,19 +34,23 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; @@ -108,17 +112,17 @@ public class ReplicationSink { } /** - * Replicate this array of entries directly into the local cluster - * using the native client. + * Replicate this array of entries directly into the local cluster using the native client. + * Like {@link #replicateEntries(org.apache.hadoop.hbase.regionserver.wal.HLog.Entry[])} only + * operates against raw protobuf type saving on a convertion from pb to pojo. * * @param entries + * @param cells * @throws IOException */ - public void replicateEntries(HLog.Entry[] entries) - throws IOException { - if (entries.length == 0) { - return; - } + public void replicateEntries(List entries, final CellScanner cells) throws IOException { + if (entries.isEmpty()) return; + if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner"); // Very simple optimization where we batch sequences of rows going // to the same table. try { @@ -126,40 +130,41 @@ public class ReplicationSink { // Map of table => list of Rows, we only want to flushCommits once per // invocation of this method per table. 
Map> rows = new TreeMap>(Bytes.BYTES_COMPARATOR); - for (HLog.Entry entry : entries) { - WALEdit edit = entry.getEdit(); - byte[] table = entry.getKey().getTablename(); - Put put = null; - Delete del = null; - KeyValue lastKV = null; - List kvs = edit.getKeyValues(); - for (KeyValue kv : kvs) { - if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) { - if (kv.isDelete()) { - del = new Delete(kv.getRow()); - del.setClusterId(entry.getKey().getClusterId()); - addToMultiMap(rows, table, del); - } else { - put = new Put(kv.getRow()); - put.setClusterId(entry.getKey().getClusterId()); - addToMultiMap(rows, table, put); - } + for (WALEntry entry : entries) { + byte[] table = entry.getKey().getTableName().toByteArray(); + Cell previousCell = null; + Mutation m = null; + java.util.UUID uuid = toUUID(entry.getKey().getClusterId()); + int count = entry.getAssociatedCellCount(); + for (int i = 0; i < count; i++) { + // Throw index out of bounds if our cell count is off + if (!cells.advance()) { + throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); } - if (kv.isDelete()) { - del.addDeleteMarker(kv); + Cell cell = cells.current(); + if (isNewRowOrType(previousCell, cell)) { + // Create new mutation + m = CellUtil.isDelete(cell)? + new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()): + new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + m.setClusterId(uuid); + addToMultiMap(rows, table, m); + } + if (CellUtil.isDelete(cell)) { + ((Delete)m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell)); } else { - put.add(kv); + ((Put)m).add(KeyValueUtil.ensureKeyValue(cell)); } - lastKV = kv; + previousCell = cell; } totalReplicated++; } for (Entry> entry : rows.entrySet()) { batch(entry.getKey(), entry.getValue()); } - this.metrics.setAgeOfLastAppliedOp( - entries[entries.length-1].getKey().getWriteTime()); - this.metrics.applyBatch(entries.length); + int size = entries.size(); + this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime()); + this.metrics.applyBatch(size); this.totalReplicatedEdits.addAndGet(totalReplicated); } catch (IOException ex) { LOG.error("Unable to accept edit because:", ex); @@ -168,6 +173,20 @@ public class ReplicationSink { } /** + * @param previousCell + * @param cell + * @return True if we have crossed over onto a new row or type + */ + private boolean isNewRowOrType(final Cell previousCell, final Cell cell) { + return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() || + !CellUtil.matchingRow(previousCell, cell); + } + + private java.util.UUID toUUID(final HBaseProtos.UUID uuid) { + return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()); + } + + /** * Simple helper to a map from key to (a list of) values * TODO: Make a general utility method * @param map diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index fb15ce4..bf9266b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -177,6 +177,9 @@ public class ReplicationSource extends Thread new PriorityBlockingQueue( conf.getInt("hbase.regionserver.maxlogs", 32), new LogsComparator()); + // TODO: This connection is replication specific or we should 
make it particular to + // replication and make replication specific settings such as compression or codec to use + // passing Cells. this.conn = HConnectionManager.getConnection(conf); this.zkHelper = manager.getRepZkWrapper(); this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f); @@ -456,7 +459,6 @@ public class ReplicationSource extends Thread // Connect to peer cluster first, unless we have to stop while (this.isActive() && this.currentPeers.size() == 0) { - chooseSinks(); if (this.isActive() && this.currentPeers.size() == 0) { if (sleepForRetries("Waiting for peers", sleepMultiplier)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java new file mode 100644 index 0000000..6aea022 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.protobuf; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + + +@Category(SmallTests.class) +public class TestReplicationProtobuf { + /** + * Little test to check we can basically convert list of a list of KVs into a CellScanner + * @throws IOException + */ + @Test + public void testGetCellScanner() throws IOException { + List a = new ArrayList(); + KeyValue akv = new KeyValue(Bytes.toBytes("a"), -1L); + a.add(akv); + // Add a few just to make it less regular. 
+ a.add(new KeyValue(Bytes.toBytes("aa"), -1L)); + a.add(new KeyValue(Bytes.toBytes("aaa"), -1L)); + List b = new ArrayList(); + KeyValue bkv = new KeyValue(Bytes.toBytes("b"), -1L); + a.add(bkv); + List c = new ArrayList(); + KeyValue ckv = new KeyValue(Bytes.toBytes("c"), -1L); + c.add(ckv); + List> all = new ArrayList>(); + all.add(a); + all.add(b); + all.add(c); + CellScanner scanner = ReplicationProtbufUtil.getCellScanner(all, 0); + testAdvancetHasSameRow(scanner, akv); + // Skip over aa + scanner.advance(); + // Skip over aaa + scanner.advance(); + testAdvancetHasSameRow(scanner, bkv); + testAdvancetHasSameRow(scanner, ckv); + assertFalse(scanner.advance()); + } + + private void testAdvancetHasSameRow(CellScanner scanner, final KeyValue kv) throws IOException { + scanner.advance(); + assertTrue(Bytes.equals(scanner.current().getRowArray(), scanner.current().getRowOffset(), + scanner.current().getRowLength(), + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 61ae5b5..ddbd56d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -449,13 +449,14 @@ public class TestReplicationSmallTests extends TestReplicationBase { scan = new Scan(); + long start = System.currentTimeMillis(); for (int i = 0; i < NB_RETRIES; i++) { scanner = htable2.getScanner(scan); res = scanner.next(NB_ROWS_IN_BIG_BATCH); scanner.close(); if (res.length != NB_ROWS_IN_BIG_BATCH) { - if (i == NB_RETRIES-1) { + if (i == NB_RETRIES - 1) { int lastRow = -1; for (Result result : res) { int currentRow = Bytes.toInt(result.getRow()); @@ -465,8 +466,9 @@ public class TestReplicationSmallTests extends TestReplicationBase { lastRow = currentRow; } LOG.error("Last row: " + lastRow); - fail("Waited too much time for normal batch replication, " - + res.length + " instead of " + NB_ROWS_IN_BIG_BATCH); + fail("Waited too much time for normal batch replication, " + + res.length + " instead of " + NB_ROWS_IN_BIG_BATCH + "; waited=" + + (System.currentTimeMillis() - start) + "ms"); } else { LOG.info("Only got " + res.length + " rows"); Thread.sleep(SLEEP_TIME);