.../hbase/protobuf/ReplicationProtbufUtil.java | 2 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 63 +++++++---- .../apache/hadoop/hbase/regionserver/HStore.java | 2 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 13 +-- .../hadoop/hbase/regionserver/wal/FSWALEntry.java | 10 +- .../hadoop/hbase/regionserver/wal/HLogKey.java | 48 +++++++-- .../hbase/regionserver/wal/WALActionsListener.java | 8 +- .../hadoop/hbase/regionserver/wal/WALUtil.java | 57 +++++----- .../hbase/replication/ScopeWALEntryFilter.java | 2 +- .../replication/regionserver/Replication.java | 70 ++++-------- .../hadoop/hbase/wal/DisabledWALProvider.java | 4 +- .../main/java/org/apache/hadoop/hbase/wal/WAL.java | 9 +- .../java/org/apache/hadoop/hbase/wal/WALKey.java | 119 +++++++++++++++++---- .../org/apache/hadoop/hbase/wal/WALSplitter.java | 2 +- .../org/apache/hadoop/hbase/TestIOFencing.java | 3 +- .../hadoop/hbase/coprocessor/TestWALObserver.java | 48 ++++++--- .../hbase/mapreduce/TestHLogRecordReader.java | 7 +- .../hadoop/hbase/mapreduce/TestImportExport.java | 16 +-- .../hbase/mapreduce/TestWALRecordReader.java | 20 ++-- .../hadoop/hbase/regionserver/TestBulkLoad.java | 9 +- .../hadoop/hbase/regionserver/TestHRegion.java | 14 +-- .../regionserver/TestHRegionReplayEvents.java | 6 +- .../regionserver/TestHRegionServerBulkLoad.java | 3 +- .../hadoop/hbase/regionserver/TestWALLockup.java | 10 +- .../hadoop/hbase/regionserver/wal/TestFSHLog.java | 57 ++++++---- .../hbase/regionserver/wal/TestLogRollAbort.java | 12 ++- .../regionserver/wal/TestLogRollingNoCluster.java | 11 +- .../regionserver/wal/TestWALActionsListener.java | 12 ++- .../hbase/regionserver/wal/TestWALReplay.java | 47 +++++--- .../hbase/replication/TestReplicationBase.java | 9 ++ .../replication/TestReplicationSmallTests.java | 13 ++- .../TestReplicationWALEntryFilters.java | 62 +++++------ .../regionserver/TestReplicationSourceManager.java | 57 ++++++---- .../TestReplicationWALReaderManager.java | 13 ++- .../org/apache/hadoop/hbase/wal/FaultyFSLog.java | 7 +- .../hadoop/hbase/wal/TestDefaultWALProvider.java | 64 +++++++---- .../wal/TestDefaultWALProviderWithHLogKey.java | 7 +- .../org/apache/hadoop/hbase/wal/TestSecureWAL.java | 11 +- .../apache/hadoop/hbase/wal/TestWALFactory.java | 74 +++++++++---- .../hadoop/hbase/wal/TestWALReaderOnSecureWAL.java | 11 +- .../hadoop/hbase/wal/WALPerformanceEvaluation.java | 15 ++- 41 files changed, 662 insertions(+), 365 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index 91185af..8cb2237 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -134,7 +134,7 @@ public class ReplicationProtbufUtil { keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum()); } WALEdit edit = entry.getEdit(); - NavigableMap scopes = key.getScopes(); + NavigableMap scopes = key.getReplicationScopes(); if (scopes != null && !scopes.isEmpty()) { for (Map.Entry scope: scopes.entrySet()) { scopeBuilder.setFamily(ByteStringer.wrap(scope.getKey())); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 0d5a71e..d895477 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; + import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; @@ -582,6 +584,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final MetricsRegionWrapperImpl metricsRegionWrapper; private final Durability durability; private final boolean regionStatsEnabled; + // Stores the replication scope of the various column families of the table + // that has non-default scope + private final NavigableMap replicationScope = new TreeMap( + Bytes.BYTES_COMPARATOR); /** * HRegion constructor. This constructor should only be used for testing and @@ -660,6 +666,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true); this.htableDescriptor = htd; + Set families = this.htableDescriptor.getFamiliesKeys(); + for (byte[] family : families) { + if (!replicationScope.containsKey(family)) { + int scope = htd.getFamily(family).getScope(); + // Only store those families that has NON-DEFAULT scope + if (scope != REPLICATION_SCOPE_LOCAL) { + // Do a copy before storing it here. + replicationScope.put(Bytes.copy(family), scope); + } + } + } this.rsServices = rsServices; this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); setHTableSpecificConf(); @@ -970,7 +987,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor( RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId, getRegionServerServices().getServerName(), storeFiles); - WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc, mvcc); + WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionOpenDesc, + mvcc); } private void writeRegionCloseMarker(WAL wal) throws IOException { @@ -978,7 +996,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor( RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(), getRegionServerServices().getServerName(), storeFiles); - WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc, mvcc); + WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc, + mvcc); // Store SeqId in HDFS when a region closes // checking region folder exists is due to many tests which delete the table folder while a @@ -2272,7 +2291,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); // No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH - WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc); + WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false, + mvcc); } // Prepare flush (take a snapshot) @@ -2321,7 +2341,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); - WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, + WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false, mvcc); } catch (Throwable t) { LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + @@ -2366,7 +2386,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH, getRegionInfo(), -1, new TreeMap>(Bytes.BYTES_COMPARATOR)); try { - WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc); + WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, + mvcc); return true; } catch (IOException e) { LOG.warn(getRegionInfo().getEncodedName() + " : " @@ -2436,7 +2457,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // write flush marker to WAL. If fail, we should throw DroppedSnapshotException FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); - WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc); + WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, + mvcc); } } catch (Throwable t) { // An exception here means that the snapshot was not persisted. @@ -2449,7 +2471,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); - WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc); + WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc); } catch (Throwable ex) { LOG.warn(getRegionInfo().getEncodedName() + " : " + "failed writing ABORT_FLUSH marker to WAL", ex); @@ -3125,13 +3147,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!replay) { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, + mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc, + this.getReplicationScope()); } // TODO: Use the doAppend methods below... complicated by the replay stuff above. try { - long txid = - this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); + long txid = this.wal.append(this.getRegionInfo(), walKey, + walEdit, true); if (txid != 0) sync(txid, durability); writeEntry = walKey.getWriteEntry(); } catch (IOException ioe) { @@ -3257,8 +3280,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!replay) throw new IOException("Multiple nonces per batch and not in replay"); WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), now, mutation.getClusterIds(), - currentNonceGroup, currentNonce, mvcc); - this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); + currentNonceGroup, currentNonce, mvcc, this.getReplicationScope()); + this.wal.append(this.getRegionInfo(), walKey, walEdit, true); // Complete the mvcc transaction started down in append else it will block others this.mvcc.complete(walKey.getWriteEntry()); } @@ -5369,7 +5392,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor( this.getRegionInfo().getTable(), ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId); - WALUtil.writeBulkLoadMarkerAndSync(this.wal, getTableDesc(), getRegionInfo(), + WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(), loadDescriptor, mvcc); } catch (IOException ioe) { if (this.rsServices != null) { @@ -6298,6 +6321,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return r.openHRegion(reporter); } + @VisibleForTesting + public NavigableMap getReplicationScope() { + return this.replicationScope; + } /** * Useful when reopening a closed region (normally for unit tests) @@ -7047,10 +7074,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // here instead of WALKey directly to support legacy coprocessors. WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, - nonceGroup, nonce, mvcc); + nonceGroup, nonce, mvcc, this.getReplicationScope()); try { long txid = - this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); + this.wal.append(this.getRegionInfo(), walKey, walEdit, true); // Call sync on our edit. if (txid != 0) sync(txid, durability); writeEntry = walKey.getWriteEntry(); @@ -7340,7 +7367,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 45 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + 46 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 5 * Bytes.SIZEOF_BOOLEAN); @@ -7363,7 +7390,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock MultiVersionConcurrencyControl.FIXED_SIZE // mvcc - + ClassSize.TREEMAP // maxSeqIdInStores + + 2 * ClassSize.TREEMAP // maxSeqIdInStores + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress ; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 5cc3fc9..2ff583b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1307,7 +1307,7 @@ public class HStore implements Store { // Fix reaching into Region to get the maxWaitForSeqId. // Does this method belong in Region altogether given it is making so many references up there? // Could be Region#writeCompactionMarker(compactionDescriptor); - WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getTableDesc(), + WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(), this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 09da8fc..f3f869c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; import org.apache.hadoop.hbase.util.Bytes; @@ -1083,8 +1082,8 @@ public class FSHLog implements WAL { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION", justification="Will never be null") @Override - public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key, - final WALEdit edits, final boolean inMemstore) throws IOException { + public long append(final HRegionInfo hri, + final WALKey key, final WALEdit edits, final boolean inMemstore) throws IOException { if (this.closed) throw new IOException("Cannot append; log is closed"); // Make a trace scope for the append. It is closed on other side of the ring buffer by the // single consuming thread. Don't have to worry about it. @@ -1100,7 +1099,7 @@ public class FSHLog implements WAL { // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the // edit with its edit/sequence id. // TODO: reuse FSWALEntry as we do SyncFuture rather create per append. - entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore); + entry = new FSWALEntry(sequence, key, edits, hri, inMemstore); truck.loadPayload(entry, scope.detach()); } finally { this.disruptor.getRingBuffer().publish(sequence); @@ -1878,14 +1877,12 @@ public class FSHLog implements WAL { entry.getEdit())) { if (entry.getEdit().isReplay()) { // Set replication scope null so that this won't be replicated - entry.getKey().setScopes(null); + entry.getKey().serializeReplicationScope(false); } } if (!listeners.isEmpty()) { for (WALActionsListener i: listeners) { - // TODO: Why does listener take a table description and CPs take a regioninfo? Fix. - i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(), - entry.getEdit()); + i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 86a3c3d..06318f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -26,7 +26,6 @@ import java.util.Set; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.util.Bytes; @@ -51,15 +50,13 @@ class FSWALEntry extends Entry { // they are only in memory and held here while passing over the ring buffer. private final transient long sequence; private final transient boolean inMemstore; - private final transient HTableDescriptor htd; private final transient HRegionInfo hri; private final Set familyNames; FSWALEntry(final long sequence, final WALKey key, final WALEdit edit, - final HTableDescriptor htd, final HRegionInfo hri, final boolean inMemstore) { + final HRegionInfo hri, final boolean inMemstore) { super(key, edit); this.inMemstore = inMemstore; - this.htd = htd; this.hri = hri; this.sequence = sequence; if (inMemstore) { @@ -71,6 +68,7 @@ class FSWALEntry extends Entry { Set familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR); for (Cell cell : cells) { if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { + // TODO: Avoid this clone? familySet.add(CellUtil.cloneFamily(cell)); } } @@ -89,10 +87,6 @@ class FSWALEntry extends Entry { return this.inMemstore; } - HTableDescriptor getHTableDescriptor() { - return this.htd; - } - HRegionInfo getHRegionInfo() { return this.hri; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java index 7c40323..d7bf4a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java @@ -24,6 +24,7 @@ import java.io.EOFException; import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.NavigableMap; import java.util.UUID; import org.apache.commons.logging.Log; @@ -67,7 +68,7 @@ public class HLogKey extends WALKey implements Writable { } public HLogKey(final byte[] encodedRegionName, final TableName tablename) { - super(encodedRegionName, tablename); + super(encodedRegionName, tablename, null); } @VisibleForTesting @@ -75,11 +76,15 @@ public class HLogKey extends WALKey implements Writable { super(encodedRegionName, tablename, now); } - public HLogKey(final byte[] encodedRegionName, - final TableName tablename, - final long now, - final MultiVersionConcurrencyControl mvcc) { - super(encodedRegionName, tablename, now, mvcc); + @VisibleForTesting + public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now, + final NavigableMap replicationScope) { + super(encodedRegionName, tablename, now, replicationScope); + } + + public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now, + final MultiVersionConcurrencyControl mvcc, final NavigableMap scopes) { + super(encodedRegionName, tablename, now, mvcc, scopes); } /** @@ -111,6 +116,35 @@ public class HLogKey extends WALKey implements Writable { * Create the log key for writing to somewhere. * We maintain the tablename mainly for debugging purposes. * A regionName is always a sub-table object. + *

Used by log splitting and snapshots. + * + * @param encodedRegionName Encoded name of the region as returned by + * HRegionInfo#getEncodedNameAsBytes(). + * @param tablename - name of table + * @param logSeqNum - log sequence number + * @param now Time at which this edit was written. + * @param clusterIds the clusters that have consumed the change(used in Replication) + * @param nonceGroup the noncegroup + * @param nonce the nonce + * @param replicationScope the replicationScope of the non-default column families' of the region + */ + public HLogKey( + final byte[] encodedRegionName, + final TableName tablename, + long logSeqNum, + final long now, + List clusterIds, + long nonceGroup, + long nonce, + MultiVersionConcurrencyControl mvcc, NavigableMap replicationScope) { + super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, + replicationScope); + } + + /** + * Create the log key for writing to somewhere. + * We maintain the tablename mainly for debugging purposes. + * A regionName is always a sub-table object. * * @param encodedRegionName Encoded name of the region as returned by * HRegionInfo#getEncodedNameAsBytes(). @@ -192,7 +226,7 @@ public class HLogKey extends WALKey implements Writable { // encodes the length of encodedRegionName. // If < 0 we just read the version and the next vint is the length. // @see Bytes#readByteArray(DataInput) - setScopes(null); // writable HLogKey does not contain scopes + serializeReplicationScope(false); // writable HLogKey does not contain scopes int len = WritableUtils.readVInt(in); byte[] tablenameBytes = null; if (len < 0) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java index db98083..a6452e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java @@ -23,7 +23,6 @@ import java.io.IOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.wal.WALKey; @@ -85,7 +84,6 @@ public interface WALActionsListener { ); /** - * @param htd * @param logKey * @param logEdit TODO: Retire this in favor of * {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)} It only exists to get @@ -93,8 +91,7 @@ public interface WALActionsListener { * htd. * @throws IOException If failed to parse the WALEdit */ - void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) - throws IOException; + void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException; /** * For notification post append to the writer. Used by metrics system at least. @@ -135,8 +132,7 @@ public interface WALActionsListener { public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey, WALEdit logEdit) {} @Override - public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) - throws IOException { + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java index f268422..197144d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -20,11 +20,11 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; +import java.util.NavigableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; @@ -58,10 +58,11 @@ public class WALUtil { *

This write is for internal use only. Not for external client consumption. * @param mvcc Used by WAL to get sequence Id for the waledit. */ - public static WALKey writeCompactionMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri, - final CompactionDescriptor c, MultiVersionConcurrencyControl mvcc) + public static WALKey writeCompactionMarker(WAL wal, + NavigableMap replicationScope, HRegionInfo hri, final CompactionDescriptor c, + MultiVersionConcurrencyControl mvcc) throws IOException { - WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createCompaction(hri, c), mvcc); + WALKey walKey = writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc); if (LOG.isTraceEnabled()) { LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); } @@ -73,11 +74,11 @@ public class WALUtil { * *

This write is for internal use only. Not for external client consumption. */ - public static WALKey writeFlushMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri, - final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc) - throws IOException { - WALKey walKey = - doFullAppendTransaction(wal, htd, hri, WALEdit.createFlushWALEdit(hri, f), mvcc, sync); + public static WALKey writeFlushMarker(WAL wal, NavigableMap replicationScope, + HRegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc) + throws IOException { + WALKey walKey = doFullAppendTransaction(wal, replicationScope, hri, + WALEdit.createFlushWALEdit(hri, f), mvcc, sync); if (LOG.isTraceEnabled()) { LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f)); } @@ -88,10 +89,12 @@ public class WALUtil { * Write a region open marker indicating that the region is opened. * This write is for internal use only. Not for external client consumption. */ - public static WALKey writeRegionEventMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri, + public static WALKey writeRegionEventMarker(WAL wal, + NavigableMap replicationScope, HRegionInfo hri, final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc) throws IOException { - WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc); + WALKey walKey = writeMarker(wal, replicationScope, hri, + WALEdit.createRegionEventWALEdit(hri, r), mvcc); if (LOG.isTraceEnabled()) { LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r)); } @@ -102,28 +105,30 @@ public class WALUtil { * Write a log marker that a bulk load has succeeded and is about to be committed. * This write is for internal use only. Not for external client consumption. * @param wal The log to write into. - * @param htd A description of the table that we are bulk loading into. + * @param replicationScope The replication scope of the families in the HRegion * @param hri A description of the region in the table that we are bulk loading into. * @param desc A protocol buffers based description of the client's bulk loading request * @return walKey with sequenceid filled out for this bulk load marker * @throws IOException We will throw an IOException if we can not append to the HLog. */ - public static WALKey writeBulkLoadMarkerAndSync(final WAL wal, final HTableDescriptor htd, - final HRegionInfo hri, final WALProtos.BulkLoadDescriptor desc, - final MultiVersionConcurrencyControl mvcc) - throws IOException { - WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc); + public static WALKey writeBulkLoadMarkerAndSync(final WAL wal, + final NavigableMap replicationScope, final HRegionInfo hri, + final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc) + throws IOException { + WALKey walKey = writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), + mvcc); if (LOG.isTraceEnabled()) { LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc)); } return walKey; } - private static WALKey writeMarker(final WAL wal, final HTableDescriptor htd, - final HRegionInfo hri, final WALEdit edit, final MultiVersionConcurrencyControl mvcc) + private static WALKey writeMarker(final WAL wal, + final NavigableMap replicationScope, final HRegionInfo hri, + final WALEdit edit, final MultiVersionConcurrencyControl mvcc) throws IOException { // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT - return doFullAppendTransaction(wal, htd, hri, edit, mvcc, true); + return doFullAppendTransaction(wal, replicationScope, hri, edit, mvcc, true); } /** @@ -134,16 +139,16 @@ public class WALUtil { *

This write is for internal use only. Not for external client consumption. * @return WALKey that was added to the WAL. */ - public static WALKey doFullAppendTransaction(final WAL wal, final HTableDescriptor htd, - final HRegionInfo hri, final WALEdit edit, final MultiVersionConcurrencyControl mvcc, - final boolean sync) + public static WALKey doFullAppendTransaction(final WAL wal, + final NavigableMap replicationScope, final HRegionInfo hri, + final WALEdit edit, final MultiVersionConcurrencyControl mvcc, final boolean sync) throws IOException { // TODO: Pass in current time to use? - WALKey walKey = - new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), System.currentTimeMillis(), mvcc); + WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), + System.currentTimeMillis(), mvcc, replicationScope); long trx = MultiVersionConcurrencyControl.NONE; try { - trx = wal.append(htd, hri, walKey, edit, false); + trx = wal.append(hri, walKey, edit, false); if (sync) { wal.sync(trx); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java index f97ec15..28a83dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java @@ -44,7 +44,7 @@ public class ScopeWALEntryFilter implements WALEntryFilter { @Override public Entry filter(Entry entry) { - NavigableMap scopes = entry.getKey().getScopes(); + NavigableMap scopes = entry.getKey().getReplicationScopes(); if (scopes == null || scopes.isEmpty()) { return null; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index a5d2446..bb4a5a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -20,13 +20,10 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY; -import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.NavigableMap; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -43,7 +40,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; @@ -61,7 +57,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.zookeeper.KeeperException; @@ -257,72 +252,47 @@ public class Replication extends WALActionsListener.Base implements } @Override - public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) - throws IOException { - scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager()); + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { + scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager()); } /** * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from * compaction WAL edits and if the scope is local. - * @param htd Descriptor used to find the scope to use * @param logKey Key that may get scoped according to its edits * @param logEdit Edits used to lookup the scopes * @param replicationManager Manager used to add bulk load events hfile references * @throws IOException If failed to parse the WALEdit */ - public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit, - Configuration conf, ReplicationSourceManager replicationManager) throws IOException { - NavigableMap scopes = new TreeMap(Bytes.BYTES_COMPARATOR); - byte[] family; + public static void scopeWALEdits(WALKey logKey, + WALEdit logEdit, Configuration conf, ReplicationSourceManager replicationManager) + throws IOException { boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf); + byte[] family; + boolean foundOtherEdits = false; for (Cell cell : logEdit.getCells()) { if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { - scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell); + try { + BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); + for (StoreDescriptor s : bld.getStoresList()) { + family = s.getFamilyName().toByteArray(); + addHFileRefsToQueue(replicationManager, logKey.getTablename(), family, s); + } + } catch (IOException e) { + LOG.error("Failed to get bulk load events information from the wal file.", e); + throw e; + } } else { // Skip the flush/compaction/region events continue; } } else { - family = CellUtil.cloneFamily(cell); - // Unexpected, has a tendency to happen in unit tests - assert htd.getFamily(family) != null; - - if (!scopes.containsKey(family)) { - int scope = htd.getFamily(family).getScope(); - if (scope != REPLICATION_SCOPE_LOCAL) { - scopes.put(family, scope); - } - } + foundOtherEdits = true; } } - if (!scopes.isEmpty() && !logEdit.isReplay()) { - logKey.setScopes(scopes); - } - } - - private static void scopeBulkLoadEdits(HTableDescriptor htd, - ReplicationSourceManager replicationManager, NavigableMap scopes, - TableName tableName, Cell cell) throws IOException { - byte[] family; - try { - BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); - for (StoreDescriptor s : bld.getStoresList()) { - family = s.getFamilyName().toByteArray(); - if (!scopes.containsKey(family)) { - int scope = htd.getFamily(family).getScope(); - if (scope != REPLICATION_SCOPE_LOCAL) { - scopes.put(family, scope); - addHFileRefsToQueue(replicationManager, tableName, family, s); - } - } else { - addHFileRefsToQueue(replicationManager, tableName, family, s); - } - } - } catch (IOException e) { - LOG.error("Failed to get bulk load events information from the wal file.", e); - throw e; + if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) { + logKey.serializeReplicationScope(false); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 0c41e77..c3d4b2c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.util.FSUtils; // imports for things that haven't moved from regionserver.wal yet. @@ -154,8 +153,7 @@ class DisabledWALProvider implements WALProvider { } @Override - public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, - boolean inMemstore) { + public long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) { if (!this.listeners.isEmpty()) { final long start = System.nanoTime(); long len = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index d2b336e..0b83528 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -25,7 +25,6 @@ import java.util.Set; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; // imports we use from yet-to-be-moved regionsever.wal @@ -106,21 +105,17 @@ public interface WAL { * completes BUT on return this edit must have its region edit/sequence id assigned * else it messes up our unification of mvcc and sequenceid. On return key will * have the region edit/sequence id filled in. - * @param info + * @param info the regioninfo associated with append * @param key Modified by this call; we add to it this edits region edit/sequence id. * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit * sequence id that is after all currently appended edits. - * @param htd used to give scope for replication TODO refactor out in favor of table name and - * info * @param inMemstore Always true except for case where we are writing a compaction completion * record into the WAL; in this case the entry is just so we can finish an unfinished compaction * -- it is not an edit for memstore. * @return Returns a 'transaction id' and key will have the region edit/sequence id * in it. */ - long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, - boolean inMemstore) - throws IOException; + long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException; /** * Sync what we have in the WAL. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index 09096fe..b60184f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -193,7 +193,7 @@ public class WALKey implements SequenceId, Comparable { @InterfaceAudience.Private protected List clusterIds; - private NavigableMap scopes; + private NavigableMap replicationScope; private long nonceGroup = HConstants.NO_NONCE; private long nonce = HConstants.NO_NONCE; @@ -210,7 +210,12 @@ public class WALKey implements SequenceId, Comparable { public WALKey() { init(null, null, 0L, HConstants.LATEST_TIMESTAMP, - new ArrayList(), HConstants.NO_NONCE, HConstants.NO_NONCE, null); + new ArrayList(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null); + } + + public WALKey(final NavigableMap replicationScope) { + init(null, null, 0L, HConstants.LATEST_TIMESTAMP, + new ArrayList(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope); } @VisibleForTesting @@ -220,15 +225,16 @@ public class WALKey implements SequenceId, Comparable { List clusterIds = new ArrayList(); clusterIds.add(clusterId); init(encodedRegionName, tablename, logSeqNum, now, clusterIds, - HConstants.NO_NONCE, HConstants.NO_NONCE, null); + HConstants.NO_NONCE, HConstants.NO_NONCE, null, null); } /** * @deprecated Remove. Useless. */ @Deprecated // REMOVE - public WALKey(final byte[] encodedRegionName, final TableName tablename) { - this(encodedRegionName, tablename, System.currentTimeMillis()); + public WALKey(final byte[] encodedRegionName, final TableName tablename, + final NavigableMap replicationScope) { + this(encodedRegionName, tablename, System.currentTimeMillis(), replicationScope); } // TODO: Fix being able to pass in sequenceid. @@ -240,7 +246,20 @@ public class WALKey implements SequenceId, Comparable { EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, - null); + null, null); + } + + // TODO: Fix being able to pass in sequenceid. + public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now, + final NavigableMap replicationScope) { + init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE, + HConstants.NO_NONCE, null, replicationScope); + } + + public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now, + MultiVersionConcurrencyControl mvcc, final NavigableMap replicationScope) { + init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE, + HConstants.NO_NONCE, mvcc, replicationScope); } public WALKey(final byte[] encodedRegionName, @@ -254,7 +273,33 @@ public class WALKey implements SequenceId, Comparable { EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, - mvcc); + mvcc, null); + } + + /** + * Create the log key for writing to somewhere. + * We maintain the tablename mainly for debugging purposes. + * A regionName is always a sub-table object. + *

Used by log splitting and snapshots. + * + * @param encodedRegionName Encoded name of the region as returned by + * HRegionInfo#getEncodedNameAsBytes(). + * @param tablename - name of table + * @param logSeqNum - log sequence number + * @param now Time at which this edit was written. + * @param clusterIds the clusters that have consumed the change(used in Replication) + * @param nonceGroup the nonceGroup + * @param nonce the nonce + * @param mvcc the mvcc associate the WALKey + * @param replicationScope the non-default replication scope + * associated with the region's column families + */ + // TODO: Fix being able to pass in sequenceid. + public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum, + final long now, List clusterIds, long nonceGroup, long nonce, + MultiVersionConcurrencyControl mvcc, final NavigableMap replicationScope) { + init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, + replicationScope); } /** @@ -279,7 +324,7 @@ public class WALKey implements SequenceId, Comparable { long nonceGroup, long nonce, MultiVersionConcurrencyControl mvcc) { - init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc); + init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null); } /** @@ -299,7 +344,31 @@ public class WALKey implements SequenceId, Comparable { public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now, List clusterIds, long nonceGroup, final long nonce, final MultiVersionConcurrencyControl mvcc) { - init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc); + init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc, + null); + } + + /** + * Create the log key for writing to somewhere. + * We maintain the tablename mainly for debugging purposes. + * A regionName is always a sub-table object. + * + * @param encodedRegionName Encoded name of the region as returned by + * HRegionInfo#getEncodedNameAsBytes(). + * @param tablename + * @param now Time at which this edit was written. + * @param clusterIds the clusters that have consumed the change(used in Replication) + * @param nonceGroup the nonceGroup + * @param nonce the nonce + * @param mvcc mvcc control used to generate sequence numbers and control read/write points + * @param replicationScope the non-default replication scope of the column families + */ + public WALKey(final byte[] encodedRegionName, final TableName tablename, + final long now, List clusterIds, long nonceGroup, + final long nonce, final MultiVersionConcurrencyControl mvcc, + NavigableMap replicationScope) { + init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc, + replicationScope); } /** @@ -328,7 +397,7 @@ public class WALKey implements SequenceId, Comparable { EMPTY_UUIDS, nonceGroup, nonce, - mvcc); + mvcc, null); } @InterfaceAudience.Private @@ -339,7 +408,8 @@ public class WALKey implements SequenceId, Comparable { List clusterIds, long nonceGroup, long nonce, - MultiVersionConcurrencyControl mvcc) { + MultiVersionConcurrencyControl mvcc, + NavigableMap replicationScope) { this.sequenceId = logSeqNum; this.writeTime = now; this.clusterIds = clusterIds; @@ -351,6 +421,7 @@ public class WALKey implements SequenceId, Comparable { if (logSeqNum != NO_SEQUENCE_ID) { setSequenceId(logSeqNum); } + this.replicationScope = replicationScope; } // For HLogKey and deserialization. DO NOT USE. See setWriteEntry below. @@ -418,8 +489,8 @@ public class WALKey implements SequenceId, Comparable { return this.writeTime; } - public NavigableMap getScopes() { - return scopes; + public NavigableMap getReplicationScopes() { + return replicationScope; } /** @return The nonce group */ @@ -432,8 +503,14 @@ public class WALKey implements SequenceId, Comparable { return nonce; } - public void setScopes(NavigableMap scopes) { - this.scopes = scopes; + private void setReplicationScope(NavigableMap replicationScope) { + this.replicationScope = replicationScope; + } + + public void serializeReplicationScope(boolean serialize) { + if (!serialize) { + setReplicationScope(null); + } } public void readOlderScopes(NavigableMap scopes) { @@ -450,7 +527,7 @@ public class WALKey implements SequenceId, Comparable { } } if (scopes.size() > 0) { - this.scopes = scopes; + this.replicationScope = scopes; } } } @@ -598,8 +675,8 @@ public class WALKey implements SequenceId, Comparable { uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits()); builder.addClusterIds(uuidBuilder.build()); } - if (scopes != null) { - for (Map.Entry e : scopes.entrySet()) { + if (replicationScope != null) { + for (Map.Entry e : replicationScope.entrySet()) { ByteString family = (compressionContext == null) ? ByteStringer.wrap(e.getKey()) : compressor.compress(e.getKey(), compressionContext.familyDict); builder.addScopes(FamilyScope.newBuilder() @@ -638,13 +715,13 @@ public class WALKey implements SequenceId, Comparable { if (walKey.hasNonce()) { this.nonce = walKey.getNonce(); } - this.scopes = null; + this.replicationScope = null; if (walKey.getScopesCount() > 0) { - this.scopes = new TreeMap(Bytes.BYTES_COMPARATOR); + this.replicationScope = new TreeMap(Bytes.BYTES_COMPARATOR); for (FamilyScope scope : walKey.getScopesList()) { byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() : uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict); - this.scopes.put(family, scope.getScopeType().getNumber()); + this.replicationScope.put(family, scope.getScopeType().getNumber()); } } setSequenceId(walKey.getLogSequenceNumber()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 54b82b2..9b152d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -1777,7 +1777,7 @@ public class WALSplitter { WALEdit edit = entry.getEdit(); TableName table = entry.getKey().getTablename(); // clear scopes which isn't needed for recovery - entry.getKey().setScopes(null); + entry.getKey().serializeReplicationScope(false); String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName()); // skip edits of non-existent tables if (nonExistentTables != null && nonExistentTables.contains(table)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index 35a7403..d92b2f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -279,7 +279,8 @@ public class TestIOFencing { CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri, FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")), new Path("store_dir")); - WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(), + WALUtil.writeCompactionMarker(compactingRegion.getWAL(), + ((HRegion)compactingRegion).getReplicationScope(), oldHri, compactionDescriptor, compactingRegion.getMVCC()); // Wait till flush has happened, otherwise there won't be multiple store files diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index 75fe7a2..03760d1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -29,6 +29,8 @@ import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -184,7 +186,11 @@ public class TestWALObserver { HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE)); final HTableDescriptor htd = createBasic3FamilyHTD(Bytes .toString(TEST_TABLE)); - + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE)); deleteDir(basedir); fs.mkdirs(new Path(basedir, hri.getEncodedName())); @@ -235,8 +241,8 @@ public class TestWALObserver { // it's where WAL write cp should occur. long now = EnvironmentEdgeManager.currentTime(); // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now), - edit, true); + long txid = log.append(hri, + new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, scopes), edit, true); log.sync(txid); // the edit shall have been change now by the coprocessor. @@ -296,10 +302,15 @@ public class TestWALObserver { assertFalse(oldApi.isPostWALWriteDeprecatedCalled()); LOG.debug("writing to WAL with non-legacy keys."); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for (HColumnDescriptor hcd : htd.getFamilies()) { + scopes.put(hcd.getName(), 0); + } final int countPerFamily = 5; for (HColumnDescriptor hcd : htd.getFamilies()) { addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily, - EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc); + EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc); } LOG.debug("Verify that only the non-legacy CP saw edits."); @@ -323,7 +334,7 @@ public class TestWALObserver { final WALEdit edit = new WALEdit(); final byte[] nonce = Bytes.toBytes("1772"); edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce)); - final long txid = wal.append(htd, hri, legacyKey, edit, true); + final long txid = wal.append(hri, legacyKey, edit, true); wal.sync(txid); LOG.debug("Make sure legacy cps can see supported edits after having been skipped."); @@ -349,7 +360,11 @@ public class TestWALObserver { final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE)); final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE)); final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); - + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } WAL log = wals.getWAL(UNSPECIFIED_REGION, null); try { SampleRegionWALObserver cp = getCoprocessor(log, SampleRegionWALObserver.class); @@ -360,8 +375,8 @@ public class TestWALObserver { assertFalse(cp.isPostWALWriteCalled()); final long now = EnvironmentEdgeManager.currentTime(); - long txid = log.append(htd, hri, - new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc), + long txid = log.append(hri, + new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes), new WALEdit(), true); log.sync(txid); @@ -400,14 +415,18 @@ public class TestWALObserver { // Put p = creatPutWith2Families(TEST_ROW); WALEdit edit = new WALEdit(); long now = EnvironmentEdgeManager.currentTime(); - // addFamilyMapToWALEdit(p.getFamilyMap(), edit); final int countPerFamily = 1000; - // for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) { + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for (HColumnDescriptor hcd : htd.getFamilies()) { + scopes.put(hcd.getName(), 0); + } for (HColumnDescriptor hcd : htd.getFamilies()) { addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily, - EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc); + EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc); } - wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true); + wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, + true); // sync to fs. wal.sync(); @@ -527,7 +546,8 @@ public class TestWALObserver { private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, - final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException { + final NavigableMap scopes, final MultiVersionConcurrencyControl mvcc) + throws IOException { String familyStr = Bytes.toString(family); long txid = -1; for (int j = 0; j < count; j++) { @@ -537,7 +557,7 @@ public class TestWALObserver { edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); // uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care // about legacy coprocessors - txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, + txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit, true); } if (-1 != txid) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java index 5fa588b..752faa6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hbase.mapreduce; import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader; + +import java.util.NavigableMap; + import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogKeyRecordReader; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.wal.WALKey; @@ -32,8 +35,8 @@ import org.junit.experimental.categories.Category; public class TestHLogRecordReader extends TestWALRecordReader { @Override - protected WALKey getWalKey(final long time) { - return new HLogKey(info.getEncodedNameAsBytes(), tableName, time, mvcc); + protected WALKey getWalKey(final long time, NavigableMap scopes) { + return new HLogKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java index 05f9f36..094fe1c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java @@ -34,6 +34,7 @@ import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.NavigableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -656,9 +657,9 @@ public class TestImportExport { Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); // Register the wal listener for the import table - TableWALActionListener walListener = new TableWALActionListener(importTableName); HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() .getOnlineRegions(importTable.getName()).get(0).getRegionInfo(); + TableWALActionListener walListener = new TableWALActionListener(region); WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); wal.registerWALActionsListener(walListener); @@ -678,7 +679,7 @@ public class TestImportExport { region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() .getOnlineRegions(importTable.getName()).get(0).getRegionInfo(); wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); - walListener = new TableWALActionListener(importTableName); + walListener = new TableWALActionListener(region); wal.registerWALActionsListener(walListener); args = new String[] { importTableName, FQ_OUTPUT_DIR }; assertTrue(runImport(args)); @@ -695,16 +696,17 @@ public class TestImportExport { */ private static class TableWALActionListener extends WALActionsListener.Base { - private String tableName; + private HRegionInfo regionInfo; private boolean isVisited = false; - public TableWALActionListener(String tableName) { - this.tableName = tableName; + public TableWALActionListener(HRegionInfo region) { + this.regionInfo = region; } @Override - public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) { - if (tableName.equalsIgnoreCase(htd.getNameAsString()) && (!logEdit.isMetaEdit())) { + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) { + if (logKey.getTablename().getNameAsString().equalsIgnoreCase( + this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())) { isVisited = true; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java index a4381c8..aee2a06 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -77,6 +79,8 @@ public class TestWALRecordReader { private static HTableDescriptor htd; private static Path logDir; protected MultiVersionConcurrencyControl mvcc; + protected static NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); private static String getName() { return "TestWALRecordReader"; @@ -128,10 +132,10 @@ public class TestWALRecordReader { long ts = System.currentTimeMillis(); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value)); - log.append(htd, info, getWalKey(ts), edit, true); + log.append(info, getWalKey(ts, scopes), edit, true); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value)); - log.append(htd, info, getWalKey(ts+1), edit, true); + log.append(info, getWalKey(ts+1, scopes), edit, true); log.sync(); LOG.info("Before 1st WAL roll " + log.toString()); log.rollWriter(); @@ -142,10 +146,10 @@ public class TestWALRecordReader { edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value)); - log.append(htd, info, getWalKey(ts1+1), edit, true); + log.append(info, getWalKey(ts1+1, scopes), edit, true); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value)); - log.append(htd, info, getWalKey(ts1+2), edit, true); + log.append(info, getWalKey(ts1+2, scopes), edit, true); log.sync(); log.shutdown(); walfactory.shutdown(); @@ -187,7 +191,7 @@ public class TestWALRecordReader { WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value)); - long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true); + long txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true); log.sync(txid); Thread.sleep(1); // make sure 2nd log gets a later timestamp @@ -197,7 +201,7 @@ public class TestWALRecordReader { edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value)); - txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true); + txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true); log.sync(txid); log.shutdown(); walfactory.shutdown(); @@ -236,8 +240,8 @@ public class TestWALRecordReader { testSplit(splits.get(1)); } - protected WALKey getWalKey(final long time) { - return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc); + protected WALKey getWalKey(final long time, NavigableMap scopes) { + return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes); } protected WALRecordReader getReader() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java index 3a7aff0..6b0302f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java @@ -87,6 +87,7 @@ public class TestBulkLoad { private final byte[] randomBytes = new byte[100]; private final byte[] family1 = Bytes.toBytes("family1"); private final byte[] family2 = Bytes.toBytes("family2"); + @Rule public TestName name = new TestName(); @@ -105,7 +106,7 @@ public class TestBulkLoad { storeFileName = (new Path(storeFileName)).getName(); List storeFileNames = new ArrayList(); storeFileNames.add(storeFileName); - when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class), any(WALKey.class), + when(log.append(any(HRegionInfo.class), any(WALKey.class), argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(), familyName, storeFileNames)), any(boolean.class))).thenAnswer(new Answer() { @@ -132,7 +133,7 @@ public class TestBulkLoad { @Test public void shouldBulkLoadSingleFamilyHLog() throws IOException { - when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class), + when(log.append(any(HRegionInfo.class), any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), any(boolean.class))).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { @@ -151,7 +152,7 @@ public class TestBulkLoad { @Test public void shouldBulkLoadManyFamilyHLog() throws IOException { - when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class), + when(log.append(any(HRegionInfo.class), any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), any(boolean.class))).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { @@ -171,7 +172,7 @@ public class TestBulkLoad { @Test public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException { - when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class), + when(log.append(any(HRegionInfo.class), any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), any(boolean.class))).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index a5574d3..ba2f0b3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -896,7 +896,7 @@ public class TestHRegion { storeFiles, Lists.newArrayList(newFile), region.getRegionFileSystem().getStoreDir(Bytes.toString(family))); - WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(), + WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(), this.region.getRegionInfo(), compactionDescriptor, region.getMVCC()); Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); @@ -4796,7 +4796,7 @@ public class TestHRegion { //verify append called or not verify(wal, expectAppend ? times(1) : never()) - .append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(), + .append((HRegionInfo)any(), (WALKey)any(), (WALEdit)any(), Mockito.anyBoolean()); // verify sync called or not @@ -5998,7 +5998,7 @@ public class TestHRegion { region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(), rss, null); - verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any() + verify(wal, times(1)).append((HRegionInfo)any(), (WALKey)any() , editCaptor.capture(), anyBoolean()); WALEdit edit = editCaptor.getValue(); @@ -6111,7 +6111,7 @@ public class TestHRegion { // verify that we have not appended region open event to WAL because this region is still // recovering - verify(wal, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any() + verify(wal, times(0)).append((HRegionInfo)any(), (WALKey)any() , editCaptor.capture(), anyBoolean()); // not put the region out of recovering state @@ -6119,7 +6119,7 @@ public class TestHRegion { .prepare().process(); // now we should have put the entry - verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any() + verify(wal, times(1)).append((HRegionInfo)any(), (WALKey)any() , editCaptor.capture(), anyBoolean()); WALEdit edit = editCaptor.getValue(); @@ -6163,7 +6163,7 @@ public class TestHRegion { */ private WAL mockWAL() throws IOException { WAL wal = mock(WAL.class); - Mockito.when(wal.append((HTableDescriptor)Mockito.any(), (HRegionInfo)Mockito.any(), + Mockito.when(wal.append((HRegionInfo)Mockito.any(), (WALKey)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())). thenAnswer(new Answer() { @Override @@ -6206,7 +6206,7 @@ public class TestHRegion { region.close(false); // 2 times, one for region open, the other close region - verify(wal, times(2)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(), + verify(wal, times(2)).append((HRegionInfo)any(), (WALKey)any(), editCaptor.capture(), anyBoolean()); WALEdit edit = editCaptor.getAllValues().get(1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 4d5d7d8..9183e18 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -1126,7 +1126,7 @@ public class TestHRegionReplayEvents { // test for region open and close secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null); - verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), + verify(walSecondary, times(0)).append((HRegionInfo)any(), (WALKey)any(), (WALEdit)any(), anyBoolean()); // test for replay prepare flush @@ -1140,11 +1140,11 @@ public class TestHRegionReplayEvents { .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName())) .build()); - verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), + verify(walSecondary, times(0)).append((HRegionInfo)any(), (WALKey)any(), (WALEdit)any(), anyBoolean()); secondaryRegion.close(); - verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), + verify(walSecondary, times(0)).append((HRegionInfo)any(), (WALKey)any(), (WALEdit)any(), anyBoolean()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java index 87cbab7..76b4134 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -411,7 +412,7 @@ public class TestHRegionServerBulkLoad { private boolean found = false; @Override - public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) { + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) { for (Cell cell : logEdit.getCells()) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); for (Map.Entry entry : kv.toStringMap().entrySet()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index 567e09d..e9bb468 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.NavigableMap; +import java.util.TreeMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -208,13 +210,17 @@ public class TestWALLockup { HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME); final HRegion region = initHRegion(tableName, null, null, dodgyWAL); byte [] bytes = Bytes.toBytes(getName()); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + scopes.put(COLUMN_FAMILY_BYTES, 0); try { // First get something into memstore. Make a Put and then pull the Cell out of it. Will // manage append and sync carefully in below to manufacture hang. We keep adding same // edit. WAL subsystem doesn't care. Put put = new Put(bytes); put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); - WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName()); + WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName(), + scopes); WALEdit edit = new WALEdit(); CellScanner CellScanner = put.cellScanner(); assertTrue(CellScanner.advance()); @@ -228,7 +234,7 @@ public class TestWALLockup { LOG.info("SET throwing of exception on append"); dodgyWAL.throwException = true; // This append provokes a WAL roll request - dodgyWAL.append(htd, region.getRegionInfo(), key, edit, true); + dodgyWAL.append(region.getRegionInfo(), key, edit, true); boolean exception = false; try { dodgyWAL.sync(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index fd6d535..c60b225 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -28,7 +28,9 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.NavigableMap; import java.util.Set; +import java.util.TreeMap; import java.util.UUID; import org.apache.commons.lang.mutable.MutableBoolean; @@ -152,12 +154,9 @@ public class TestFSHLog { } } - protected void addEdits(WAL log, - HRegionInfo hri, - HTableDescriptor htd, - int times, - MultiVersionConcurrencyControl mvcc) - throws IOException { + protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times, + MultiVersionConcurrencyControl mvcc, NavigableMap scopes) + throws IOException { final byte[] row = Bytes.toBytes("row"); for (int i = 0; i < times; i++) { long timestamp = System.currentTimeMillis(); @@ -165,8 +164,8 @@ public class TestFSHLog { cols.add(new KeyValue(row, row, row, timestamp, row)); WALKey key = new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), WALKey.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, - HConstants.NO_NONCE, mvcc); - log.append(htd, hri, key, cols, true); + HConstants.NO_NONCE, mvcc, scopes); + log.append(hri, key, cols, true); } log.sync(); } @@ -261,11 +260,21 @@ public class TestFSHLog { new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); // add edits and roll the wal MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + NavigableMap scopes1 = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : t1.getFamiliesKeys()) { + scopes1.put(fam, 0); + } + NavigableMap scopes2 = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : t2.getFamiliesKeys()) { + scopes2.put(fam, 0); + } try { - addEdits(wal, hri1, t1, 2, mvcc); + addEdits(wal, hri1, t1, 2, mvcc, scopes1); wal.rollWriter(); // add some more edits and roll the wal. This would reach the log number threshold - addEdits(wal, hri1, t1, 2, mvcc); + addEdits(wal, hri1, t1, 2, mvcc, scopes1); wal.rollWriter(); // with above rollWriter call, the max logs limit is reached. assertTrue(wal.getNumRolledLogFiles() == 2); @@ -276,7 +285,7 @@ public class TestFSHLog { assertEquals(1, regionsToFlush.length); assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]); // insert edits in second region - addEdits(wal, hri2, t2, 2, mvcc); + addEdits(wal, hri2, t2, 2, mvcc, scopes2); // get the regions to flush, it should still read region1. regionsToFlush = wal.findRegionsToForceFlush(); assertEquals(regionsToFlush.length, 1); @@ -293,12 +302,12 @@ public class TestFSHLog { // no wal should remain now. assertEquals(0, wal.getNumRolledLogFiles()); // add edits both to region 1 and region 2, and roll. - addEdits(wal, hri1, t1, 2, mvcc); - addEdits(wal, hri2, t2, 2, mvcc); + addEdits(wal, hri1, t1, 2, mvcc, scopes1); + addEdits(wal, hri2, t2, 2, mvcc, scopes2); wal.rollWriter(); // add edits and roll the writer, to reach the max logs limit. assertEquals(1, wal.getNumRolledLogFiles()); - addEdits(wal, hri1, t1, 2, mvcc); + addEdits(wal, hri1, t1, 2, mvcc, scopes1); wal.rollWriter(); // it should return two regions to flush, as the oldest wal file has entries // for both regions. @@ -310,7 +319,7 @@ public class TestFSHLog { wal.rollWriter(true); assertEquals(0, wal.getNumRolledLogFiles()); // Add an edit to region1, and roll the wal. - addEdits(wal, hri1, t1, 2, mvcc); + addEdits(wal, hri1, t1, 2, mvcc, scopes1); // tests partial flush: roll on a partial flush, and ensure that wal is not archived. wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys()); wal.rollWriter(); @@ -360,6 +369,11 @@ public class TestFSHLog { HBaseTestingUtility.closeRegionAndWAL(r); final int countPerFamily = 10; final MutableBoolean goslow = new MutableBoolean(false); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } // subclass and doctor a method. FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(), testName, conf) { @@ -403,9 +417,9 @@ public class TestFSHLog { for (int i = 0; i < countPerFamily; i++) { final HRegionInfo info = region.getRegionInfo(); final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC()); - wal.append(htd, info, logkey, edits, true); - } + System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes); + wal.append(info, logkey, edits, true); + } region.flush(true); // FlushResult.flushSequenceId is not visible here so go get the current sequence id. long currentSequenceId = region.getReadPoint(null); @@ -439,11 +453,16 @@ public class TestFSHLog { syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row")); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } HRegionInfo hri = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); for (int i = 0; i < 10; i++) { - addEdits(log, hri, htd, 1, mvcc); + addEdits(log, hri, htd, 1, mvcc, scopes); } } finally { log.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index 9dccffe..c05e7f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.NavigableMap; +import java.util.TreeMap; + import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -199,8 +202,13 @@ public class TestLogRollAbort { kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor("column")); - log.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc), kvs, true); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } + log.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs, true); } // Send the data to HDFS datanodes and close the HDFS writer log.sync(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java index 0c68fc1..9ab7b7d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertFalse; import java.io.IOException; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -139,8 +141,13 @@ public class TestLogRollingNoCluster { edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY)); final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO; final HTableDescriptor htd = TEST_UTIL.getMetaTableDescriptor(); - final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), - TableName.META_TABLE_NAME, now, mvcc), edit, true); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } + final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), + TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true); wal.sync(txid); } String msg = getName() + " finished"; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index a2c387b..b6bb7a0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.util.ArrayList; import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -98,9 +100,13 @@ public class TestWALActionsListener { edit.add(kv); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(SOME_BYTES)); htd.addFamily(new HColumnDescriptor(b)); - - final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), - TableName.valueOf(b), 0), edit, true); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } + final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), + TableName.valueOf(b), 0, scopes), edit, true); wal.sync(txid); if (i == 10) { wal.registerWALActionsListener(laterobserver); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index dbc06ff..3e894d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -37,7 +37,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.NavigableMap; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -308,9 +310,14 @@ public class TestWALReplay { // Add 1k to each family. final int countPerFamily = 1000; + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } for (HColumnDescriptor hcd: htd.getFamilies()) { addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, - wal1, htd, mvcc); + wal1, htd, mvcc, scopes); } wal1.shutdown(); runWALSplit(this.conf); @@ -319,7 +326,7 @@ public class TestWALReplay { // Add 1k to each family. for (HColumnDescriptor hcd: htd.getFamilies()) { addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, - ee, wal2, htd, mvcc); + ee, wal2, htd, mvcc, scopes); } wal2.shutdown(); runWALSplit(this.conf); @@ -800,9 +807,14 @@ public class TestWALReplay { // Add 1k to each family. final int countPerFamily = 1000; Set familyNames = new HashSet(); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } for (HColumnDescriptor hcd: htd.getFamilies()) { addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, - ee, wal, htd, mvcc); + ee, wal, htd, mvcc, scopes); familyNames.add(hcd.getName()); } @@ -815,13 +827,15 @@ public class TestWALReplay { long now = ee.currentTime(); edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName)); - wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true); + wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, + true); // Delete the c family to verify deletes make it over. edit = new WALEdit(); now = ee.currentTime(); edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); - wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true); + wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, + true); // Sync. wal.sync(); @@ -1046,12 +1060,16 @@ public class TestWALReplay { deleteDir(basedir); final HTableDescriptor htd = createBasic1FamilyHTD(tableName); + NavigableMap scopes = new TreeMap(Bytes.BYTES_COMPARATOR); + for (byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); HBaseTestingUtility.closeRegionAndWAL(region); final byte[] family = htd.getColumnFamilies()[0].getName(); final byte[] rowName = tableName.getName(); - FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1); - FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2); + FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes); + FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes); Path largeFile = new Path(logDir, "wal-1"); Path smallFile = new Path(logDir, "wal-2"); @@ -1154,8 +1172,8 @@ public class TestWALReplay { } private WALKey createWALKey(final TableName tableName, final HRegionInfo hri, - final MultiVersionConcurrencyControl mvcc) { - return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc); + final MultiVersionConcurrencyControl mvcc, NavigableMap scopes) { + return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes); } private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee, @@ -1169,19 +1187,20 @@ public class TestWALReplay { private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence, byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, - int index) throws IOException { + int index, NavigableMap scopes) throws IOException { FSWALEntry entry = - new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc), createWALEdit( - rowName, family, ee, index), htd, hri, true); + new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit( + rowName, family, ee, index), hri, true); entry.stampRegionSequenceId(); return entry; } private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, - final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException { + final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc, + NavigableMap scopes) throws IOException { for (int j = 0; j < count; j++) { - wal.append(htd, hri, createWALKey(tableName, hri, mvcc), + wal.append(hri, createWALKey(tableName, hri, mvcc, scopes), createWALEdit(rowName, family, ee, j), true); } wal.sync(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index e52a600..a50bbc5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -18,6 +18,9 @@ */ package org.apache.hadoop.hbase.replication; +import java.util.NavigableMap; +import java.util.TreeMap; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -63,6 +66,7 @@ public class TestReplicationBase { protected static Table htable1; protected static Table htable2; + protected static NavigableMap scopes; protected static HBaseTestingUtility utility1; protected static HBaseTestingUtility utility2; @@ -140,6 +144,11 @@ public class TestReplicationBase { table.addFamily(fam); fam = new HColumnDescriptor(noRepfamName); table.addFamily(fam); + scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(HColumnDescriptor f : table.getColumnFamilies()) { + scopes.put(f.getName(), f.getScope()); + } Connection connection1 = ConnectionFactory.createConnection(conf1); Connection connection2 = ConnectionFactory.createConnection(conf2); try (Admin admin1 = connection1.getAdmin()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index ab97238..97ccd33 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -26,6 +26,8 @@ import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -658,7 +660,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { HRegionInfo hri = new HRegionInfo(htable1.getName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor); - Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit, + Replication.scopeWALEdits(new WALKey(), edit, htable1.getConfiguration(), null); } @@ -767,7 +769,10 @@ public class TestReplicationSmallTests extends TestReplicationBase { HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0); HRegionInfo hri = region.getRegionInfo(); - + NavigableMap scopes = new TreeMap(Bytes.BYTES_COMPARATOR); + for (byte[] fam : htable1.getTableDescriptor().getFamiliesKeys()) { + scopes.put(fam, 1); + } final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); int index = utility1.getMiniHBaseCluster().getServerWith(hri.getRegionName()); WAL wal = utility1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo()); @@ -778,8 +783,8 @@ public class TestReplicationSmallTests extends TestReplicationBase { long now = EnvironmentEdgeManager.currentTime(); edit.add(new KeyValue(rowName, famName, qualifier, now, value)); - WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc); - wal.append(htable1.getTableDescriptor(), hri, walKey, edit, true); + WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes); + wal.append(hri, walKey, edit, true); wal.sync(); Get get = new Get(rowName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index 22c421d..f40c7d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -58,19 +58,19 @@ public class TestReplicationWALEntryFilters { // meta WALKey key1 = new WALKey( HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), - TableName.META_TABLE_NAME); + TableName.META_TABLE_NAME, null); Entry metaEntry = new Entry(key1, null); assertNull(filter.filter(metaEntry)); // ns table - WALKey key2 = new WALKey(new byte[] {}, TableName.NAMESPACE_TABLE_NAME); + WALKey key2 = new WALKey(new byte[] {}, TableName.NAMESPACE_TABLE_NAME, null); Entry nsEntry = new Entry(key2, null); assertNull(filter.filter(nsEntry)); // user table - WALKey key3 = new WALKey(new byte[] {}, TableName.valueOf("foo")); + WALKey key3 = new WALKey(new byte[] {}, TableName.valueOf("foo"), null); Entry userEntry = new Entry(key3, null); assertEquals(userEntry, filter.filter(userEntry)); @@ -80,33 +80,30 @@ public class TestReplicationWALEntryFilters { public void testScopeWALEntryFilter() { ScopeWALEntryFilter filter = new ScopeWALEntryFilter(); - Entry userEntry = createEntry(a, b); - Entry userEntryA = createEntry(a); - Entry userEntryB = createEntry(b); - Entry userEntryEmpty = createEntry(); + Entry userEntry = createEntry(null, a, b); + Entry userEntryA = createEntry(null, a); + Entry userEntryB = createEntry(null, b); + Entry userEntryEmpty = createEntry(null); // no scopes assertEquals(null, filter.filter(userEntry)); // empty scopes TreeMap scopes = new TreeMap(Bytes.BYTES_COMPARATOR); - userEntry = createEntry(a, b); - userEntry.getKey().setScopes(scopes); + userEntry = createEntry(scopes, a, b); assertEquals(null, filter.filter(userEntry)); // different scope scopes = new TreeMap(Bytes.BYTES_COMPARATOR); scopes.put(c, HConstants.REPLICATION_SCOPE_GLOBAL); - userEntry = createEntry(a, b); - userEntry.getKey().setScopes(scopes); + userEntry = createEntry(scopes, a, b); // all kvs should be filtered assertEquals(userEntryEmpty, filter.filter(userEntry)); // local scope scopes = new TreeMap(Bytes.BYTES_COMPARATOR); scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL); - userEntry = createEntry(a, b); - userEntry.getKey().setScopes(scopes); + userEntry = createEntry(scopes, a, b); assertEquals(userEntryEmpty, filter.filter(userEntry)); scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL); assertEquals(userEntryEmpty, filter.filter(userEntry)); @@ -114,8 +111,7 @@ public class TestReplicationWALEntryFilters { // only scope a scopes = new TreeMap(Bytes.BYTES_COMPARATOR); scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL); - userEntry = createEntry(a, b); - userEntry.getKey().setScopes(scopes); + userEntry = createEntry(scopes, a, b); assertEquals(userEntryA, filter.filter(userEntry)); scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL); assertEquals(userEntryA, filter.filter(userEntry)); @@ -123,8 +119,7 @@ public class TestReplicationWALEntryFilters { // only scope b scopes = new TreeMap(Bytes.BYTES_COMPARATOR); scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL); - userEntry = createEntry(a, b); - userEntry.getKey().setScopes(scopes); + userEntry = createEntry(scopes, a, b); assertEquals(userEntryB, filter.filter(userEntry)); scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL); assertEquals(userEntryB, filter.filter(userEntry)); @@ -132,8 +127,7 @@ public class TestReplicationWALEntryFilters { // scope a and b scopes = new TreeMap(Bytes.BYTES_COMPARATOR); scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL); - userEntry = createEntry(a, b); - userEntry.getKey().setScopes(scopes); + userEntry = createEntry(scopes, a, b); assertEquals(userEntryB, filter.filter(userEntry)); scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL); assertEquals(userEntryB, filter.filter(userEntry)); @@ -155,16 +149,16 @@ public class TestReplicationWALEntryFilters { @Test public void testChainWALEntryFilter() { - Entry userEntry = createEntry(a, b, c); + Entry userEntry = createEntry(null, a, b, c); ChainWALEntryFilter filter = new ChainWALEntryFilter(passFilter); - assertEquals(createEntry(a,b,c), filter.filter(userEntry)); + assertEquals(createEntry(null, a,b,c), filter.filter(userEntry)); filter = new ChainWALEntryFilter(passFilter, passFilter); - assertEquals(createEntry(a,b,c), filter.filter(userEntry)); + assertEquals(createEntry(null, a,b,c), filter.filter(userEntry)); filter = new ChainWALEntryFilter(passFilter, passFilter, passFilter); - assertEquals(createEntry(a,b,c), filter.filter(userEntry)); + assertEquals(createEntry(null, a,b,c), filter.filter(userEntry)); filter = new ChainWALEntryFilter(nullFilter); assertEquals(null, filter.filter(userEntry)); @@ -189,7 +183,7 @@ public class TestReplicationWALEntryFilters { new ChainWALEntryFilter(passFilter), new ChainWALEntryFilter(passFilter)), new ChainWALEntryFilter(passFilter)); - assertEquals(createEntry(a,b,c), filter.filter(userEntry)); + assertEquals(createEntry(null, a,b,c), filter.filter(userEntry)); filter = @@ -206,19 +200,19 @@ public class TestReplicationWALEntryFilters { ReplicationPeer peer = mock(ReplicationPeer.class); when(peer.getTableCFs()).thenReturn(null); - Entry userEntry = createEntry(a, b, c); + Entry userEntry = createEntry(null, a, b, c); TableCfWALEntryFilter filter = new TableCfWALEntryFilter(peer); - assertEquals(createEntry(a,b,c), filter.filter(userEntry)); + assertEquals(createEntry(null, a,b,c), filter.filter(userEntry)); // empty map - userEntry = createEntry(a, b, c); + userEntry = createEntry(null, a, b, c); Map> tableCfs = new HashMap>(); when(peer.getTableCFs()).thenReturn(tableCfs); filter = new TableCfWALEntryFilter(peer); assertEquals(null, filter.filter(userEntry)); // table bar - userEntry = createEntry(a, b, c); + userEntry = createEntry(null, a, b, c); tableCfs = new HashMap>(); tableCfs.put(TableName.valueOf("bar"), null); when(peer.getTableCFs()).thenReturn(tableCfs); @@ -226,24 +220,24 @@ public class TestReplicationWALEntryFilters { assertEquals(null, filter.filter(userEntry)); // table foo:a - userEntry = createEntry(a, b, c); + userEntry = createEntry(null, a, b, c); tableCfs = new HashMap>(); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a")); when(peer.getTableCFs()).thenReturn(tableCfs); filter = new TableCfWALEntryFilter(peer); - assertEquals(createEntry(a), filter.filter(userEntry)); + assertEquals(createEntry(null, a), filter.filter(userEntry)); // table foo:a,c - userEntry = createEntry(a, b, c, d); + userEntry = createEntry(null, a, b, c, d); tableCfs = new HashMap>(); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c")); when(peer.getTableCFs()).thenReturn(tableCfs); filter = new TableCfWALEntryFilter(peer); - assertEquals(createEntry(a,c), filter.filter(userEntry)); + assertEquals(createEntry(null, a,c), filter.filter(userEntry)); } - private Entry createEntry(byte[]... kvs) { - WALKey key1 = new WALKey(new byte[] {}, TableName.valueOf("foo")); + private Entry createEntry(TreeMap scopes, byte[]... kvs) { + WALKey key1 = new WALKey(new byte[] {}, TableName.valueOf("foo"), null); WALEdit edit1 = new WALEdit(); for (byte[] kv : kvs) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index f042a8d..fb8cfa0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.NavigableMap; import java.util.SortedMap; import java.util.SortedSet; +import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -131,6 +132,7 @@ public class TestReplicationSourceManager { private static CountDownLatch latch; private static List files = new ArrayList(); + private static NavigableMap scopes; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -177,6 +179,11 @@ public class TestReplicationSourceManager { col.setScope(HConstants.REPLICATION_SCOPE_LOCAL); htd.addFamily(col); + scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } hri = new HRegionInfo(htd.getTableName(), r1, r2); } @@ -214,15 +221,20 @@ public class TestReplicationSourceManager { manager.init(); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame")); htd.addFamily(new HColumnDescriptor(f1)); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } // Testing normal log rolling every 20 for(long i = 1; i < 101; i++) { if(i > 1 && i % 20 == 0) { wal.rollWriter(); } LOG.info(i); - final long txid = wal.append(htd, + final long txid = wal.append( hri, - new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc), + new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), edit, true); wal.sync(txid); @@ -236,8 +248,8 @@ public class TestReplicationSourceManager { LOG.info(baseline + " and " + time); for (int i = 0; i < 3; i++) { - wal.append(htd, hri, - new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc), + wal.append(hri, + new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), edit, true); } @@ -254,8 +266,8 @@ public class TestReplicationSourceManager { manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1", 0, false, false); - wal.append(htd, hri, - new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc), + wal.append(hri, + new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), edit, true); wal.sync(); @@ -427,33 +439,35 @@ public class TestReplicationSourceManager { @Test public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception { - // 1. Create wal key - WALKey logKey = new WALKey(); - // 2. Get the bulk load wal edit event - WALEdit logEdit = getBulkLoadWALEdit(); + NavigableMap scope = new TreeMap(Bytes.BYTES_COMPARATOR); + // 1. Get the bulk load wal edit event + WALEdit logEdit = getBulkLoadWALEdit(scope); + // 2. Create wal key + WALKey logKey = new WALKey(scope); // 3. Get the scopes for the key - Replication.scopeWALEdits(htd, logKey, logEdit, conf, manager); + Replication.scopeWALEdits(logKey, logEdit, conf, manager); // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled - assertNull("No bulk load entries scope should be added if bulk load replication is diabled.", - logKey.getScopes()); + assertNull("No bulk load entries scope should be added if bulk load replication is disabled.", + logKey.getReplicationScopes()); } @Test public void testBulkLoadWALEdits() throws Exception { - // 1. Create wal key - WALKey logKey = new WALKey(); - // 2. Get the bulk load wal edit event - WALEdit logEdit = getBulkLoadWALEdit(); + // 1. Get the bulk load wal edit event + NavigableMap scope = new TreeMap(Bytes.BYTES_COMPARATOR); + WALEdit logEdit = getBulkLoadWALEdit(scope); + // 2. Create wal key + WALKey logKey = new WALKey(scope); // 3. Enable bulk load hfile replication Configuration bulkLoadConf = HBaseConfiguration.create(conf); bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); // 4. Get the scopes for the key - Replication.scopeWALEdits(htd, logKey, logEdit, bulkLoadConf, manager); + Replication.scopeWALEdits(logKey, logEdit, bulkLoadConf, manager); - NavigableMap scopes = logKey.getScopes(); + NavigableMap scopes = logKey.getReplicationScopes(); // Assert family with replication scope global is present in the key scopes assertTrue("This family scope is set to global, should be part of replication key scopes.", scopes.containsKey(f1)); @@ -462,17 +476,16 @@ public class TestReplicationSourceManager { scopes.containsKey(f2)); } - private WALEdit getBulkLoadWALEdit() { + private WALEdit getBulkLoadWALEdit(NavigableMap scope) { // 1. Create store files for the families Map> storeFiles = new HashMap<>(1); List p = new ArrayList<>(1); p.add(new Path(Bytes.toString(f1))); storeFiles.put(f1, p); - + scope.put(f1, 1); p = new ArrayList<>(1); p.add(new Path(Bytes.toString(f2))); storeFiles.put(f2, p); - // 2. Create bulk load descriptor BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(), ByteStringer.wrap(hri.getEncodedNameAsBytes()), storeFiles, 1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java index 2ad34ea..3ef658f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java @@ -28,6 +28,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.atomic.AtomicLong; import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; @@ -75,6 +77,7 @@ public class TestReplicationWALReaderManager { private static final HRegionInfo info = new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false); private static final HTableDescriptor htd = new HTableDescriptor(tableName); + private static NavigableMap scopes; private WAL log; private ReplicationWALReaderManager logManager; @@ -123,6 +126,11 @@ public class TestReplicationWALReaderManager { cluster = TEST_UTIL.getDFSCluster(); fs = cluster.getFileSystem(); + scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } } @AfterClass @@ -204,9 +212,8 @@ public class TestReplicationWALReaderManager { } private void appendToLogPlus(int count) throws IOException { - final long txid = log.append(htd, info, - new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc), - getWALEdits(count), true); + final long txid = log.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true); log.sync(txid); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java index 6eac388..79b94cf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java @@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; // imports for things that haven't moved yet import org.apache.hadoop.hbase.regionserver.wal.FSHLog; @@ -60,12 +59,12 @@ public class FaultyFSLog extends FSHLog { } @Override - public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, - boolean inMemstore) throws IOException { + public long append(HRegionInfo info, WALKey key, + WALEdit edits, boolean inMemstore) throws IOException { if (this.ft == FailureType.APPEND) { throw new IOException("append"); } - return super.append(htd, info, key, edits, inMemstore); + return super.append(info, key, edits, inMemstore); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java index 89c63a6..ec62ed3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java @@ -25,8 +25,10 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.HashSet; +import java.util.NavigableMap; import java.util.Random; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -151,23 +153,25 @@ public class TestDefaultWALProvider { protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, - int times) throws IOException { + int times, NavigableMap scopes) throws IOException { final byte[] row = Bytes.toBytes("row"); for (int i = 0; i < times; i++) { long timestamp = System.currentTimeMillis(); WALEdit cols = new WALEdit(); cols.add(new KeyValue(row, row, row, timestamp, row)); - log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp), - cols, true); + log.append(hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes), + cols, true); } log.sync(); } /** * used by TestDefaultWALProviderWithHLogKey + * @param scopes */ - WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp) { - return new WALKey(info, tableName, timestamp, mvcc); + WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp, + NavigableMap scopes) { + return new WALKey(info, tableName, timestamp, mvcc, scopes); } /** @@ -191,6 +195,16 @@ public class TestDefaultWALProvider { final HTableDescriptor htd2 = new HTableDescriptor(TableName.valueOf("testLogCleaning2")) .addFamily(new HColumnDescriptor("row")); + NavigableMap scopes1 = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes1.put(fam, 0); + } + NavigableMap scopes2 = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd2.getFamiliesKeys()) { + scopes2.put(fam, 0); + } final Configuration localConf = new Configuration(conf); localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName()); final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName()); @@ -205,26 +219,26 @@ public class TestDefaultWALProvider { // Add a single edit and make sure that rolling won't remove the file // Before HBASE-3198 it used to delete it - addEdits(log, hri, htd, 1); + addEdits(log, hri, htd, 1, scopes1); log.rollWriter(); assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(log)); // See if there's anything wrong with more than 1 edit - addEdits(log, hri, htd, 2); + addEdits(log, hri, htd, 2, scopes1); log.rollWriter(); assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log)); // Now mix edits from 2 regions, still no flushing - addEdits(log, hri, htd, 1); - addEdits(log, hri2, htd2, 1); - addEdits(log, hri, htd, 1); - addEdits(log, hri2, htd2, 1); + addEdits(log, hri, htd, 1, scopes1); + addEdits(log, hri2, htd2, 1, scopes2); + addEdits(log, hri, htd, 1, scopes1); + addEdits(log, hri2, htd2, 1, scopes2); log.rollWriter(); assertEquals(3, DefaultWALProvider.getNumRolledLogFiles(log)); // Flush the first region, we expect to see the first two files getting // archived. We need to append something or writer won't be rolled. - addEdits(log, hri2, htd2, 1); + addEdits(log, hri2, htd2, 1, scopes2); log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys()); log.completeCacheFlush(hri.getEncodedNameAsBytes()); log.rollWriter(); @@ -233,7 +247,7 @@ public class TestDefaultWALProvider { // Flush the second region, which removes all the remaining output files // since the oldest was completely flushed and the two others only contain // flush information - addEdits(log, hri2, htd2, 1); + addEdits(log, hri2, htd2, 1, scopes2); log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getFamiliesKeys()); log.completeCacheFlush(hri2.getEncodedNameAsBytes()); log.rollWriter(); @@ -264,6 +278,16 @@ public class TestDefaultWALProvider { new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row")); HTableDescriptor table2 = new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row")); + NavigableMap scopes1 = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : table1.getFamiliesKeys()) { + scopes1.put(fam, 0); + } + NavigableMap scopes2 = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : table2.getFamiliesKeys()) { + scopes2.put(fam, 0); + } final Configuration localConf = new Configuration(conf); localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName()); final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName()); @@ -281,31 +305,31 @@ public class TestDefaultWALProvider { hri2.setSplit(false); // variables to mock region sequenceIds. // start with the testing logic: insert a waledit, and roll writer - addEdits(wal, hri1, table1, 1); + addEdits(wal, hri1, table1, 1, scopes1); wal.rollWriter(); // assert that the wal is rolled assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal)); // add edits in the second wal file, and roll writer. - addEdits(wal, hri1, table1, 1); + addEdits(wal, hri1, table1, 1, scopes1); wal.rollWriter(); // assert that the wal is rolled assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal)); // add a waledit to table1, and flush the region. - addEdits(wal, hri1, table1, 3); + addEdits(wal, hri1, table1, 3, scopes1); flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys()); // roll log; all old logs should be archived. wal.rollWriter(); assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal)); // add an edit to table2, and roll writer - addEdits(wal, hri2, table2, 1); + addEdits(wal, hri2, table2, 1, scopes2); wal.rollWriter(); assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal)); // add edits for table1, and roll writer - addEdits(wal, hri1, table1, 2); + addEdits(wal, hri1, table1, 2, scopes1); wal.rollWriter(); assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal)); // add edits for table2, and flush hri1. - addEdits(wal, hri2, table2, 2); + addEdits(wal, hri2, table2, 2, scopes2); flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getFamiliesKeys()); // the log : region-sequenceId map is // log1: region2 (unflushed) @@ -315,7 +339,7 @@ public class TestDefaultWALProvider { wal.rollWriter(); assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal)); // flush region2, and all logs should be archived. - addEdits(wal, hri2, table2, 2); + addEdits(wal, hri2, table2, 2, scopes2); flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getFamiliesKeys()); wal.rollWriter(); assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java index 1885d87..ef92768 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.wal; +import java.util.NavigableMap; + import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -28,7 +30,8 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey; @Category({RegionServerTests.class, LargeTests.class}) public class TestDefaultWALProviderWithHLogKey extends TestDefaultWALProvider { @Override - WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp) { - return new HLogKey(info, tableName, timestamp, mvcc); + WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp, + final NavigableMap scopes) { + return new HLogKey(info, tableName, timestamp, mvcc, scopes); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java index 079e0cb..caa0a45 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; @@ -79,6 +81,11 @@ public class TestSecureWAL { TableName tableName = TableName.valueOf("TestSecureWAL"); HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor(tableName.getName())); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } HRegionInfo regioninfo = new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false); final int total = 10; @@ -95,8 +102,8 @@ public class TestSecureWAL { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value)); - wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), kvs, true); + wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), scopes), kvs, true); } wal.sync(); final Path walPath = DefaultWALProvider.getCurrentFileName(wal); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index 747977a..0eef3b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -30,6 +30,8 @@ import java.io.IOException; import java.lang.reflect.Method; import java.net.BindException; import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -181,6 +183,11 @@ public class TestWALFactory { } HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor("column")); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } // Add edits for three regions. for (int ii = 0; ii < howmany; ii++) { @@ -196,8 +203,8 @@ public class TestWALFactory { System.currentTimeMillis(), column)); LOG.info("Region " + i + ": " + edit); WALKey walKey = new WALKey(infos[i].getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc); - log.append(htd, infos[i], walKey, edit, true); + System.currentTimeMillis(), mvcc, scopes); + log.append(infos[i], walKey, edit, true); walKey.getWriteEntry(); } log.sync(); @@ -249,13 +256,18 @@ public class TestWALFactory { null,null, false); HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor(tableName.getName())); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } final WAL wal = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()); for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); - wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc), kvs, true); + wal.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs, true); } // Now call sync and try reading. Opening a Reader before you sync just // gives you EOFE. @@ -273,8 +285,8 @@ public class TestWALFactory { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); - wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc), kvs, true); + wal.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs, true); } wal.sync(); reader = wals.createReader(fs, walPath); @@ -295,8 +307,8 @@ public class TestWALFactory { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value)); - wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc), kvs, true); + wal.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs, true); } // Now I should have written out lots of blocks. Sync then read. wal.sync(); @@ -370,12 +382,17 @@ public class TestWALFactory { HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor(tableName.getName())); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); - wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), kvs, true); + wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), scopes), kvs, true); } // Now call sync to send the data to HDFS datanodes wal.sync(); @@ -485,6 +502,11 @@ public class TestWALFactory { final HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor( "column")); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } final byte [] row = Bytes.toBytes("row"); WAL.Reader reader = null; try { @@ -503,9 +525,9 @@ public class TestWALFactory { row,Bytes.toBytes(Bytes.toString(row) + "1"), false); final WAL log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()); - final long txid = log.append(htd, info, + final long txid = log.append(info, new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(), - mvcc), + mvcc, scopes), cols, true); log.sync(txid); log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys()); @@ -545,6 +567,11 @@ public class TestWALFactory { final HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor( "column")); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } final byte [] row = Bytes.toBytes("row"); WAL.Reader reader = null; final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); @@ -561,9 +588,9 @@ public class TestWALFactory { HRegionInfo hri = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); final WAL log = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()); - final long txid = log.append(htd, hri, + final long txid = log.append(hri, new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(), - mvcc), + mvcc, scopes), cols, true); log.sync(txid); log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys()); @@ -607,7 +634,11 @@ public class TestWALFactory { long timestamp = System.currentTimeMillis(); HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor("column")); - + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } HRegionInfo hri = new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); final WAL log = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()); @@ -617,8 +648,8 @@ public class TestWALFactory { cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)), timestamp, new byte[]{(byte) (i + '0')})); - log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc), cols, true); + log.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), cols, true); } log.sync(); assertEquals(COL_COUNT, visitor.increments); @@ -627,8 +658,8 @@ public class TestWALFactory { cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(11)), timestamp, new byte[]{(byte) (11 + '0')})); - log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc), cols, true); + log.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), cols, true); log.sync(); assertEquals(COL_COUNT, visitor.increments); } @@ -722,8 +753,9 @@ public class TestWALFactory { } @Override - public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) { - //To change body of implemented methods use File | Settings | File Templates. + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) { + // To change body of implemented methods use File | Settings | File + // Templates. increments++; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java index 9ae98c6..beac9e2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.LogFactory; @@ -96,6 +98,11 @@ public class TestWALReaderOnSecureWAL { TableName tableName = TableName.valueOf(tblName); HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor(tableName.getName())); + NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } HRegionInfo regioninfo = new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false); final int total = 10; @@ -109,8 +116,8 @@ public class TestWALReaderOnSecureWAL { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value)); - wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc), kvs, true); + wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs, true); } wal.sync(); final Path walPath = DefaultWALProvider.getCurrentFileName(wal); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java index e138174..4a15d3c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java @@ -23,8 +23,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Random; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -128,6 +130,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { private final int syncInterval; private final HTableDescriptor htd; private final Sampler loopSampler; + private final NavigableMap scopes; WALPutBenchmark(final HRegion region, final HTableDescriptor htd, final long numIterations, final boolean noSync, final int syncInterval, @@ -138,6 +141,11 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { this.numFamilies = htd.getColumnFamilies().length; this.region = region; this.htd = htd; + scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + for(byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes"); if (spanReceivers == null || spanReceivers.isEmpty()) { loopSampler = Sampler.NEVER; @@ -180,8 +188,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit); HRegionInfo hri = region.getRegionInfo(); final WALKey logkey = - new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc); - wal.append(htd, hri, logkey, walEdit, true); + new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes); + wal.append(hri, logkey, walEdit, true); if (!this.noSync) { if (++lastSync >= this.syncInterval) { wal.sync(); @@ -498,8 +506,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { private int appends = 0; @Override - public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, - WALEdit logEdit) { + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) { this.appends++; if (this.appends % whenToRoll == 0) { LOG.info("Rolling after " + appends + " edits");