### Eclipse Workspace Patch 1.0 #P jackrabbit-core Index: src/main/java/org/apache/jackrabbit/core/gc/GarbageCollector.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/gc/GarbageCollector.java (revision 1707047) +++ src/main/java/org/apache/jackrabbit/core/gc/GarbageCollector.java (working copy) @@ -16,29 +16,15 @@ */ package org.apache.jackrabbit.core.gc; -import org.apache.jackrabbit.api.management.DataStoreGarbageCollector; -import org.apache.jackrabbit.api.management.MarkEventListener; -import org.apache.jackrabbit.core.RepositoryContext; -import org.apache.jackrabbit.core.SessionImpl; -import org.apache.jackrabbit.core.data.DataStore; -import org.apache.jackrabbit.core.id.NodeId; -import org.apache.jackrabbit.core.id.PropertyId; -import org.apache.jackrabbit.core.observation.SynchronousEventListener; -import org.apache.jackrabbit.core.persistence.IterablePersistenceManager; -import org.apache.jackrabbit.core.persistence.util.NodeInfo; -import org.apache.jackrabbit.core.state.ItemStateException; -import org.apache.jackrabbit.core.state.NoSuchItemStateException; -import org.apache.jackrabbit.core.state.NodeState; -import org.apache.jackrabbit.core.state.PropertyState; -import org.apache.jackrabbit.core.value.InternalValue; -import org.apache.jackrabbit.spi.Name; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import javax.jcr.InvalidItemStateException; @@ -57,6 +43,26 @@ import javax.jcr.observation.EventIterator; import javax.jcr.observation.ObservationManager; +import org.apache.jackrabbit.api.management.DataStoreGarbageCollector; +import org.apache.jackrabbit.api.management.MarkEventListener; +import org.apache.jackrabbit.core.RepositoryContext; +import org.apache.jackrabbit.core.SessionImpl; +import org.apache.jackrabbit.core.data.DataStore; +import org.apache.jackrabbit.core.id.NodeId; +import org.apache.jackrabbit.core.id.PropertyId; +import org.apache.jackrabbit.core.observation.SynchronousEventListener; +import org.apache.jackrabbit.core.persistence.IterablePersistenceManager; +import org.apache.jackrabbit.core.persistence.PersistenceManager; +import org.apache.jackrabbit.core.persistence.util.NodeInfo; +import org.apache.jackrabbit.core.state.ItemStateException; +import org.apache.jackrabbit.core.state.NoSuchItemStateException; +import org.apache.jackrabbit.core.state.NodeState; +import org.apache.jackrabbit.core.state.PropertyState; +import org.apache.jackrabbit.core.value.InternalValue; +import org.apache.jackrabbit.spi.Name; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Garbage collector for DataStore. This implementation iterates through all * nodes and reads the binary properties. To detect nodes that are moved while @@ -80,7 +86,28 @@ * */ public class GarbageCollector implements DataStoreGarbageCollector { + + private class ScanNodeIdListTask implements Callable { + private int split; + private List nodeList; + private PersistenceManager pm; + private int pmCount; + + public ScanNodeIdListTask(int split, List nodeList, PersistenceManager pm, int pmCount) { + this.split = split; + this.nodeList = nodeList; + this.pm = pm; + this.pmCount = pmCount; + } + + public Void call() throws Exception { + scanNodeIdList(split, nodeList, pm, pmCount); + return null; + } + + } + /** logger instance */ static final Logger LOG = LoggerFactory.getLogger(GarbageCollector.class); @@ -99,6 +126,10 @@ private long sleepBetweenNodes; + private long minSplitSize = 100000; + + private int concurrentThreadSize = 3; + protected int testDelay; private final DataStore store; @@ -148,6 +179,22 @@ return sleepBetweenNodes; } + public long getMinSplitSize() { + return minSplitSize; + } + + public void setMinSplitSize(long minSplitSize) { + this.minSplitSize = minSplitSize; + } + + public int getConcurrentThreadSize() { + return concurrentThreadSize; + } + + public void setConcurrentThreadSize(int concurrentThreadSize) { + this.concurrentThreadSize = concurrentThreadSize; + } + /** * When testing the garbage collection, a delay is used instead of simulating concurrent access. * @@ -256,37 +303,84 @@ pmCount++; List allNodeIds = pm.getAllNodeIds(null, 0); int overAllCount = allNodeIds.size(); - int count = 0; - for (NodeId id : allNodeIds) { - count++; - if (count % 1000 == 0) { - LOG.debug(pm.toString() + " ("+pmCount + "/" + pmList.length + "): analyzed " + count + " nodes [" + overAllCount + "]..."); + if (overAllCount > minSplitSize) { + final int splits = getConcurrentThreadSize(); + ExecutorService executorService = Executors.newFixedThreadPool(splits); + try { + Set> futures = new HashSet>(); + List> lists = splitIntoParts(allNodeIds, splits); + LOG.debug(splits + " concurrent Threads will be started. Split Size: " + lists.get(0).size()+" Total Size: " + overAllCount); + for (int i = 0; i < splits; i++) { + List subList = lists.get(i); + futures.add(executorService.submit(new ScanNodeIdListTask(i + 1, subList, pm, pmCount))); + } + for (Future future : futures) { + future.get(); + } + } catch (Exception e) { + throw new RepositoryException(e); + } finally { + executorService.shutdown(); } - if (callback != null) { - callback.beforeScanning(null); + } else { + scanNodeIdList(0, allNodeIds, pm, pmCount); + } + } + } + + private void scanNodeIdList(int split, List nodeList, PersistenceManager pm, int pmCount) throws RepositoryException, ItemStateException { + int count = 0; + for (NodeId id : nodeList) { + count++; + if (count % 1000 == 0) { + if (split > 0) { + LOG.debug("[Split " + split + "] " + pm.toString() + " (" + pmCount + "/" + pmList.length + "): analyzed " + count + " nodes [" + nodeList.size() + "]..."); + } else { + LOG.debug(pm.toString() + " (" + pmCount + "/" + pmList.length + "): analyzed " + count + " nodes [" + nodeList.size() + "]..."); } - try { - NodeState state = pm.load(id); - Set propertyNames = state.getPropertyNames(); - for (Name name : propertyNames) { - PropertyId pid = new PropertyId(id, name); - PropertyState ps = pm.load(pid); - if (ps.getType() == PropertyType.BINARY) { - for (InternalValue v : ps.getValues()) { - // getLength will update the last modified date - // if the persistence manager scan is running - v.getLength(); - } + } + if (callback != null) { + callback.beforeScanning(null); + } + try { + NodeState state = pm.load(id); + Set propertyNames = state.getPropertyNames(); + for (Name name : propertyNames) { + PropertyId pid = new PropertyId(id, name); + PropertyState ps = pm.load(pid); + if (ps.getType() == PropertyType.BINARY) { + for (InternalValue v : ps.getValues()) { + // getLength will update the last modified date + // if the persistence manager scan is running + v.getLength(); } } - } catch (NoSuchItemStateException e) { - // the node may have been deleted or moved in the meantime - // ignore it } + } catch (NoSuchItemStateException e) { + // the node may have been deleted or moved in the meantime + // ignore it } } } + private List> splitIntoParts(List ls, int parts) { + final List> listParts = new ArrayList>(); + final int chunkSize = ls.size() / parts; + int leftOver = ls.size() % parts; + int iTake = chunkSize; + + for (int i = 0, iT = ls.size(); i < iT; i += iTake) { + if (leftOver > 0) { + leftOver--; + iTake = chunkSize + 1; + } else { + iTake = chunkSize; + } + listParts.add(new ArrayList(ls.subList(i, Math.min(iT, i + iTake)))); + } + return listParts; + } + /** * Reset modifiedDateOnAccess to 0 and stop the observation * listener if any are installed.