diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 751be75..93a78de 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5610,83 +5610,4 @@ public class HRegion implements HeapSize { // , Writable{ } } } - - /** - * This function is used to construct replay mutations from WALEdits - * @param entries - * @param cells - * @param clusterId - * @param logEntries List of Pair contructed from its PB version - WALEntry - * instances - * @return list of Pair to be replayed - * @throws IOException - */ - List> getReplayMutations(List entries, - CellScanner cells, UUID clusterId, List> logEntries) - throws IOException { - - List> mutations = new ArrayList>(); - List> tmpEditMutations = - new ArrayList>(); - - for (WALEntry entry : entries) { - HLogKey logKey = null; - WALEdit val = null; - Cell previousCell = null; - Mutation m = null; - tmpEditMutations.clear(); - - int count = entry.getAssociatedCellCount(); - if (coprocessorHost != null) { - val = new WALEdit(); - } - - for (int i = 0; i < count; i++) { - // Throw index out of bounds if our cell count is off - if (!cells.advance()) { - throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); - } - Cell cell = cells.current(); - if (val != null) val.add(KeyValueUtil.ensureKeyValue(cell)); - - boolean isNewRowOrType = - previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() - || !CellUtil.matchingRow(previousCell, cell); - if (isNewRowOrType) { - // Create new mutation - if (CellUtil.isDelete(cell)) { - m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - tmpEditMutations.add(new Pair(MutationType.DELETE, m)); - } else { - m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - tmpEditMutations.add(new Pair(MutationType.PUT, m)); - } - } - if (CellUtil.isDelete(cell)) { - ((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell)); - } else { - ((Put) m).add(KeyValueUtil.ensureKeyValue(cell)); - } - previousCell = cell; - } - - // Start coprocessor replay here. The coprocessor is for each WALEdit - // instead of a KeyValue. - if (coprocessorHost != null) { - WALKey walKey = entry.getKey(); - logKey = - new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey - .getTableName().toByteArray()), walKey.getLogSequenceNumber(), - walKey.getWriteTime(), clusterId); - if (coprocessorHost.preWALRestore(this.getRegionInfo(), logKey, val)) { - // if bypass this log entry, ignore it ... - continue; - } - logEntries.add(new Pair(logKey, val)); - } - mutations.addAll(tmpEditMutations); - } - - return mutations; - } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 1a01e4e..fc53c9a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -197,6 +197,7 @@ import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -3888,9 +3889,27 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa HRegion region = this.getRegionByEncodedName( entries.get(0).getKey().getEncodedRegionName().toStringUtf8()); + RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); List> walEntries = new ArrayList>(); - List> mutations = region.getReplayMutations( - request.getEntryList(), cells, UUID.fromString(this.clusterId), walEntries); + List> mutations = new ArrayList>(); + for (WALEntry entry : entries) { + Pair walEntry = (coprocessorHost == null) ? null : + new Pair(); + List> edits = HLogSplitter.getMutationsFromWALEntry(entry, + cells, walEntry); + if (coprocessorHost != null) { + // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a + // KeyValue. + if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(), + walEntry.getSecond())) { + // if bypass this log entry, ignore it ... + continue; + } + walEntries.add(walEntry); + } + mutations.addAll(edits); + } + if (!mutations.isEmpty()) { OperationStatus[] result = doBatchOp(region, mutations, true); // check if it's a partial success @@ -3900,9 +3919,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } } } - if (region.getCoprocessorHost() != null) { + if (coprocessorHost != null) { for (Pair wal : walEntries) { - region.getCoprocessorHost().postWALRestore(region.getRegionInfo(), wal.getFirst(), + coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(), wal.getSecond()); } } @@ -4117,12 +4136,13 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } /** - * Execute a list of Put/Delete mutations. + * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of + * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse. * @param region * @param mutations * @param isReplay - * @return an array of OperationStatus which internally contains the - * OperationStatusCode and the exceptionMessage if any + * @return an array of OperationStatus which internally contains the OperationStatusCode and the + * exceptionMessage if any * @throws IOException */ protected OperationStatus[] doBatchOp(final HRegion region, diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 782c0b8..a02c38d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; @@ -52,6 +53,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -64,6 +69,7 @@ import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; @@ -73,9 +79,13 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -1827,4 +1837,76 @@ public class HLogSplitter { super(s); } } + + /** + * This function is used to construct mutations from a WALEntry. It also reconstructs HLogKey & + * WALEdit from the passed in WALEntry + * @param entry + * @param cells + * @param logEntry pair of HLogKey and WALEdit instance stores HLogKey and WALEdit instances + * extracted from the passed in WALEntry. + * @return list of Pair to be replayed + * @throws IOException + */ + public static List> getMutationsFromWALEntry(WALEntry entry, + CellScanner cells, Pair logEntry) throws IOException { + + if (entry == null) { + // return an empty array + return new ArrayList>(); + } + + int count = entry.getAssociatedCellCount(); + List> mutations = new ArrayList>(); + Cell previousCell = null; + Mutation m = null; + HLogKey key = null; + WALEdit val = null; + if (logEntry != null) val = new WALEdit(); + + for (int i = 0; i < count; i++) { + // Throw index out of bounds if our cell count is off + if (!cells.advance()) { + throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); + } + Cell cell = cells.current(); + if (val != null) val.add(KeyValueUtil.ensureKeyValue(cell)); + + boolean isNewRowOrType = + previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() + || !CellUtil.matchingRow(previousCell, cell); + if (isNewRowOrType) { + // Create new mutation + if (CellUtil.isDelete(cell)) { + m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + mutations.add(new Pair(MutationType.DELETE, m)); + } else { + m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + mutations.add(new Pair(MutationType.PUT, m)); + } + } + if (CellUtil.isDelete(cell)) { + ((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell)); + } else { + ((Put) m).add(KeyValueUtil.ensureKeyValue(cell)); + } + previousCell = cell; + } + + // reconstruct HLogKey + if (logEntry != null) { + WALKey walKey = entry.getKey(); + List clusterIds = new ArrayList(walKey.getClusterIdsCount()); + for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) { + clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits())); + } + key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey + .getTableName().toByteArray()), walKey.getLogSequenceNumber(), walKey.getWriteTime(), + clusterIds); + logEntry.setFirst(key); + logEntry.setSecond(val); + } + + return mutations; + } }