Adds optional prefetching of index files to IndexCopier.CopyOnReadDirectory
and prefixes Lucene index log messages with the index path / index name.

Summary:
  IndexCopier          - wrapForRead passes the prefetch flag and index path to
                         CopyOnReadDirectory; the copy logic moves out of the
                         Runnable into copyFilesToLocal so prefetchIndexFiles
                         can reuse it; log messages gain an "[indexPath]" prefix.
  IndexDefinition      - reads the new COR_PREFETCH_FILES option (default false).
  LuceneIndexConstants - declares COR_PREFETCH_FILES.
  LuceneIndexEditor    - log messages gain an "[indexName]" prefix.
  IndexCopierTest      - adds basicTestWithPrefetch.

Review changes relative to the submitted patch:
  copyFilesToLocal now returns 0 unless the copy succeeded, so the prefetch
  summary only counts files that were actually copied. Javadoc was added to
  the two new private methods and the affected hunk headers were adjusted.

NOTE(review): this patch was recovered from a whitespace-collapsed copy.
Indentation and blank context lines are reconstructed from the hunk counts,
and the context lines "ConcurrentMap files" / "Set localFileNames" appear to
have lost their generic type arguments. Confirm against the target revision;
applying with "patch -l" (ignore whitespace) may be required.

Index: src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
===================================================================
--- src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java (revision 1688849)
+++ src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java (working copy)
@@ -118,9 +118,11 @@
         this.indexWorkDir = initializerWorkDir(indexRootDir);
     }
 
-    public Directory wrapForRead(String indexPath, IndexDefinition definition, Directory remote) throws IOException {
+    public Directory wrapForRead(String indexPath, IndexDefinition definition,
+            Directory remote) throws IOException {
         Directory local = createLocalDirForIndexReader(indexPath, definition);
-        return new CopyOnReadDirectory(remote, local);
+        return new CopyOnReadDirectory(remote, local,
+                definition.isPrefetchEnabledForCopyOnRead(), indexPath);
     }
 
     public Directory wrapForWrite(IndexDefinition definition, Directory remote, boolean reindexMode) throws IOException {
@@ -241,6 +243,7 @@
     private class CopyOnReadDirectory extends FilterDirectory {
         private final Directory remote;
         private final Directory local;
+        private final String indexPath;
 
         private final ConcurrentMap files = newConcurrentMap();
         /**
@@ -249,11 +252,15 @@
          */
         private final Set localFileNames = Sets.newConcurrentHashSet();
 
-        public CopyOnReadDirectory(Directory remote, Directory local) throws IOException {
+        public CopyOnReadDirectory(Directory remote, Directory local, boolean prefetch, String indexPath) throws IOException {
             super(remote);
             this.remote = remote;
             this.local = local;
+            this.indexPath = indexPath;
             this.localFileNames.addAll(Arrays.asList(local.listAll()));
+            if (prefetch) {
+                prefetchIndexFiles();
+            }
         }
 
         @Override
@@ -269,15 +276,20 @@
         @Override
         public IndexInput openInput(String name, IOContext context) throws IOException {
             if (REMOTE_ONLY.contains(name)) {
+                log.trace("[{}] opening remote only file {}", indexPath, name);
                 return remote.openInput(name, context);
             }
 
             CORFileReference ref = files.get(name);
             if (ref != null) {
                 if (ref.isLocalValid()) {
+                    log.trace("[{}] opening existing local file {}", indexPath, name);
                     return files.get(name).openLocalInput(context);
                 } else {
                     readerRemoteReadCount.incrementAndGet();
+                    log.trace(
+                            "[{}] opening existing remote file as local version is not valid {}",
+                            indexPath, name);
                     return remote.openInput(name, context);
                 }
             }
@@ -285,14 +297,17 @@
             CORFileReference toPut = new CORFileReference(name);
             CORFileReference old = files.putIfAbsent(name, toPut);
             if (old == null) {
+                log.trace("[{}] scheduled local copy for {}", indexPath, name);
                 copy(toPut);
             }
 
             //If immediate executor is used the result would be ready right away
             if (toPut.isLocalValid()) {
+                log.trace("[{}] opening new local file {}", indexPath, name);
                 return toPut.openLocalInput(context);
             }
 
+            log.trace("[{}] opening new remote file {}", indexPath, name);
             readerRemoteReadCount.incrementAndGet();
             return remote.openInput(name, context);
         }
@@ -302,55 +317,102 @@
             executor.execute(new Runnable() {
                 @Override
                 public void run() {
-                    String name = reference.name;
-                    boolean success = false;
-                    boolean copyAttempted = false;
-                    try {
-                        scheduledForCopyCount.decrementAndGet();
-                        if (!local.fileExists(name)) {
-                            long fileSize = remote.fileLength(name);
-                            LocalIndexFile file = new LocalIndexFile(local, name, fileSize, true);
-                            long start = startCopy(file);
-                            copyAttempted = true;
+                    scheduledForCopyCount.decrementAndGet();
+                    copyFilesToLocal(reference, true);
+                }
+            });
+        }
 
-                            remote.copy(local, name, name, IOContext.READ);
-                            reference.markValid();
+        /**
+         * Eagerly copies every remote file not listed in REMOTE_ONLY to the local
+         * directory, then logs how many files were copied and their total size.
+         * Invoked from the constructor when prefetch is enabled.
+         */
+        private void prefetchIndexFiles() throws IOException {
+            long start = PERF_LOGGER.start();
+            long totalSize = 0;
+            int copyCount = 0;
+            for (String name : remote.listAll()) {
+                if (REMOTE_ONLY.contains(name)) {
+                    continue;
+                }
+                CORFileReference fileRef = new CORFileReference(name);
+                files.putIfAbsent(name, fileRef);
+                long fileSize = copyFilesToLocal(fileRef, false);
+                if (fileSize > 0) {
+                    copyCount++;
+                    totalSize += fileSize;
+                }
+            }
+            PERF_LOGGER.end(start, -1, "[{}] Copied {} files totaling {}", indexPath, copyCount, humanReadableByteCount(totalSize));
+        }
 
-                            doneCopy(file, start);
-                        } else {
-                            long localLength = local.fileLength(name);
-                            long remoteLength = remote.fileLength(name);
+        /**
+         * Copies the referenced file from remote to local when it is absent
+         * locally; otherwise marks the reference valid only if the local and
+         * remote lengths match. The local file is deleted if a copy was
+         * attempted but did not succeed.
+         *
+         * @param reference file to make available locally
+         * @param logDuration whether to log the time taken by the copy
+         * @return size of the file copied, or 0 if nothing was successfully copied
+         */
+        private long copyFilesToLocal(CORFileReference reference, boolean logDuration) {
+            String name = reference.name;
+            boolean success = false;
+            boolean copyAttempted = false;
+            long fileSize = 0;
+            try {
+                if (!local.fileExists(name)) {
+                    long perfStart = -1;
+                    if (logDuration) {
+                        perfStart = PERF_LOGGER.start();
+                    }
 
-                            //Do a simple consistency check. Ideally Lucene index files are never
-                            //updated but still do a check if the copy is consistent
-                            if (localLength != remoteLength) {
-                                log.warn("Found local copy for {} in {} but size of local {} differs from remote {}. " +
-                                        "Content would be read from remote file only",
-                                        name, local, localLength, remoteLength);
-                                invalidFileCount.incrementAndGet();
-                            } else {
-                                reference.markValid();
-                            }
+                    fileSize = remote.fileLength(name);
+                    LocalIndexFile file = new LocalIndexFile(local, name, fileSize, true);
+                    long start = startCopy(file);
+                    copyAttempted = true;
+
+                    remote.copy(local, name, name, IOContext.READ);
+                    reference.markValid();
+
+                    doneCopy(file, start);
+                    if (logDuration) {
+                        PERF_LOGGER.end(perfStart, 0,
+                                "[{}] Copied file {} of size {}", indexPath,
+                                name, humanReadableByteCount(fileSize));
+                    }
+                } else {
+                    long localLength = local.fileLength(name);
+                    long remoteLength = remote.fileLength(name);
+
+                    //Do a simple consistency check. Ideally Lucene index files are never
+                    //updated but still do a check if the copy is consistent
+                    if (localLength != remoteLength) {
+                        log.warn("[{}] Found local copy for {} in {} but size of local {} differs from remote {}. Content would be read from remote file only",
+                                indexPath, name, local, localLength, remoteLength);
+                        invalidFileCount.incrementAndGet();
+                    } else {
+                        reference.markValid();
+                    }
+                }
+                success = true;
+            } catch (IOException e) {
+                //TODO In case of exception there would not be any other attempt
+                //to download the file. Look into support for retry
+                log.warn("[{}] Error occurred while copying file [{}] from {} to {}", indexPath, name, remote, local, e);
+            } finally {
+                if (copyAttempted && !success){
+                    try {
+                        if (local.fileExists(name)) {
+                            local.deleteFile(name);
                         }
-                        success = true;
                     } catch (IOException e) {
-                        //TODO In case of exception there would not be any other attempt
-                        //to download the file. Look into support for retry
-                        log.warn("Error occurred while copying file [{}] " +
-                                "from {} to {}", name, remote, local, e);
-                    } finally {
-                        if (copyAttempted && !success){
-                            try {
-                                if (local.fileExists(name)) {
-                                    local.deleteFile(name);
-                                }
-                            } catch (IOException e) {
-                                log.warn("Error occurred while deleting corrupted file [{}] from [{}]", name, local, e);
-                            }
-                        }
+                        log.warn("[{}] Error occurred while deleting corrupted file [{}] from [{}]", indexPath, name, local, e);
                     }
                 }
-            });
+            }
+            return success ? fileSize : 0;
         }
 
         /**
@@ -381,8 +443,9 @@
                     try{
                         removeDeletedFiles();
                     } catch (IOException e) {
-                        log.warn("Error occurred while removing deleted files from Local {}, " +
-                                "Remote {}", local, remote, e);
+                        log.warn(
+                                "[{}] Error occurred while removing deleted files from Local {}, Remote {}",
+                                indexPath, local, remote, e);
                     }
 
                     try {
@@ -392,7 +455,9 @@
                         local.close();
                         remote.close();
                     } catch (IOException e) {
-                        log.warn("Error occurred while closing directory ", e);
+                        log.warn(
+                                "[{}] Error occurred while closing directory ",
+                                indexPath, e);
                     }
                 }
             });
Index: src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java
===================================================================
--- src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java (revision 1688821)
+++ src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java (working copy)
@@ -209,6 +209,8 @@
 
     private final boolean saveDirListing;
 
+    private final boolean prefetchEnabled;
+
     public IndexDefinition(NodeState root, NodeState defn) {
         this(root, defn, null);
     }
@@ -273,6 +275,7 @@
         this.pathFilter = PathFilter.from(new ReadOnlyBuilder(defn));
         this.queryPaths = getQueryPaths(defn);
         this.saveDirListing = getOptionalValue(defn, LuceneIndexConstants.SAVE_DIR_LISTING, true);
+        this.prefetchEnabled = getOptionalValue(defn, LuceneIndexConstants.COR_PREFETCH_FILES, false);
     }
 
     public boolean isFullTextEnabled() {
@@ -378,6 +381,10 @@
         return saveDirListing;
     }
 
+    public boolean isPrefetchEnabledForCopyOnRead() {
+        return prefetchEnabled;
+    }
+
     public PathFilter getPathFilter() {
         return pathFilter;
     }
Index: src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java
===================================================================
--- src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java (revision 1688821)
+++ src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java (working copy)
@@ -270,4 +270,10 @@
      * existing index files
      */
     String INDEX_PATH = "indexPath";
+
+    /**
+     * Allow prefetching the index files upon opening the directory when CopyOnRead is
+     * enabled
+     */
+    String COR_PREFETCH_FILES = "enablePrefetchIndexFiles";
 }
Index: src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java
===================================================================
--- src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java (revision 1688821)
+++ src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java (working copy)
@@ -191,7 +191,7 @@
             if (addOrUpdate(path, after, before.exists())) {
                 long indexed = context.incIndexedNodes();
                 if (indexed % 1000 == 0) {
-                    log.debug("{} => Indexed {} nodes...", context.getDefinition().getIndexName(), indexed);
+                    log.debug("[{}] => Indexed {} nodes...", context.getDefinition().getIndexName(), indexed);
                 }
             }
         }
@@ -208,7 +208,7 @@
                         "Failed to close the Lucene index", e);
             }
             if (context.getIndexedNodes() > 0) {
-                log.debug("{} => Indexed {} nodes, done.", context.getDefinition().getIndexName(), context.getIndexedNodes());
+                log.debug("[{}] => Indexed {} nodes, done.", context.getDefinition().getIndexName(), context.getIndexedNodes());
             }
         }
     }
@@ -286,8 +286,9 @@
         try {
             Document d = makeDocument(path, state, isUpdate);
             if (d != null) {
-                if (log.isTraceEnabled()){
-                    log.trace("Indexed document for {} is {}", path, d);
+                if (log.isTraceEnabled()) {
+                    log.trace("[{}] Indexed document for {} is {}", context
+                            .getDefinition().getIndexName(), path, d);
                 }
                 context.indexUpdate();
                 context.getWriter().updateDocument(newPathTerm(path), d);
@@ -464,8 +465,9 @@
         // Ignore and warn if property multi-valued as not supported
         if (property.getType().isArray()) {
             log.warn(
-                    "Ignoring ordered property {} of type {} for path {} as multivalued ordered property not supported",
-                    pname, Type.fromTag(property.getType().tag(), true), getPath());
+                    "[{}] Ignoring ordered property {} of type {} for path {} as multivalued ordered property not supported",
+                    context.getDefinition().getIndexName(), pname,
+                    Type.fromTag(property.getType().tag(), true), getPath());
             return false;
         }
 
@@ -474,10 +476,11 @@
         // Try converting type to the defined type in the index definition
         if (tag != idxDefinedTag) {
             log.debug(
-                    "Ordered property defined with type {} differs from property {} with type {} in " +
-                            "path {}",
-                    Type.fromTag(idxDefinedTag, false), property.toString(), Type.fromTag(tag, false),
-                    getPath());
+                    "[{}] Ordered property defined with type {} differs from property {} with type {} in "
+                            + "path {}",
+                    context.getDefinition().getIndexName(),
+                    Type.fromTag(idxDefinedTag, false), property.toString(),
+                    Type.fromTag(tag, false), getPath());
             tag = idxDefinedTag;
         }
 
@@ -508,10 +511,10 @@
             }
         } catch (Exception e) {
             log.warn(
-                    "Ignoring ordered property. Could not convert property {} of type {} to type " +
-                            "{} for path {}",
-                    pname, Type.fromTag(property.getType().tag(), false),
-                    Type.fromTag(tag, false), getPath(), e);
+                    "[{}] Ignoring ordered property. Could not convert property {} of type {} to type {} for path {}",
+                    context.getDefinition().getIndexName(), pname,
+                    Type.fromTag(property.getType().tag(), false),
+                    Type.fromTag(tag, false), getPath(), e);
         }
         return fieldAdded;
     }
@@ -528,9 +531,10 @@
         //jcr:mimeType is mandatory for a binary to be indexed
         String type = state.getString(JcrConstants.JCR_MIMETYPE);
 
-        if (type == null || !isSupportedMediaType(type)){
-            log.trace("Ignoring binary content for node {} due to unsupported " +
-                    "(or null) jcr:mimeType [{}]", nodePath, type);
+        if (type == null || !isSupportedMediaType(type)) {
+            log.trace(
+                    "[{}] Ignoring binary content for node {} due to unsupported (or null) jcr:mimeType [{}]",
+                    context.getDefinition().getIndexName(), nodePath, type);
             return fields;
         }
 
@@ -829,11 +833,12 @@
             // Capture and report any other full text extraction problems.
             // The special STOP exception is used for normal termination.
             if (!handler.isWriteLimitReached(t)) {
-                log.debug("Failed to extract text from a binary property: "
-                        + path
+                log.debug(
+                        "[{}] Failed to extract text from a binary property: {}."
                         + " This is a fairly common case, and nothing to"
                         + " worry about. The stack trace is included to"
-                        + " help improve the text extraction feature.", t);
+                        + " help improve the text extraction feature.",
+                        context.getDefinition().getIndexName(), path, t);
                 return "TextExtractionError";
             }
         }
Index: src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
===================================================================
--- src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java (revision 1688821)
+++ src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java (working copy)
@@ -111,6 +111,34 @@
     }
 
     @Test
+    public void basicTestWithPrefetch() throws Exception{
+        Directory baseDir = new RAMDirectory();
+        builder.setProperty(LuceneIndexConstants.COR_PREFETCH_FILES, true);
+        IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+        IndexCopier c1 = new RAMIndexCopier(baseDir, sameThreadExecutor(), getWorkDir());
+
+        Directory remote = new RAMDirectory();
+
+        byte[] t1 = writeFile(remote , "t1");
+        byte[] t2 = writeFile(remote , "t2");
+
+        Directory wrapped = c1.wrapForRead("/foo", defn, remote);
+        assertEquals(2, wrapped.listAll().length);
+
+        assertTrue(wrapped.fileExists("t1"));
+        assertTrue(wrapped.fileExists("t2"));
+
+        assertTrue(baseDir.fileExists("t1"));
+        assertTrue(baseDir.fileExists("t2"));
+
+        assertEquals(t1.length, wrapped.fileLength("t1"));
+        assertEquals(t2.length, wrapped.fileLength("t2"));
+
+        readAndAssert(wrapped, "t1", t1);
+
+    }
+
+    @Test
     public void basicTestWithFS() throws Exception{
         IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
         IndexCopier c1 = new IndexCopier(sameThreadExecutor(), getWorkDir());