From f34c3cf60be9bca75d0ed3d680a1a39076724e49 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Sat, 9 Jun 2018 08:24:07 -0700 Subject: [PATCH] HBASE-20711 Save on a Cell iteration when writing --- .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 74 ++++++++++++++++------ .../hadoop/hbase/regionserver/RSRpcServices.java | 28 +------- 2 files changed, 59 insertions(+), 43 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index d8d46b6664..d0582ea44d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.ExtendedCellBuilder; import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -183,6 +184,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Protobufs utility. @@ -196,6 +199,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; // TODO: Generate the non-shaded protobufutil from this one. @InterfaceAudience.Private // TODO: some clients (Hive, etc) use this class public final class ProtobufUtil { + private static final Logger LOG = LoggerFactory.getLogger(ProtobufUtil.class); + private ProtobufUtil() { } @@ -597,21 +602,29 @@ public final class ProtobufUtil { return toPut(proto, null); } + public static Put toPut(final MutationProto proto, final CellScanner cellScanner) + throws IOException { + return toPut(proto, cellScanner, 0); + } + /** * Convert a protocol buffer Mutate to a Put. * * @param proto The protocol buffer MutationProto to convert * @param cellScanner If non-null, the Cell data that goes with this proto. + * @param maxCellSize Throws exception if Cell > this size (pass 0 for no check). * @return A client Put. * @throws IOException */ - public static Put toPut(final MutationProto proto, final CellScanner cellScanner) + public static Put toPut(final MutationProto proto, final CellScanner cellScanner, + long maxCellSize) throws IOException { // TODO: Server-side at least why do we convert back to the Client types? Why not just pb it? MutationType type = proto.getMutateType(); assert type == MutationType.PUT: type.name(); long timestamp = proto.hasTimestamp()? proto.getTimestamp(): HConstants.LATEST_TIMESTAMP; - Put put = proto.hasRow() ? new Put(proto.getRow().toByteArray(), timestamp) : null; + byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null; + Put put = row != null? new Put(row, timestamp) : null; int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0; if (cellCount > 0) { // The proto has metadata only and the data is separate to be found in the cellScanner. @@ -628,9 +641,11 @@ public final class ProtobufUtil { if (put == null) { put = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), timestamp); } + checkCellSizeLimit(maxCellSize, cell); put.add(cell); } } else { + // All Cells are inside the passed proto. if (put == null) { throw new IllegalArgumentException("row cannot be null"); } @@ -648,56 +663,63 @@ public final class ProtobufUtil { ts = qv.getTimestamp(); } byte[] allTagsBytes; + Cell cell; if (qv.hasTags()) { allTagsBytes = qv.getTags().toByteArray(); if(qv.hasDeleteType()) { - put.add(cellBuilder.clear() + cell = cellBuilder.clear() .setRow(proto.getRow().toByteArray()) .setFamily(family) .setQualifier(qv.hasQualifier() ? qv.getQualifier().toByteArray() : null) .setTimestamp(ts) .setType(fromDeleteType(qv.getDeleteType()).getCode()) .setTags(allTagsBytes) - .build()); + .build(); } else { - put.add(cellBuilder.clear() - .setRow(put.getRow()) + cell = cellBuilder.clear() + .setRow(proto.getRow().toByteArray()) .setFamily(family) .setQualifier(qv.hasQualifier() ? qv.getQualifier().toByteArray() : null) .setTimestamp(ts) .setType(Cell.Type.Put) .setValue(qv.hasValue() ? qv.getValue().toByteArray() : null) .setTags(allTagsBytes) - .build()); + .build(); } } else { if(qv.hasDeleteType()) { - put.add(cellBuilder.clear() - .setRow(put.getRow()) + cell = cellBuilder.clear() + .setRow(proto.getRow().toByteArray()) .setFamily(family) .setQualifier(qv.hasQualifier() ? qv.getQualifier().toByteArray() : null) .setTimestamp(ts) .setType(fromDeleteType(qv.getDeleteType()).getCode()) - .build()); + .build(); } else{ - put.add(cellBuilder.clear() - .setRow(put.getRow()) + cell = cellBuilder.clear() + .setRow(proto.getRow().toByteArray()) .setFamily(family) .setQualifier(qv.hasQualifier() ? qv.getQualifier().toByteArray() : null) .setTimestamp(ts) .setType(Type.Put) .setValue(qv.hasValue() ? qv.getValue().toByteArray() : null) - .build()); + .build(); } } + checkCellSizeLimit(maxCellSize, cell); + put.add(cell); } } } put.setDurability(toDurability(proto.getDurability())); + addAttributes(put, proto); + return put; + } + + private static void addAttributes(Mutation m, MutationProto proto) { for (NameBytesPair attribute: proto.getAttributeList()) { - put.setAttribute(attribute.getName(), attribute.getValue().toByteArray()); + m.setAttribute(attribute.getName(), attribute.getValue().toByteArray()); } - return put; } /** @@ -712,6 +734,23 @@ public final class ProtobufUtil { return toDelete(proto, null); } + /** + * Check cell size. + * Only works if on server-side where there are ExtendedCells. Else its a noop. + * TODO: Make it work for client-side too... no harm. + */ + private static void checkCellSizeLimit(final long maxCellSize, final Cell c) + throws DoNotRetryIOException { + if (c instanceof ExtendedCell) { + int size = ((ExtendedCell)c).getSerializedSize(true); + if (size > maxCellSize) { + String msg = "Cell with size " + size + " exceeds limit of " + maxCellSize + " bytes"; + LOG.debug(msg); + throw new DoNotRetryIOException(msg); + } + } + } + /** * Convert a protocol buffer Mutate to a Delete * @@ -776,11 +815,10 @@ public final class ProtobufUtil { } } delete.setDurability(toDurability(proto.getDurability())); - for (NameBytesPair attribute: proto.getAttributeList()) { - delete.setAttribute(attribute.getName(), attribute.getValue().toByteArray()); - } + addAttributes(delete, proto); return delete; } + @FunctionalInterface private interface ConsumerWithException { void accept(T t, U u) throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 3db7c083ae..6b3bc39ab6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -615,9 +615,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } switch (type) { case PUT: - Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); + Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner, region.maxCellSize); ++countOfCompleteMutation; - checkCellSizeLimit(region, put); spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); rm.add(put); break; @@ -659,7 +658,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, throws IOException { long before = EnvironmentEdgeManager.currentTime(); Append append = ProtobufUtil.toAppend(mutation, cellScanner); - checkCellSizeLimit(region, append); spaceQuota.getPolicyEnforcement(region).check(append); quota.addMutation(append); Result r = null; @@ -711,7 +709,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, throws IOException { long before = EnvironmentEdgeManager.currentTime(); Increment increment = ProtobufUtil.toIncrement(mutation, cells); - checkCellSizeLimit(region, increment); spaceQuota.getPolicyEnforcement(region).check(increment); quota.addMutation(increment); Result r = null; @@ -924,22 +921,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return cellsToReturn; } - private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException { - if (r.maxCellSize > 0) { - CellScanner cells = m.cellScanner(); - while (cells.advance()) { - int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current()); - if (size > r.maxCellSize) { - String msg = "Cell with size " + size + " exceeds limit of " + r.maxCellSize + " bytes"; - if (LOG.isDebugEnabled()) { - LOG.debug(msg); - } - throw new DoNotRetryIOException(msg); - } - } - } - } - private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, final OperationQuota quota, final List mutations, final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) @@ -999,7 +980,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, MutationProto m = action.getMutation(); Mutation mutation; if (m.getMutateType() == MutationType.PUT) { - mutation = ProtobufUtil.toPut(m, cells); + mutation = ProtobufUtil.toPut(m, cells, region.maxCellSize); batchContainsPuts = true; } else { mutation = ProtobufUtil.toDelete(m, cells); @@ -1007,7 +988,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } mutationActionMap.put(mutation, action); mArray[i++] = mutation; - checkCellSizeLimit(region, mutation); // Check if a space quota disallows this mutation spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation); quota.addMutation(mutation); @@ -2773,8 +2753,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); break; case PUT: - Put put = ProtobufUtil.toPut(mutation, cellScanner); - checkCellSizeLimit(region, put); + Put put = ProtobufUtil.toPut(mutation, cellScanner, region.maxCellSize); // Throws an exception when violated spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); quota.addMutation(put); @@ -2809,7 +2788,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, break; case DELETE: Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); - checkCellSizeLimit(region, delete); spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete); quota.addMutation(delete); if (request.hasCondition()) { -- 2.16.3