From e5ce6eef65b920b8abc0cba5f9b4b18b2fdbdf25 Mon Sep 17 00:00:00 2001 From: Ashish Singhi Date: Wed, 9 Mar 2016 17:08:51 +0530 Subject: [PATCH] HBASE-15424 Add bulk load hfile-refs for replication in ZK after the event is appended in the WAL --- .../hadoop/hbase/regionserver/wal/FSHLog.java | 4 +- .../hadoop/hbase/regionserver/wal/MetricsWAL.java | 7 ++- .../hbase/regionserver/wal/WALActionsListener.java | 10 ++++- .../replication/regionserver/Replication.java | 50 ++++++++++++++-------- .../hadoop/hbase/wal/DisabledWALProvider.java | 7 +-- .../hadoop/hbase/wal/WALPerformanceEvaluation.java | 3 +- 6 files changed, 53 insertions(+), 28 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index f3f869c..28ffaf8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1413,14 +1413,14 @@ public class FSHLog implements WAL { } } - private long postAppend(final Entry e, final long elapsedTime) { + private long postAppend(final Entry e, final long elapsedTime) throws IOException { long len = 0; if (!listeners.isEmpty()) { for (Cell cell : e.getEdit().getCells()) { len += CellUtil.estimatedSerializedSizeOf(cell); } for (WALActionsListener listener : listeners) { - listener.postAppend(len, elapsedTime); + listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit()); } } return len; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java index 24fd940..e0f4a61 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java @@ -20,9 +20,13 @@ package org.apache.hadoop.hbase.regionserver.wal; import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.util.StringUtils; @@ -51,7 +55,8 @@ public class MetricsWAL extends WALActionsListener.Base { } @Override - public void postAppend(final long size, final long time) { + public void postAppend(final long size, final long time, final WALKey logkey, + final WALEdit logEdit) throws IOException { source.incrementAppendCount(); source.incrementAppendTime(time); source.incrementAppendSize(size); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java index a6452e2..adcc6eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java @@ -98,8 +98,12 @@ public interface WALActionsListener { * TODO: Combine this with above. * @param entryLen approx length of cells in this append. * @param elapsedTimeMillis elapsed time in milliseconds. + * @param logKey A WAL key + * @param logEdit A WAL edit containing list of cells. + * @throws IOException if any network or I/O error occurred */ - void postAppend(final long entryLen, final long elapsedTimeMillis); + void postAppend(final long entryLen, final long elapsedTimeMillis, final WALKey logKey, + final WALEdit logEdit) throws IOException; /** * For notification post writer sync. Used by metrics system at least. @@ -136,7 +140,9 @@ public interface WALActionsListener { } @Override - public void postAppend(final long entryLen, final long elapsedTimeMillis) {} + public void postAppend(final long entryLen, final long elapsedTimeMillis, final WALKey logKey, + final WALEdit logEdit) throws IOException { + } @Override public void postSync(final long timeInNanos, final int handlerSyncs) {} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index bb4a5a3..fa5e222 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.NavigableMap; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -256,6 +257,34 @@ public class Replication extends WALActionsListener.Base implements scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager()); } + @Override + public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey, + final WALEdit edit) throws IOException { + NavigableMap scopes = logKey.getReplicationScopes(); + if (this.replicationForBulkLoadData && scopes != null && !scopes.isEmpty()) { + TableName tableName = logKey.getTablename(); + for (Cell c : edit.getCells()) { + // Only check for bulk load events + if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) { + BulkLoadDescriptor bld = null; + try { + bld = WALEdit.getBulkLoadDescriptor(c); + } catch (IOException e) { + LOG.error("Failed to get bulk load events information from the wal file.", e); + throw e; + } + + for (StoreDescriptor s : bld.getStoresList()) { + byte[] fam = s.getFamilyName().toByteArray(); + if (scopes.containsKey(fam)) { + addHFileRefsToQueue(this.getReplicationManager(), tableName, fam, s); + } + } + } + } + } + } + /** * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from * compaction WAL edits and if the scope is local. @@ -268,26 +297,9 @@ public class Replication extends WALActionsListener.Base implements WALEdit logEdit, Configuration conf, ReplicationSourceManager replicationManager) throws IOException { boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf); - byte[] family; boolean foundOtherEdits = false; for (Cell cell : logEdit.getCells()) { - if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { - if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { - try { - BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); - for (StoreDescriptor s : bld.getStoresList()) { - family = s.getFamilyName().toByteArray(); - addHFileRefsToQueue(replicationManager, logKey.getTablename(), family, s); - } - } catch (IOException e) { - LOG.error("Failed to get bulk load events information from the wal file.", e); - throw e; - } - } else { - // Skip the flush/compaction/region events - continue; - } - } else { + if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { foundOtherEdits = true; } } @@ -301,7 +313,7 @@ public class Replication extends WALActionsListener.Base implements try { replicationManager.addHFileRefs(tableName, family, s.getStoreFileList()); } catch (ReplicationException e) { - LOG.error("Failed to create hfile references in ZK.", e); + LOG.error("Failed to add hfile references in the replication queue.", e); throw new IOException(e); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index c3d4b2c..37b04c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -153,16 +153,17 @@ class DisabledWALProvider implements WALProvider { } @Override - public long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) { + public long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) + throws IOException { if (!this.listeners.isEmpty()) { final long start = System.nanoTime(); long len = 0; for (Cell cell : edits.getCells()) { len += CellUtil.estimatedSerializedSizeOf(cell); } - final long elapsed = (System.nanoTime() - start)/1000000l; + final long elapsed = (System.nanoTime() - start) / 1000000l; for (WALActionsListener listener : this.listeners) { - listener.postAppend(len, elapsed); + listener.postAppend(len, elapsed, key, edits); } } return -1; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java index 4a15d3c..7ce03b6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java @@ -525,7 +525,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { } @Override - public void postAppend(final long size, final long elapsedTime) { + public void postAppend(final long size, final long elapsedTime, final WALKey logkey, + final WALEdit logEdit) { appendMeter.mark(size); } }); -- 1.9.2.msysgit.0