Index: src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java (revision 1213661) +++ src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java (working copy) @@ -266,14 +266,13 @@ merger.setMergeFactor(handler.getMergeFactor()); merger.setMinMergeDocs(handler.getMinMergeDocs()); - IndexingQueueStore store = new IndexingQueueStore(indexDir); - // initialize indexing queue - this.indexingQueue = new IndexingQueue(store); + this.indexingQueue = new IndexingQueue(new IndexingQueueStore(indexDir)); // open persistent indexes - for (Iterator it = indexNames.iterator(); it.hasNext(); ) { - IndexInfo info = (IndexInfo) it.next(); + Iterator iterator = indexNames.iterator(); + while (iterator.hasNext()) { + IndexInfo info = (IndexInfo) iterator.next(); String name = info.getName(); // only open if it still exists // it is possible that indexNames still contains a name for @@ -386,10 +385,11 @@ executeAndLog(new Start(Action.INTERNAL_TRANSACTION)); NodeState rootState = (NodeState) stateMgr.getItemState(rootId); count = createIndex(rootState, rootPath, stateMgr, count); + checkIndexingQueue(true); executeAndLog(new Commit(getTransactionId())); log.debug("Created initial index for {} nodes", count); releaseMultiReader(); - scheduleFlushTask(); + safeFlush(); } catch (Exception e) { String msg = "Error indexing workspace"; IOException ex = new IOException(msg); @@ -397,6 +397,7 @@ throw ex; } finally { reindexing = false; + scheduleFlushTask(); } } else { throw new IllegalStateException("Index already present"); @@ -1167,7 +1168,8 @@ private void commitVolatileIndex() throws IOException { // check if volatile index contains documents at all - if (volatileIndex.getNumDocuments() > 0) { + int volatileIndexDocuments = volatileIndex.getNumDocuments(); + if (volatileIndexDocuments > 0) { long time = System.currentTimeMillis(); 
// create index @@ -1185,7 +1187,7 @@ resetVolatileIndex(); time = System.currentTimeMillis() - time; - log.debug("Committed in-memory index in " + time + "ms."); + log.debug("Committed in-memory index containing {} documents in {}ms.", volatileIndexDocuments, time); } } @@ -1298,7 +1300,7 @@ } } } - + void safeFlush() throws IOException { synchronized (updateMonitor) { updateInProgress = true; Index: src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java 
org.apache.jackrabbit.core.query.AbstractQueryHandler; import org.apache.jackrabbit.core.query.ExecutableQuery; import org.apache.jackrabbit.core.query.QueryHandler; @@ -54,6 +68,7 @@ import org.apache.jackrabbit.core.query.lucene.directory.FSDirectoryManager; import org.apache.jackrabbit.core.query.lucene.hits.AbstractHitCollector; import org.apache.jackrabbit.core.session.SessionContext; +import org.apache.jackrabbit.core.state.ItemState; import org.apache.jackrabbit.core.state.ItemStateException; import org.apache.jackrabbit.core.state.ItemStateManager; import org.apache.jackrabbit.core.state.NoSuchItemStateException; @@ -98,6 +113,8 @@ * Lucene. */ public class SearchIndex extends AbstractQueryHandler { + + public static final String JCR3162_ENABLED = "JCR3162"; /** * Valid node type names under /jcr:system. Used to determine if a @@ -547,6 +564,9 @@ } index.createInitialIndex(context.getItemStateManager(), context.getRootId(), rootPath); + if (Boolean.getBoolean(JCR3162_ENABLED)) { + checkPendingJournalChanges(context); + } } if (consistencyCheckEnabled && (index.getRedoLogApplied() || forceConsistencyCheck)) { @@ -2477,6 +2497,46 @@ this.redoLogFactoryClass = className; } + /** + * In the case of an initial index build operation, this checks if there are + * some new nodes pending in the journal and tries to preemptively delete + * them, to keep the index consistent. 
+ * + * See JCR-3162 + * + * @param context + *            the query handler context + */ + void checkPendingJournalChanges(QueryHandlerContext context) { + ClusterNode cn = context.getClusterNode(); + if (cn == null) { + return; + } + + List addedIds = new ArrayList(); + long rev = cn.getRevision(); + + List changes = getChangeLogRecords(rev, context.getWorkspace()); + Iterator iterator = changes.iterator(); + while (iterator.hasNext()) { + ChangeLogRecord record = (ChangeLogRecord) iterator.next(); + for (ItemState state : record.getChanges().addedStates()) { + if (!state.isNode()) { + continue; + } + addedIds.add((NodeId) state.getId()); + } + } + if (!addedIds.isEmpty()) { + Collection empty = Collections.emptyList(); + try { + updateNodes(addedIds.iterator(), empty.iterator()); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + } + //----------------------------< internal >---------------------------------- /** @@ -2490,4 +2550,82 @@ throw new IOException("query handler closed and cannot be used anymore."); } } + + /** + * Polls the underlying journal for events of the type ChangeLogRecord that + * happened after a given revision, on a workspace + * + * This is used by the {@link MultiIndex} to determine if there are some + * incoming changes. 
+ * + * @param revision + * starting revision + * @param workspace + * the workspace name + * @return + */ + public List getChangeLogRecords(long revision, + final String workspace) { + log.debug( + "Get changes from the Journal for revision {} and workspace {}.", + revision, workspace); + ClusterNode cn = getContext().getClusterNode(); + if (cn == null) { + return Collections.emptyList(); + } + Journal journal = cn.getJournal(); + final List events = new ArrayList(); + ClusterRecordDeserializer deserializer = new ClusterRecordDeserializer(); + RecordIterator records = null; + try { + records = journal.getRecords(revision); + while (records.hasNext()) { + Record record = records.nextRecord(); + if (!record.getProducerId().equals(cn.getId())) { + continue; + } + ClusterRecord r = null; + try { + r = deserializer.deserialize(record); + } catch (JournalException e) { + log.error( + "Unable to read revision '" + record.getRevision() + + "'.", e); + } + if (r == null) { + continue; + } + r.process(new ClusterRecordProcessor() { + public void process(ChangeLogRecord record) { + String eventW = record.getWorkspace(); + if (eventW != null ? 
eventW.equals(workspace) : workspace == null) { + events.add(record); + } + } + + public void process(LockRecord record) { + } + + public void process(NamespaceRecord record) { + } + + public void process(NodeTypeRecord record) { + } + + public void process(PrivilegeRecord record) { + } + + public void process(WorkspaceRecord record) { + } + }); + } + } catch (JournalException e1) { + log.error(e1.getMessage(), e1); + } finally { + if (records != null) { + records.close(); + } + } + return events; + } } Index: src/main/java/org/apache/jackrabbit/core/query/QueryHandlerContext.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/query/QueryHandlerContext.java (revision 1213661) +++ src/main/java/org/apache/jackrabbit/core/query/QueryHandlerContext.java (working copy) @@ -22,6 +22,7 @@ import org.apache.jackrabbit.core.HierarchyManager; import org.apache.jackrabbit.core.NamespaceRegistryImpl; import org.apache.jackrabbit.core.RepositoryContext; +import org.apache.jackrabbit.core.cluster.ClusterNode; import org.apache.jackrabbit.core.id.NodeId; import org.apache.jackrabbit.core.nodetype.NodeTypeRegistry; import org.apache.jackrabbit.core.persistence.PersistenceManager; @@ -36,6 +37,11 @@ public class QueryHandlerContext { /** + * The workspace + */ + private final String workspace; + + /** * Repository context. */ private final RepositoryContext repositoryContext; @@ -78,22 +84,26 @@ /** * Creates a new context instance. * - * @param stateMgr provides persistent item states. - * @param pm the underlying persistence manager. - * @param rootId the id of the root node. - * @param parentHandler the parent query handler or null it - * there is no parent handler. - * @param excludedNodeId id of the node that should be excluded from - * indexing. Any descendant of that node is also - * excluded from indexing. + * @param workspace the workspace name. + * @param repositoryContext the repository context. 
+ * @param stateMgr provides persistent item states. + * @param pm the underlying persistence manager. + * @param rootId the id of the root node. + * @param parentHandler the parent query handler or null it + * there is no parent handler. + * @param excludedNodeId id of the node that should be excluded from + * indexing. Any descendant of that node is also + * excluded from indexing. */ public QueryHandlerContext( + String workspace, RepositoryContext repositoryContext, SharedItemStateManager stateMgr, PersistenceManager pm, NodeId rootId, QueryHandler parentHandler, NodeId excludedNodeId) { + this.workspace = workspace; this.repositoryContext = repositoryContext; this.stateMgr = stateMgr; this.hmgr = new CachingHierarchyManager(rootId, stateMgr); @@ -201,4 +211,17 @@ return repositoryContext.getExecutor(); } + /** + * Returns the cluster node instance of this repository, or + * null if clustering is not enabled. + * + * @return cluster node + */ + public ClusterNode getClusterNode() { + return repositoryContext.getClusterNode(); + } + + public String getWorkspace() { + return workspace; + } } Index: src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java (revision 1213661) +++ src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java (working copy) @@ -605,7 +605,7 @@ if (systemSearchMgr == null) { if (repConfig.isSearchEnabled()) { systemSearchMgr = new SearchManager( - context, + null, context, repConfig, getWorkspaceInfo(wspName).itemStateMgr, context.getInternalVersionManager().getPersistenceManager(), @@ -1853,6 +1853,7 @@ // search manager is lazily instantiated in order to avoid // 'chicken & egg' bootstrap problems searchMgr = new SearchManager( + getName(), context, config, itemStateMgr, persistMgr, Index: src/main/java/org/apache/jackrabbit/core/SearchManager.java 
=================================================================== --- src/main/java/org/apache/jackrabbit/core/SearchManager.java (revision 1213661) +++ src/main/java/org/apache/jackrabbit/core/SearchManager.java (working copy) @@ -58,6 +58,8 @@ * Acts as a global entry point to execute queries and index nodes. */ public class SearchManager implements SynchronousEventListener { + + public static final String JCR905_ENABLED = "JCR905"; /** * Logger instance for this class @@ -111,20 +113,21 @@ /** * Creates a new SearchManager. * - * @param config the search configuration. - * @param nsReg the namespace registry. - * @param ntReg the node type registry. - * @param itemMgr the shared item state manager. - * @param pm the underlying persistence manager. - * @param rootNodeId the id of the root node. - * @param parentMgr the parent search manager or null if - * there is no parent search manager. + * @param workspace the workspace name + * @param repositoryContext the repository context + * @param qhf the query handler factory + * @param itemMgr the shared item state manager. + * @param pm the underlying persistence manager. + * @param rootNodeId the id of the root node. + * @param parentMgr the parent search manager or null if + * there is no parent search manager. * @param excludedNodeId id of the node that should be excluded from * indexing. Any descendant of that node will also be * excluded from indexing. 
* @throws RepositoryException if the search manager cannot be initialized */ public SearchManager( + String workspace, RepositoryContext repositoryContext, QueryHandlerFactory qhf, SharedItemStateManager itemMgr, @@ -167,10 +170,9 @@ } // initialize query handler - this.handler = qhf.getQueryHandler(new QueryHandlerContext( - repositoryContext, - itemMgr, pm, rootNodeId, - parentHandler, excludedNodeId)); + this.handler = qhf.getQueryHandler(new QueryHandlerContext(workspace, + repositoryContext, itemMgr, pm, rootNodeId, parentHandler, + excludedNodeId)); } /** @@ -332,9 +334,12 @@ long type = e.getType(); if (type == Event.NODE_ADDED) { addedNodes.put(e.getChildId(), e); - // quick'n dirty fix for JCR-905 - if (e.isExternal()) { - removedNodes.add(e.getChildId()); + if (Boolean + .getBoolean(JCR905_ENABLED)) { + // quick'n dirty fix for JCR-905 + if (e.isExternal()) { + removedNodes.add(e.getChildId()); + } } if (e.isShareableChildNode()) { // simply re-index shareable nodes