From e6c3b5a468fc14a78882a21e9f5327db1af03d53 Mon Sep 17 00:00:00 2001 From: youhailang <906669319@qq.com> Date: Thu, 20 May 2021 20:00:03 +0800 Subject: [PATCH] Fix concurrent updates to queues. --- .../hadoop/hbase/replication/regionserver/ReplicationSource.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 8dd856875c..6f36d23789 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -197,8 +197,12 @@ public class ReplicationSource implements ReplicationSourceInterface { String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName()); PriorityBlockingQueue queue = queues.get(logPrefix); if (queue == null) { - queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator()); - queues.put(logPrefix, queue); + synchronized (queues) { + queue = queues.get(logPrefix); + if (queue == null) { + queues.put(logPrefix, queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator())); + } + } if (this.isSourceActive() && this.walEntryFilter != null) { // new wal group observed after source startup, start a new worker thread to track it // notice: it's possible that log enqueued when this.running is set but worker thread -- 2.14.1.windows.1