diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java index 34d2ae26c1..58685d2307 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java @@ -17,13 +17,9 @@ package org.apache.jackrabbit.oak.segment.standby.client; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Set; -import java.util.UUID; - import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; + import org.apache.jackrabbit.oak.segment.RecordId; import org.apache.jackrabbit.oak.segment.SegmentId; import org.apache.jackrabbit.oak.segment.SegmentIdProvider; @@ -35,6 +31,12 @@ import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.UUID; + /** * Encapsulates the algorithm for a single execution of the synchronization * process between the primary and the standby instance. It also contains @@ -104,84 +106,14 @@ class StandbyClientSyncExecution { } private void copySegmentHierarchyFromPrimary(StandbyClient client, UUID segmentId) throws Exception { - LinkedList batch = new LinkedList<>(); - - batch.offer(segmentId); - - LinkedList bulk = new LinkedList<>(); - LinkedList data = new LinkedList<>(); - Set visited = new HashSet<>(); - Set queued = new HashSet<>(); - Set local = new HashSet<>(); - - while (!batch.isEmpty()) { - UUID current = batch.remove(); - - log.debug("Inspecting segment {}", current); - visited.add(current); - - // Add the current segment ID at the beginning of the respective - // list, depending on its type. This allows to process those - // segments in an optimal topological order later on. If the current - // segment is a bulk segment, we can skip the rest of the loop, - // since bulk segments don't reference any other segment. - - if (SegmentId.isDataSegmentId(current.getLeastSignificantBits())) { - data.addFirst(current); - } else { - bulk.addFirst(current); - continue; - } - - for (String s : readReferences(client, current)) { - UUID referenced = UUID.fromString(s); - - // Short circuit for the "backward reference". The segment graph - // is not guaranteed to be acyclic, so there might be segments - // pointing back to a previously visited (but locally - // unavailable) segment. - - if (visited.contains(referenced)) { - continue; - } - - // Short circuit for the "diamond problem". Imagine that segment - // S1 references S2 and S3 and both S2 and S3 reference S4. - // These references form the shape of a diamond. If the segments - // are processed in the order S1, S2, S3, then S4 is added twice - // to the 'batch' queue. The following check prevents processing - // S4 twice or more. + List bulk = new LinkedList<>(); + List data = new LinkedList<>(); - if (queued.contains(referenced)) { - continue; - } - - // Short circuit for the "sharing-is-caring problem". If many - // new segments are sharing segments that are already locally - // available, we should not issue a request for it to the - // server. Moreover, if a segment was visited and persisted - // during this synchronization process, it will end up in the - // 'local' set as well. - - if (local.contains(referenced)) { - continue; - } - - if (isLocal(referenced)) { - local.add(referenced); - continue; - } - - // If we arrive at this point, the referenced segment is 1) not - // present locally, 2) not already queued for retrieval and 3) - // never visited before. We can safely add the reference to the - // queue and transfer the segment later. - - log.debug("Found reference from {} to {}", current, referenced); - batch.add(referenced); - queued.add(referenced); - } + if (SegmentId.isDataSegmentId(segmentId.getLeastSignificantBits())) { + deriveTopologicalOrder(client, segmentId, visited, data, bulk); + } else { + bulk.add(segmentId); } for (UUID id : bulk) { @@ -195,6 +127,33 @@ class StandbyClientSyncExecution { } } + + private void deriveTopologicalOrder(StandbyClient client, UUID id, Set visited, List data, + List bulk) throws Exception { + + // Use DFS to traverse segment graph and make sure + // to add each data segment to the data list only + // after all its references were already added + + log.debug("Inspecting segment {}", id); + visited.add(id); + + if (SegmentId.isDataSegmentId(id.getLeastSignificantBits())) { + for (String s : readReferences(client, id)) { + UUID referenced = UUID.fromString(s); + + if (!visited.contains(referenced) && !isLocal(referenced)) { + log.debug("Found reference from {} to {}", id, referenced); + deriveTopologicalOrder(client, referenced, visited, data, bulk); + } + } + + data.add(id); + } else { + bulk.add(id); + } + } + private Iterable readReferences(StandbyClient client, UUID id) throws InterruptedException { Iterable references = client.getReferences(id.toString());