From 4bd39d7d68ec8c5b9e6a87f389fbc7b083780edb Mon Sep 17 00:00:00 2001 From: Ashish Singhi Date: Mon, 21 Mar 2016 17:25:50 +0530 Subject: [PATCH] HBASE-15424 Add bulk load hfile-refs for replication in ZK after the event is appended in the WAL --- .../hadoop/hbase/regionserver/wal/FSHLog.java | 4 +-- .../hadoop/hbase/regionserver/wal/MetricsWAL.java | 7 ++++- .../hbase/regionserver/wal/WALActionsListener.java | 10 +++++-- .../replication/regionserver/Replication.java | 35 +++++++++++++++++++--- .../hadoop/hbase/wal/DisabledWALProvider.java | 4 +-- .../hbase/regionserver/wal/TestMetricsWAL.java | 4 +-- .../hadoop/hbase/wal/WALPerformanceEvaluation.java | 3 +- 7 files changed, 53 insertions(+), 14 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index eb1cf57..c01cc1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1399,14 +1399,14 @@ public class FSHLog implements WAL { } } - private long postAppend(final Entry e, final long elapsedTime) { + private long postAppend(final Entry e, final long elapsedTime) throws IOException { long len = 0; if (!listeners.isEmpty()) { for (Cell cell : e.getEdit().getCells()) { len += CellUtil.estimatedSerializedSizeOf(cell); } for (WALActionsListener listener : listeners) { - listener.postAppend(len, elapsedTime); + listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit()); } } return len; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java index 99792e5..69a31cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java @@ -20,9 +20,13 @@ package org.apache.hadoop.hbase.regionserver.wal; import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.util.StringUtils; @@ -51,7 +55,8 @@ public class MetricsWAL extends WALActionsListener.Base { } @Override - public void postAppend(final long size, final long time) { + public void postAppend(final long size, final long time, final WALKey logkey, + final WALEdit logEdit) throws IOException { source.incrementAppendCount(); source.incrementAppendTime(time); source.incrementAppendSize(size); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java index db98083..60ab7b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java @@ -101,8 +101,12 @@ public interface WALActionsListener { * TODO: Combine this with above. * @param entryLen approx length of cells in this append. * @param elapsedTimeMillis elapsed time in milliseconds. + * @param logKey A WAL key + * @param logEdit A WAL edit containing list of cells. + * @throws IOException if any network or I/O occurred */ - void postAppend(final long entryLen, final long elapsedTimeMillis); + void postAppend(final long entryLen, final long elapsedTimeMillis, final WALKey logKey, + final WALEdit logEdit) throws IOException; /** * For notification post writer sync. Used by metrics system at least. @@ -140,7 +144,9 @@ public interface WALActionsListener { } @Override - public void postAppend(final long entryLen, final long elapsedTimeMillis) {} + public void postAppend(final long entryLen, final long elapsedTimeMillis, final WALKey logKey, + final WALEdit logEdit) throws IOException { + } @Override public void postSync(final long timeInNanos, final int handlerSyncs) {} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 7110273..06138a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -262,6 +262,36 @@ public class Replication extends WALActionsListener.Base implements scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager()); } + @Override + public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey, + final WALEdit edit) throws IOException { + NavigableMap scopes = logKey.getScopes(); + if (this.replicationForBulkLoadData && scopes != null && !scopes.isEmpty()) { + TableName tableName = logKey.getTablename(); + for (Cell c : edit.getCells()) { + // Only check for bulk load events + if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) { + BulkLoadDescriptor bld = null; + try { + bld = WALEdit.getBulkLoadDescriptor(c); + } catch (IOException e) { + LOG.error("Failed to get bulk load events information from the wal file.", e); + throw e; + } + + for (StoreDescriptor s : bld.getStoresList()) { + byte[] fam = s.getFamilyName().toByteArray(); + // We have already scoped the entries as part + // WALActionsListener#visitLogEntryBeforeWrite notification + if (scopes.containsKey(fam)) { + addHFileRefsToQueue(this.getReplicationManager(), tableName, fam, s); + } + } + } + } + } + } + /** * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from * compaction WAL edits and if the scope is local. @@ -314,10 +344,7 @@ public class Replication extends WALActionsListener.Base implements int scope = htd.getFamily(family).getScope(); if (scope != REPLICATION_SCOPE_LOCAL) { scopes.put(family, scope); - addHFileRefsToQueue(replicationManager, tableName, family, s); } - } else { - addHFileRefsToQueue(replicationManager, tableName, family, s); } } } catch (IOException e) { @@ -331,7 +358,7 @@ public class Replication extends WALActionsListener.Base implements try { replicationManager.addHFileRefs(tableName, family, s.getStoreFileList()); } catch (ReplicationException e) { - LOG.error("Failed to create hfile references in ZK.", e); + LOG.error("Failed to add hfile references in the replication queue.", e); throw new IOException(e); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 43738be..1816238 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -155,7 +155,7 @@ class DisabledWALProvider implements WALProvider { @Override public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, - boolean inMemstore) { + boolean inMemstore) throws IOException { if (!this.listeners.isEmpty()) { final long start = System.nanoTime(); long len = 0; @@ -164,7 +164,7 @@ class DisabledWALProvider implements WALProvider { } final long elapsed = (System.nanoTime() - start)/1000000l; for (WALActionsListener listener : this.listeners) { - listener.postAppend(len, elapsed); + listener.postAppend(len, elapsed, key, edits); } } return -1; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java index 7be3e66..9a7d494 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java @@ -57,8 +57,8 @@ public class TestMetricsWAL { public void testWalWrittenInBytes() throws Exception { MetricsWALSource source = mock(MetricsWALSourceImpl.class); MetricsWAL metricsWAL = new MetricsWAL(source); - metricsWAL.postAppend(100, 900); - metricsWAL.postAppend(200, 2000); + metricsWAL.postAppend(100, 900, null, null); + metricsWAL.postAppend(200, 2000, null, null); verify(source, times(1)).incrementWrittenBytes(100); verify(source, times(1)).incrementWrittenBytes(200); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java index 3af853b..8af1882 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java @@ -514,7 +514,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { } @Override - public void postAppend(final long size, final long elapsedTime) { + public void postAppend(final long size, final long elapsedTime, final WALKey logkey, + final WALEdit logEdit) { appendMeter.mark(size); } }); -- 1.9.2.msysgit.0