From 8b454220d1b5cc45644e9adbdc81c197893d2666 Mon Sep 17 00:00:00 2001 From: Ashish Singhi Date: Thu, 5 Jan 2017 17:47:38 +0530 Subject: [PATCH] HBASE-17290 Potential loss of data for replication of bulk loaded hfiles --- .../hbase/replication/ReplicationQueues.java | 5 ++- .../hbase/replication/ReplicationQueuesZKImpl.java | 11 +++-- .../TableBasedReplicationQueuesImpl.java | 4 +- .../hadoop/hbase/regionserver/HRegionServer.java | 4 ++ .../replication/regionserver/Replication.java | 50 ++++++++------------- .../regionserver/ReplicationObserver.java | 52 ++++++++++++++++++++++ .../regionserver/ReplicationSource.java | 11 ++--- .../regionserver/ReplicationSourceInterface.java | 5 ++- .../regionserver/ReplicationSourceManager.java | 4 +- .../cleaner/TestReplicationHFileCleaner.java | 9 ++-- .../hbase/replication/ReplicationSourceDummy.java | 3 +- .../replication/TestReplicationStateBasic.java | 33 ++++++++------ 12 files changed, 125 insertions(+), 66 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java index 0ae27d0..dcff943 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication; import java.util.List; import java.util.SortedSet; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Pair; @@ -144,10 +145,10 @@ public interface ReplicationQueues { /** * Add new hfile references to the queue. * @param peerId peer cluster id to which the hfiles need to be replicated - * @param files list of hfile references to be added + * @param pairs list of hfile references to be added * @throws ReplicationException if fails to add a hfile reference */ - void addHFileRefs(String peerId, List files) throws ReplicationException; + void addHFileRefs(String peerId, List> pairs) throws ReplicationException; /** * Remove hfile references from the queue. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index dcbc0f0..394aebb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.exceptions.DeserializationException; @@ -319,16 +320,18 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } @Override - public void addHFileRefs(String peerId, List files) throws ReplicationException { + public void addHFileRefs(String peerId, List> pairs) + throws ReplicationException { String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId); boolean debugEnabled = LOG.isDebugEnabled(); if (debugEnabled) { - LOG.debug("Adding hfile references " + files + " in queue " + peerZnode); + LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode); } List listOfOps = new ArrayList(); - int size = files.size(); + int size = pairs.size(); for (int i = 0; i < size; i++) { - listOfOps.add(ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)), + listOfOps.add(ZKUtilOp.createAndFailSilent( + ZKUtil.joinZNode(peerZnode, pairs.get(i).getSecond().getName()), HConstants.EMPTY_BYTE_ARRAY)); } if (debugEnabled) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java index 28b9bdf..bdf6a6c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -307,7 +308,8 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase } @Override - public void addHFileRefs(String peerId, List files) throws ReplicationException { + public void addHFileRefs(String peerId, List> files) + throws ReplicationException { // TODO throw new NotImplementedException(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 853d699..3c9d54f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFa import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.UserProvider; @@ -524,6 +525,9 @@ public class HRegionServer extends HasThread implements checkCodecs(this.conf); this.userProvider = UserProvider.instantiate(conf); FSUtils.setupShortCircuitRead(this.conf); + + Replication.decorateRegionServerConfiguration(this.conf); + // Disable usage of meta replicas in the regionserver this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 5f87690..7ac002d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; import org.apache.hadoop.hbase.replication.ReplicationException; @@ -236,34 +237,6 @@ public class Replication extends WALActionsListener.Base implements scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager()); } - @Override - public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey, - final WALEdit edit) throws IOException { - NavigableMap scopes = logKey.getReplicationScopes(); - if (this.replicationForBulkLoadData && scopes != null && !scopes.isEmpty()) { - TableName tableName = logKey.getTablename(); - for (Cell c : edit.getCells()) { - // Only check for bulk load events - if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) { - BulkLoadDescriptor bld = null; - try { - bld = WALEdit.getBulkLoadDescriptor(c); - } catch (IOException e) { - LOG.error("Failed to get bulk load events information from the wal file.", e); - throw e; - } - - for (StoreDescriptor s : bld.getStoresList()) { - byte[] fam = s.getFamilyName().toByteArray(); - if (scopes.containsKey(fam)) { - addHFileRefsToQueue(this.getReplicationManager(), tableName, fam, s); - } - } - } - } - } - } - /** * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from * compaction WAL edits and if the scope is local. @@ -298,10 +271,10 @@ public class Replication extends WALActionsListener.Base implements } } - private static void addHFileRefsToQueue(ReplicationSourceManager replicationManager, - TableName tableName, byte[] family, StoreDescriptor s) throws IOException { + void addHFileRefsToQueue(TableName tableName, byte[] family, List> pairs) + throws IOException { try { - replicationManager.addHFileRefs(tableName, family, s.getStoreFileList()); + this.replicationManager.addHFileRefs(tableName, family, pairs); } catch (ReplicationException e) { LOG.error("Failed to add hfile references in the replication queue.", e); throw new IOException(e); @@ -337,6 +310,21 @@ public class Replication extends WALActionsListener.Base implements } } + /** + * This method modifies the region server's configuration in order to inject replication-related + * features + * @param conf region server configurations + */ + public static void decorateRegionServerConfiguration(Configuration conf) { + if (isReplicationForBulkLoadDataEnabled(conf)) { + String plugins = conf.get("hbase.coprocessor.regionserver.classes"); + String rsCoprocessorClass = ReplicationObserver.class.getCanonicalName(); + if (!plugins.contains(rsCoprocessorClass)) { + conf.set("hbase.coprocessor.regionserver.classes", plugins + "," + rsCoprocessorClass); + } + } + } + /* * Statistics thread. Periodically prints the cache statistics to the log. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java new file mode 100644 index 0000000..2ec60d5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Pair; + +/** + * An Observer to facilitate replication operations + */ + +public class ReplicationObserver extends BaseRegionObserver { + private static final Log LOG = LogFactory.getLog(ReplicationObserver.class); + + @Override + public void preCommitStoreFile(final ObserverContext ctx, + final byte[] family, final List> pairs) throws IOException { + RegionCoprocessorEnvironment env = ctx.getEnvironment(); + Configuration c = env.getConfiguration(); + if (pairs == null + || pairs.isEmpty() + || !c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, + HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) { + LOG.debug("Skipping recording bulk load entries in preCommitStoreFile for bulkloaded " + + "data replication."); + return; + } + HRegionServer rs = (HRegionServer) env.getRegionServerServices(); + Replication rep = (Replication) rs.getReplicationSourceService(); + rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 3eeb4b8..7a229eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; @@ -253,7 +254,7 @@ public class ReplicationSource extends Thread } @Override - public void addHFileRefs(TableName tableName, byte[] family, List files) + public void addHFileRefs(TableName tableName, byte[] family, List> pairs) throws ReplicationException { String peerId = peerClusterZnode; if (peerId.contains("-")) { @@ -266,8 +267,8 @@ public class ReplicationSource extends Thread List tableCfs = tableCFMap.get(tableName); if (tableCFMap.containsKey(tableName) && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) { - this.replicationQueues.addHFileRefs(peerId, files); - metrics.incrSizeOfHFileRefsQueue(files.size()); + this.replicationQueues.addHFileRefs(peerId, pairs); + metrics.incrSizeOfHFileRefsQueue(pairs.size()); } else { LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family " + Bytes.toString(family) + " to peer id " + peerId); @@ -275,8 +276,8 @@ public class ReplicationSource extends Thread } else { // user has explicitly not defined any table cfs for replication, means replicate all the // data - this.replicationQueues.addHFileRefs(peerId, files); - metrics.incrSizeOfHFileRefsQueue(files.size()); + this.replicationQueues.addHFileRefs(peerId, pairs); + metrics.incrSizeOfHFileRefsQueue(pairs.size()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 7f4a9f7..d01cc6b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.util.Pair; /** * Interface that defines a replication source @@ -112,10 +113,10 @@ public interface ReplicationSourceInterface { * Add hfile names to the queue to be replicated. * @param tableName Name of the table these files belongs to * @param family Name of the family these files belong to - * @param files files whose names needs to be added to the queue to be replicated + * @param pairs files whose names needs to be added to the queue to be replicated * @throws ReplicationException If failed to add hfile references */ - void addHFileRefs(TableName tableName, byte[] family, List files) + void addHFileRefs(TableName tableName, byte[] family, List> pairs) throws ReplicationException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index ef4093e..5b574da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -846,10 +846,10 @@ public class ReplicationSourceManager implements ReplicationListener { return stats.toString(); } - public void addHFileRefs(TableName tableName, byte[] family, List files) + public void addHFileRefs(TableName tableName, byte[] family, List> pairs) throws ReplicationException { for (ReplicationSourceInterface source : this.sources) { - source.addHFileRefs(tableName, family, files); + source.addHFileRefs(tableName, family, pairs); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index fc3e516..817cfb4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -138,8 +139,8 @@ public class TestReplicationHFileCleaner { + "for it in the queue.", cleaner.isFileDeletable(fs.getFileStatus(file))); - List files = new ArrayList(1); - files.add(file.getName()); + List> files = new ArrayList<>(1); + files.add(new Pair(null, file)); // 4. Add the file to hfile-refs queue rq.addHFileRefs(peerId, files); // 5. Assert file should not be deletable @@ -166,8 +167,8 @@ public class TestReplicationHFileCleaner { f.setPath(notDeletablefile); files.add(f); - List hfiles = new ArrayList<>(1); - hfiles.add(notDeletablefile.getName()); + List> hfiles = new ArrayList<>(1); + hfiles.add(new Pair(null, notDeletablefile)); // 2. Add one file to hfile-refs queue rq.addHFileRefs(peerId, hfiles); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index abe484e..57e54d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; +import org.apache.hadoop.hbase.util.Pair; /** * Source that does nothing at all, helpful to test ReplicationSourceManager @@ -93,7 +94,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { } @Override - public void addHFileRefs(TableName tableName, byte[] family, List files) + public void addHFileRefs(TableName tableName, byte[] family, List> files) throws ReplicationException { return; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index fcab105..f8be9a7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -25,7 +25,9 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.zookeeper.KeeperException; import org.junit.Before; @@ -202,10 +204,10 @@ public abstract class TestReplicationStateBasic { rq1.init(server1); rqc.init(); - List files1 = new ArrayList(3); - files1.add("file_1"); - files1.add("file_2"); - files1.add("file_3"); + List> files1 = new ArrayList<>(3); + files1.add(new Pair(null, new Path("file_1"))); + files1.add(new Pair(null, new Path("file_2"))); + files1.add(new Pair(null, new Path("file_3"))); assertNull(rqc.getReplicableHFiles(ID_ONE)); assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); @@ -213,13 +215,16 @@ public abstract class TestReplicationStateBasic { rq1.addHFileRefs(ID_ONE, files1); assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size()); - List files2 = new ArrayList<>(files1); - String removedString = files2.remove(0); - rq1.removeHFileRefs(ID_ONE, files2); + List hfiles2 = new ArrayList<>(); + for (Pair p : files1) { + hfiles2.add(p.getSecond().getName()); + } + String removedString = hfiles2.remove(0); + rq1.removeHFileRefs(ID_ONE, hfiles2); assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size()); - files2 = new ArrayList<>(1); - files2.add(removedString); - rq1.removeHFileRefs(ID_ONE, files2); + hfiles2 = new ArrayList<>(1); + hfiles2.add(removedString); + rq1.removeHFileRefs(ID_ONE, hfiles2); assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size()); rp.unregisterPeer(ID_ONE); } @@ -235,10 +240,10 @@ public abstract class TestReplicationStateBasic { rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); rq1.addPeerToHFileRefs(ID_TWO); - List files1 = new ArrayList(3); - files1.add("file_1"); - files1.add("file_2"); - files1.add("file_3"); + List> files1 = new ArrayList<>(3); + files1.add(new Pair(null, new Path("file_1"))); + files1.add(new Pair(null, new Path("file_2"))); + files1.add(new Pair(null, new Path("file_3"))); rq1.addHFileRefs(ID_ONE, files1); rq1.addHFileRefs(ID_TWO, files1); assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size()); -- 2.9.0.windows.1