From 7b6425a5978e7213b28393eb6e73393e6a77ad2c Mon Sep 17 00:00:00 2001
From: Wellington Chevreuil
Date: Fri, 9 Nov 2018 11:10:20 +0000
Subject: [PATCH] Initial version for WAL entry splitter CP.

---
 hbase-replication/pom.xml                     |  99 ++++++
 ...alEntrySplitterReplicationCoprocessor.java | 287 ++++++++++++++++++
 ...alEntrySplitterReplicationCoprocessor.java | 160 ++++++++++
 3 files changed, 546 insertions(+)
 create mode 100644 hbase-replication/pom.xml
 create mode 100644 hbase-replication/src/main/java/org/apache/hadoop/hbase/regionserver/WalEntrySplitterReplicationCoprocessor.java
 create mode 100644 hbase-replication/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalEntrySplitterReplicationCoprocessor.java

diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml
new file mode 100644
index 0000000..fa27128
--- /dev/null
+++ b/hbase-replication/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>hbase-operator-tools</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>1.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>hbase-replication</artifactId>
+
+  <build>
+    <testResources>
+      <testResource>
+        <directory>src/test/resources/META-INF/</directory>
+        <targetPath>META-INF/</targetPath>
+        <includes>
+          <include>NOTICE</include>
+        </includes>
+        <filtering>true</filtering>
+      </testResource>
+    </testResources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.4</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
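+  <!-- NOTE: hbase-server provides the coprocessor and replication sink APIs the
+       observer is compiled against. mockito-inline (rather than mockito-core) is
+       presumably needed because the tests mock final protobuf classes such as
+       AdminProtos.WALEntry and reflect on declared fields of mocked instances,
+       which only the inline mock maker supports. -->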
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>1.5.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>1.5.0-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-inline</artifactId>
+      <version>2.8.9</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>apache-release</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-resources-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>license-javadocs</id>
+                <phase>prepare-package</phase>
+                <goals>
+                  <goal>copy-resources</goal>
+                </goals>
+                <configuration>
+                  <outputDirectory>${project.build.directory}/apidocs</outputDirectory>
+                  <resources>
+                    <resource>
+                      <directory>src/main/javadoc/META-INF/</directory>
+                      <targetPath>META-INF/</targetPath>
+                      <includes>
+                        <include>NOTICE</include>
+                      </includes>
+                      <filtering>true</filtering>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>
\ No newline at end of file
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/regionserver/WalEntrySplitterReplicationCoprocessor.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/regionserver/WalEntrySplitterReplicationCoprocessor.java
new file mode 100644
index 0000000..2900b1c
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/regionserver/WalEntrySplitterReplicationCoprocessor.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionServerObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
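+/**
+ * RegionServer observer that intercepts replication batches in
+ * preReplicateLogEntries() and applies each WAL entry's cells itself, in
+ * sub-batches of at most "hbase.support.coprocessor.walentrysplitter.max_ops"
+ * cells, so oversized entries are not applied in one single call. Entries
+ * handled here are removed from the passed list, so the default
+ * ReplicationSink skips them.
+ *
+ * To load it on the sink cluster's region servers, something like the
+ * following hbase-site.xml snippet should work (illustrative only; adjust
+ * to the actual deployment):
+ *
+ * <pre>
+ * &lt;property&gt;
+ *   &lt;name&gt;hbase.coprocessor.regionserver.classes&lt;/name&gt;
+ *   &lt;value&gt;org.apache.hadoop.hbase.regionserver.WalEntrySplitterReplicationCoprocessor&lt;/value&gt;
+ * &lt;/property&gt;
+ * </pre>
+ */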
"); + } + LOG.info("finished loading WalEntrySplitterReplicationCoprocessor."); + } + + + private synchronized void loadSinkService(){ + if(this.replicationSink==null && this.regionServer!=null) { + ReplicationSinkService sinkService = this.regionServer.replicationSinkHandler; + LOG.info("Retrieved ReplicationSinkService instance: " + sinkService); + if (sinkService instanceof Replication) { + try { + Field field = sinkService.getClass().getDeclaredField("replicationSink"); + field.setAccessible(true); + this.replicationSink = (ReplicationSink) field.get(sinkService); + try { + field = this.replicationSink.getClass().getDeclaredField("totalReplicatedEdits"); + field.setAccessible(true); + this.totalReplicatedEdits = (AtomicLong) field.get(this.replicationSink); + } catch (Exception e) { + LOG.warn("Could not get reference to totalReplicatedEdits inside ReplicationSink " + + "instance. This may cause replication metrics to be inaccurate. ", e); + } + } catch (Exception e) { + LOG.warn("Could not get reference to ReplicationSink instance. " + + "This may cause replication metrics to be inaccurate. ", e); + } + } else { + LOG.warn("sink service not an instance of Replication. " + + "This may cause replication metrics to be inaccurate. "); + } + } + } + + @Override + public void preReplicateLogEntries(ObserverContext ctx, + List entries, CellScanner cells) throws IOException { + loadSinkService(); + try { + List modifiableEntries = this.getModifiableList(entries); + for (int i = 0; i < modifiableEntries.size(); i++) { + AdminProtos.WALEntry entry = modifiableEntries.get(i); + LOG.debug("replication entry cell count: " + entry.getAssociatedCellCount()); + try { + long totalReplicated = 0; + // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per + // invocation of this method per table and cluster id. + Map, List>> rowMap = + new TreeMap, List>>(); + TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray()); + Cell previousCell = null; + Mutation m = null; + int count = entry.getAssociatedCellCount(); + for (int j = 0; j < count; j++) { + // Throw index out of bounds if our cell count is off + if (!cells.advance()) { + throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + j); + } + Cell cell = cells.current(); + if (isNewRowOrType(previousCell, cell) || rowMap.isEmpty()) { + // Create new mutation + m = newMutation(cell, entry, rowMap, table); + } + if (CellUtil.isDelete(cell)) { + ((Delete) m).addDeleteMarker(cell); + } else { + ((Put) m).add(cell); + } + previousCell = cell; + totalReplicated++; + if (totalReplicated >= this.maxOps) { + LOG.trace("batching " + this.maxOps + " cells from entry... 
"); + for (Map.Entry, List>> edit : rowMap.entrySet()) { + batch(edit.getKey(), edit.getValue().values()); + } + updateMetrics(entry, totalReplicated); + //ensures same OPs will not get re-batched on for loop of line #131 + rowMap.clear(); + totalReplicated = 0; + } + } + for (Map.Entry, List>> edit : rowMap.entrySet()) { + batch(edit.getKey(), edit.getValue().values()); + } + if(totalReplicated>0) { + updateMetrics(entry, totalReplicated); + } + modifiableEntries.remove(i); + } catch (IOException ex) { + LOG.error("Unable to accept edit because:", ex); + throw ex; + } + } + LOG.debug("finished pre-replicate call for " + entries.size() + " entries."); + } catch (Exception e) { + LOG.error("Unexpected error: ", e); + } + } + + private void updateMetrics(AdminProtos.WALEntry entry, long totalReplicated) { + if(this.replicationSink!=null) { + this.replicationSink.getSinkMetrics().setAgeOfLastAppliedOp(entry.getKey().getWriteTime()); + this.replicationSink.getSinkMetrics().applyBatch(totalReplicated); + }else{ + LOG.info("Could not update replication sink metrics with getAgeOfLastAppliedOp: " + + entry.getKey().getWriteTime() + ", applied batch ops: " + totalReplicated); + } + if(this.totalReplicatedEdits!=null) { + this.totalReplicatedEdits.addAndGet(totalReplicated); + }else{ + LOG.info("Could not update replication stats with extra replicated edits: " + + totalReplicated); + } + } + + private Mutation newMutation(Cell cell, AdminProtos.WALEntry entry, + Map, List>> rowMap, TableName table) { + + Mutation m = CellUtil.isDelete(cell) ? + new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) : + new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + + List clusterIds = new ArrayList(); + + for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { + clusterIds.add(toUUID(clusterId)); + } + + m.setClusterIds(clusterIds); + + addToHashMultiMap(rowMap, table, clusterIds, m); + + return m; + } + + // Kind of a hack, but can't see other way: We need to remove entries batched by coprocessor + // from original list, but the passed list is an unmodifiable collection, + // so we need to do reflection to actually get the modifiable collection. 
+  protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
+    if (allRows.isEmpty()) {
+      return;
+    }
+    Table table = null;
+    try {
+      // See https://en.wikipedia.org/wiki/Double-checked_locking
+      Connection connection = this.sharedHtableCon;
+      if (connection == null) {
+        synchronized (sharedHtableConLock) {
+          connection = this.sharedHtableCon;
+          if (connection == null) {
+            connection = this.sharedHtableCon = ConnectionFactory.createConnection(this.conf);
+          }
+        }
+      }
+      table = connection.getTable(tableName);
+      for (List<Row> rows : allRows) {
+        table.batch(rows, new Object[rows.size()]);
+      }
+    } catch (InterruptedException ix) {
+      throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+}
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalEntrySplitterReplicationCoprocessor.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalEntrySplitterReplicationCoprocessor.java
new file mode 100644
index 0000000..982a69e
--- /dev/null
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalEntrySplitterReplicationCoprocessor.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyListOf;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSink;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
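+/**
+ * Unit tests for WalEntrySplitterReplicationCoprocessor, driven purely by
+ * mocks: the region server environment, replication sink and HBase Table are
+ * all mocked, so coprocessor wiring and batch-splitting behaviour can be
+ * verified without a mini-cluster.
+ */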
+@Category(SmallTests.class)
+public class TestWalEntrySplitterReplicationCoprocessor {
+
+  private WalEntrySplitterReplicationCoprocessor coprocessor;
+  private Connection mockedConnection;
+  private Table mockedTable;
+  private HRegionServer mockedRegionServer;
+  private RegionServerCoprocessorEnvironment mockedEnvironment;
+  private ReplicationSink mockedReplicationSink;
+  private MetricsSink mockedMetrics;
+  private static final String TABLE_NAME = "TEST_TBL";
+
+  @Before
+  public void setup() throws Exception {
+    this.coprocessor = new WalEntrySplitterReplicationCoprocessor();
+    this.mockedConnection = Mockito.mock(Connection.class);
+    this.coprocessor.sharedHtableCon = this.mockedConnection;
+    this.mockedTable = mock(Table.class);
+    this.mockedRegionServer = mock(HRegionServer.class);
+    when(this.mockedConnection.getTable(TableName.valueOf(TABLE_NAME)))
+        .thenReturn(this.mockedTable);
+    this.mockedEnvironment = mock(RegionServerCoprocessorEnvironment.class);
+    when(this.mockedEnvironment.getRegionServerServices()).thenReturn(this.mockedRegionServer);
+    this.mockedRegionServer.replicationSinkHandler = mock(Replication.class);
+    this.mockedReplicationSink = mock(ReplicationSink.class);
+    this.mockedMetrics = mock(MetricsSink.class);
+    when(this.mockedReplicationSink.getSinkMetrics()).thenReturn(this.mockedMetrics);
+
+    Field field = this.mockedRegionServer.replicationSinkHandler.getClass()
+        .getDeclaredField("replicationSink");
+    field.setAccessible(true);
+    field.set(this.mockedRegionServer.replicationSinkHandler, this.mockedReplicationSink);
+
+    field = this.mockedReplicationSink.getClass().getDeclaredField("totalReplicatedEdits");
+    field.setAccessible(true);
+    field.set(this.mockedReplicationSink, new AtomicLong(0));
+  }
+
+  @Test
+  public void testStartCoprocessorValidEnvironment() throws IOException {
+    this.coprocessor.start(this.mockedEnvironment);
+    assertTrue(this.coprocessor.regionServer == this.mockedRegionServer);
+  }
+
+  @Test
+  public void testStartCoprocessorInvalidEnvironment() throws IOException {
+    this.coprocessor.start(mock(CoprocessorEnvironment.class));
+    assertNull(this.coprocessor.regionServer);
+  }
+
+  @Test
+  public void testPreReplicateLogEntriesFewOps() throws Exception {
+    this.coprocessor.start(this.mockedEnvironment);
+    this.testPreReplicateLogEntriesBatches(10, 1);
+    verify(this.mockedMetrics, times(1)).applyBatch(10L);
+  }
+
+  @Test
+  public void testPreReplicateLogEntriesMoreThan1kOps() throws Exception {
+    this.coprocessor.start(this.mockedEnvironment);
+    this.testPreReplicateLogEntriesBatches(1001, 2);
+    verify(this.mockedMetrics, times(1)).applyBatch(1000L);
+    verify(this.mockedMetrics, times(1)).applyBatch(1L);
+  }
+
+  @Test
+  public void testPreReplicateLogEntriesInvalidEnvironment() throws Exception {
+    this.coprocessor.start(mock(CoprocessorEnvironment.class));
+    this.testPreReplicateLogEntriesBatches(1, 1);
+    verify(this.mockedMetrics, times(0)).applyBatch(anyLong());
+  }
+
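+  // Drives preReplicateLogEntries() with a single mocked WALEntry that claims
+  // numberOfOps associated cells (the mocked CellScanner just replays the same
+  // KeyValue), then asserts Table.batch() was invoked expectedBatches times.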
+  private void testPreReplicateLogEntriesBatches(int numberOfOps, int expectedBatches)
+      throws Exception {
+    List<AdminProtos.WALEntry> entries = new ArrayList<>();
+    AdminProtos.WALEntry mockedEntry = mock(AdminProtos.WALEntry.class);
+    entries.add(mockedEntry);
+    WALProtos.WALKey mockedKey = mock(WALProtos.WALKey.class);
+    when(mockedEntry.getKey()).thenReturn(mockedKey);
+    when(mockedKey.getTableName())
+        .thenReturn(ByteString.copyFrom(TableName.valueOf(TABLE_NAME).getName()));
+    when(mockedEntry.getAssociatedCellCount()).thenReturn(numberOfOps);
+    List<HBaseProtos.UUID> clusterIds = new ArrayList<>();
+    HBaseProtos.UUID mockedId = mock(HBaseProtos.UUID.class);
+    when(mockedId.getLeastSigBits()).thenReturn(0L);
+    when(mockedId.getMostSigBits()).thenReturn(0L);
+    clusterIds.add(mockedId);
+    when(mockedKey.getClusterIdsList()).thenReturn(clusterIds);
+    CellScanner mockedScanner = mock(CellScanner.class);
+    when(mockedScanner.advance()).thenReturn(true);
+
+    KeyValue cell = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
+        Bytes.toBytes("q"));
+    when(mockedScanner.current()).thenReturn(cell);
+
+    this.coprocessor.preReplicateLogEntries(null,
+        Collections.unmodifiableList(entries),
+        mockedScanner);
+    verify(this.mockedTable, times(expectedBatches))
+        .batch(anyListOf(Row.class), (Object[]) anyObject());
+  }
+
+}
-- 
2.17.1 (Apple Git-112)