Index: lucene/replicator/build.xml =================================================================== --- lucene/replicator/build.xml (revision 0) +++ lucene/replicator/build.xml (working copy) @@ -0,0 +1,49 @@ + + + + + + Files replication utility + + + + + + + + + + + + + + + + + + + + + + + + + + + + Property changes on: lucene/replicator/build.xml ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: lucene/replicator/ivy.xml =================================================================== --- lucene/replicator/ivy.xml (revision 0) +++ lucene/replicator/ivy.xml (working copy) @@ -0,0 +1,50 @@ + + +]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Property changes on: lucene/replicator/ivy.xml ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: lucene/replicator/lib =================================================================== --- lucene/replicator/lib (revision 0) +++ lucene/replicator/lib (working copy) Property changes on: lucene/replicator/lib ___________________________________________________________________ Added: svn:ignore ## -0,0 +1 ## +*.jar Index: lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyReplicationHandler.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyReplicationHandler.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyReplicationHandler.java (working copy) @@ -0,0 +1,200 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + +import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexWriterConfig.OpenMode; +import org.apache.lucene.index.NoMergeScheduler; +import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.util.Version; + +/** + * A {@link ReplicationHandler} for replication of an index and taxonomy pair. + * See {@link IndexReplicationHandler} for more detail, but this handler ensures + * that the search and taxonomy indexes are replicated in a consistent way. + * Taxonomy indexes that recreated either via {@link OpenMode#CREATE} or + * {@link DirectoryTaxonomyWriter#replaceTaxonomy(Directory)} are not supported + * by this handler. 
+ * + * @see IndexReplicationHandler + * + * @lucene.experimental + */ +public class IndexAndTaxonomyReplicationHandler implements ReplicationHandler { + + private final Directory indexDir; + private final Directory taxoDir; + private final Callable callback; + + private long taxoEpoch; + private Map currentRevisionFiles; + private String currentVersion; + + /** + * Constructor with the given index directory and callback to notify when the + * indexes were updated. + */ + public IndexAndTaxonomyReplicationHandler(Directory indexDir, Directory taxoDir, Callable callback) + throws IOException { + final boolean indexExists = DirectoryReader.indexExists(indexDir); + final boolean taxoExists = DirectoryReader.indexExists(taxoDir); + if (!indexExists && !taxoExists) { + currentRevisionFiles = null; + currentVersion = null; + taxoEpoch = -1; + } else if (indexExists != taxoExists) { + throw new IllegalArgumentException("both search and taxonomy indexes must exist: indexExists=" + indexExists + + " taxoExists=" + taxoExists); + } else { + List commits = DirectoryReader.listCommits(indexDir); + IndexCommit indexCommit = commits.get(commits.size() - 1); + commits = DirectoryReader.listCommits(taxoDir); + IndexCommit taxoCommit = commits.get(commits.size() - 1); + currentRevisionFiles = IndexAndTaxonomyRevision.revisionFiles(indexCommit, taxoCommit); + currentVersion = IndexAndTaxonomyRevision.revisionVersion(indexCommit, taxoCommit); + taxoEpoch = Long.parseLong(taxoCommit.getUserData().get(DirectoryTaxonomyWriter.INDEX_EPOCH)); + } + this.callback = callback; + this.indexDir = indexDir; + this.taxoDir = taxoDir; + } + + @Override + public String currentVersion() { + return currentVersion; + } + + @Override + public Map currentRevisionFiles() { + return currentRevisionFiles; + } + + private List processFiles(RevisionFile[] files, Directory clientDir, Directory targetDir) + throws IOException { + List toSync = new ArrayList(); + if (clientDir.equals(targetDir)) { + // files 
copied directly to index directory, sync all revision files in + // the order specified by revisionFiles. + for (RevisionFile file : files) { + toSync.add(file.fileName); + } + } else { + // files were copied to a transient directory. copy them to the index + // directory and sync, but do so in the same order they are given. + for (RevisionFile file : files) { + if (clientDir.fileExists(file.fileName)) { + clientDir.copy(targetDir, file.fileName, file.fileName, IOContext.READONCE); + toSync.add(file.fileName); + } + } + } + + // make sure that the last file to sync is the segments_N file and fail if + // not. the reason why the code fails instead of putting segments_N file + // last is that this indicates an error in the Revision implementation. + if (!toSync.isEmpty()) { + String lastFile = toSync.get(toSync.size() - 1); + if (!lastFile.startsWith(IndexFileNames.SEGMENTS) || lastFile.equals(IndexFileNames.SEGMENTS_GEN)) { + throw new IllegalStateException("last file to copy+sync must be segments_N but got " + lastFile + + "; check your Revision implementation!"); + } + } + return toSync; + } + + @Override + public void revisionReady(String version, Map revisionFiles, + Map sourceDirectory) throws IOException { + // copy taxonomy index files first. If anything goes wrong between this line + // and the next, it is ok if the taxonomy index is more 'advanced' than the + // search index, as it only means it contains more categories. + List taxoFiles = processFiles(revisionFiles.get(IndexAndTaxonomyRevision.TAXONOMY_SOURCE), + sourceDirectory.get(IndexAndTaxonomyRevision.TAXONOMY_SOURCE), taxoDir); + List indexFiles = processFiles(revisionFiles.get(IndexAndTaxonomyRevision.INDEX_SOURCE), + sourceDirectory.get(IndexAndTaxonomyRevision.INDEX_SOURCE), indexDir); + + // sync all copied files in the order they are given except the segments_N + // file. We sync both the taxonomy and index segments files in the end, so + // the sync is more close to atomically. 
+ indexDir.sync(indexFiles.subList(0, indexFiles.size() - 1)); + if (!taxoFiles.isEmpty()) { + taxoDir.sync(taxoFiles.subList(0, taxoFiles.size() - 1)); + } + + // sync taxo segments before index segments because if is ok if the taxonomy + // contains more categories than the search index knows about, but not ok + // the other way around. That way, if the process crashes after taxoSegments + // was synced, but before indexSegments, a searcher-taxonomy pair will be + // valid. + if (!taxoFiles.isEmpty()) { + taxoDir.sync(Collections.singletonList(taxoFiles.get(taxoFiles.size() - 1))); + } + indexDir.sync(Collections.singletonList(indexFiles.get(indexFiles.size() - 1))); + + // verify that the replicated taxonomy index is of the same epoch as the one + // this handler was created with + List commits = DirectoryReader.listCommits(taxoDir); + IndexCommit ic = commits.get(commits.size() - 1); + long replicatedEpoch = Long.parseLong(ic.getUserData().get(DirectoryTaxonomyWriter.INDEX_EPOCH)); + if (taxoEpoch == -1) { + // handler was created when taxonomy index didn't exist, so just assign + // the epoch for the first time. 
+ taxoEpoch = replicatedEpoch; + } else if (taxoEpoch != replicatedEpoch) { + throw new IllegalStateException("handler created with taxonomy epoch " + taxoEpoch + + " however replicated taxonomy epoch is " + replicatedEpoch + "; this is not supported!"); + } + + // touch the taxonomy index so that it reads the latest commit point and deletes any unused files + IndexWriterConfig taxoConf = new IndexWriterConfig(Version.LUCENE_50, null); + taxoConf.setMergeScheduler(NoMergeScheduler.INSTANCE); // prevent any merges from happening + new IndexWriter(taxoDir, taxoConf).close(); + + // touch the index so that it reads the latest commit point and deletes any unused files + IndexWriterConfig indexConf = new IndexWriterConfig(Version.LUCENE_50, null); + indexConf.setMergeScheduler(NoMergeScheduler.INSTANCE); // prevent any merges from happening + new IndexWriter(indexDir, indexConf).close(); + + currentVersion = version; + currentRevisionFiles = revisionFiles; + + // successfully updated the index, notify the callback that the index is ready. 
+ if (callback != null) { + try { + callback.call(); + } catch (Exception e) { + throw new IOException(e); + } + } + } + +} Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyReplicationHandler.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyRevision.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyRevision.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyRevision.java (working copy) @@ -0,0 +1,226 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter; +import org.apache.lucene.facet.taxonomy.writercache.TaxonomyWriterCache; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexDeletionPolicy; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexWriterConfig.OpenMode; +import org.apache.lucene.index.SnapshotDeletionPolicy; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; + +/** + * A {@link Revision} of a single index and taxonomy index files which comprises + * the list of files from both indexes. This revision should be used whenever a + * pair of search and taxonomy indexes need to be replicated together to + * guarantee consistency of both on the replicating (client) side. + * + * @see IndexRevision + * + * @lucene.experimental + */ +public class IndexAndTaxonomyRevision implements Revision { + + /** + * A {@link DirectoryTaxonomyWriter} which sets the underlying + * {@link IndexWriter}'s {@link IndexDeletionPolicy} to + * {@link SnapshotDeletionPolicy}. 
+ */ + public static final class SnapshotDirectoryTaxonomyWriter extends DirectoryTaxonomyWriter { + + private SnapshotDeletionPolicy sdp; + private IndexWriter writer; + + /** + * @see DirectoryTaxonomyWriter#DirectoryTaxonomyWriter(Directory, + * IndexWriterConfig.OpenMode, TaxonomyWriterCache) + */ + public SnapshotDirectoryTaxonomyWriter(Directory directory, OpenMode openMode, TaxonomyWriterCache cache) + throws IOException { + super(directory, openMode, cache); + } + + /** @see DirectoryTaxonomyWriter#DirectoryTaxonomyWriter(Directory, IndexWriterConfig.OpenMode) */ + public SnapshotDirectoryTaxonomyWriter(Directory directory, OpenMode openMode) throws IOException { + super(directory, openMode); + } + + /** @see DirectoryTaxonomyWriter#DirectoryTaxonomyWriter(Directory) */ + public SnapshotDirectoryTaxonomyWriter(Directory d) throws IOException { + super(d); + } + + @Override + protected IndexWriterConfig createIndexWriterConfig(OpenMode openMode) { + IndexWriterConfig conf = super.createIndexWriterConfig(openMode); + conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy())); + return conf; + } + + @Override + protected IndexWriter openIndexWriter(Directory directory, IndexWriterConfig config) throws IOException { + writer = super.openIndexWriter(directory, config); + // must set it here because IndexWriter clones the config + sdp = (SnapshotDeletionPolicy) writer.getConfig().getIndexDeletionPolicy(); + return writer; + } + + /** Returns the {@link SnapshotDeletionPolicy} used by the underlying {@link IndexWriter}. */ + public SnapshotDeletionPolicy getDeletionPolicy() { + return sdp; + } + + /** Returns the {@link IndexWriter} used by this {@link DirectoryTaxonomyWriter}. 
*/ + public IndexWriter getIndexWriter() { + return writer; + } + + } + + private static final int RADIX = 16; + + public static final String INDEX_SOURCE = "index"; + public static final String TAXONOMY_SOURCE = "taxo"; + + private final IndexWriter indexWriter; + private final SnapshotDirectoryTaxonomyWriter taxoWriter; + private final IndexCommit indexCommit, taxoCommit; + private final SnapshotDeletionPolicy indexSDP, taxoSDP; + private final String version; + private final Map sourceFiles; + + /** Returns a singleton map of the revision files from the given {@link IndexCommit}. */ + public static Map revisionFiles(IndexCommit indexCommit, IndexCommit taxoCommit) + throws IOException { + HashMap files = new HashMap(); + files.put(INDEX_SOURCE, IndexRevision.revisionFiles(indexCommit).values().iterator().next()); + files.put(TAXONOMY_SOURCE, IndexRevision.revisionFiles(taxoCommit).values().iterator().next()); + return files; + } + + /** + * Returns a String representation of a revision's version from the given + * {@link IndexCommit}s of the search and taxonomy indexes. + */ + public static String revisionVersion(IndexCommit indexCommit, IndexCommit taxoCommit) { + return Long.toString(indexCommit.getGeneration(), RADIX) + ":" + Long.toString(taxoCommit.getGeneration(), RADIX); + } + + /** + * Constructor over the given {@link IndexWriter}. Uses the last + * {@link IndexCommit} found in the {@link Directory} managed by the given + * writer. 
+ */ + public IndexAndTaxonomyRevision(IndexWriter indexWriter, SnapshotDirectoryTaxonomyWriter taxoWriter) + throws IOException { + IndexDeletionPolicy delPolicy = indexWriter.getConfig().getIndexDeletionPolicy(); + if (!(delPolicy instanceof SnapshotDeletionPolicy)) { + throw new IllegalArgumentException("IndexWriter must be created with SnapshotDeletionPolicy"); + } + this.indexWriter = indexWriter; + this.taxoWriter = taxoWriter; + this.indexSDP = (SnapshotDeletionPolicy) delPolicy; + this.taxoSDP = taxoWriter.getDeletionPolicy(); + this.indexCommit = indexSDP.snapshot(); + this.taxoCommit = taxoSDP.snapshot(); + this.version = revisionVersion(indexCommit, taxoCommit); + this.sourceFiles = revisionFiles(indexCommit, taxoCommit); + } + + @Override + public int compareTo(String version) { + final String[] parts = version.split(":"); + final long indexGen = Long.parseLong(parts[0], RADIX); + final long taxoGen = Long.parseLong(parts[1], RADIX); + final long indexCommitGen = indexCommit.getGeneration(); + final long taxoCommitGen = taxoCommit.getGeneration(); + + // if the index generation is not the same as this commit's generation, + // compare by it. Otherwise, compare by the taxonomy generation. + if (indexCommitGen < indexGen) { + return -1; + } else if (indexCommitGen > indexGen) { + return 1; + } else { + return taxoCommitGen < taxoGen ? -1 : (taxoCommitGen > taxoGen ? 1 : 0); + } + } + + @Override + public int compareTo(Revision o) { + IndexAndTaxonomyRevision other = (IndexAndTaxonomyRevision) o; + int cmp = indexCommit.compareTo(other.indexCommit); + return cmp != 0 ? 
cmp : taxoCommit.compareTo(other.taxoCommit); + } + + @Override + public String getVersion() { + return version; + } + + @Override + public Map getSourceFiles() { + return sourceFiles; + } + + @Override + public InputStream open(String source, String fileName) throws IOException { + assert source.equals(INDEX_SOURCE) || source.equals(TAXONOMY_SOURCE) : "invalid source; expected=(" + INDEX_SOURCE + + " or " + TAXONOMY_SOURCE + ") got=" + source; + IndexCommit ic = source.equals(INDEX_SOURCE) ? indexCommit : taxoCommit; + return new IndexInputInputStream(ic.getDirectory().openInput(fileName, IOContext.READONCE)); + } + + @Override + public void release() throws IOException { + try { + indexSDP.release(indexCommit); + } finally { + taxoSDP.release(taxoCommit); + } + + try { + indexWriter.deleteUnusedFiles(); + } finally { + taxoWriter.getIndexWriter().deleteUnusedFiles(); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("IndexAndTaxonomyRevision [version=").append(version).append(" sourceFiles={"); + for (Entry e : sourceFiles.entrySet()) { + sb.append("source=").append(e.getKey()).append(" files=").append(Arrays.toString(e.getValue())).append(';'); + } + sb.append('}'); + return sb.toString(); + } + +} Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyRevision.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: lucene/replicator/src/java/org/apache/lucene/replicator/IndexInputInputStream.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/IndexInputInputStream.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/IndexInputInputStream.java (working copy) @@ -0,0 +1,92 @@ +package 
org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.lucene.store.IndexInput; + +/** + * An {@link InputStream} which wraps an {@link IndexInput}. 
+ * + * @lucene.experimental + */ +public final class IndexInputInputStream extends InputStream { + + private final IndexInput in; + + private long remaining; + + public IndexInputInputStream(IndexInput in) { + this.in = in; + remaining = in.length(); + } + + @Override + public int read() throws IOException { + if (remaining == 0) { + return -1; + } else { + --remaining; + return in.readByte(); + } + } + + @Override + public int available() throws IOException { + return (int) in.length(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (remaining == 0) { + return -1; + } + if (remaining < len) { + len = (int) remaining; + } + in.readBytes(b, off, len); + remaining -= len; + return len; + } + + @Override + public long skip(long n) throws IOException { + if (remaining == 0) { + return -1; + } + if (remaining < n) { + n = remaining; + } + in.seek(in.getFilePointer() + n); + remaining -= n; + return n; + } + +} \ No newline at end of file Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/IndexInputInputStream.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: lucene/replicator/src/java/org/apache/lucene/replicator/IndexReplicationHandler.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/IndexReplicationHandler.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/IndexReplicationHandler.java (working copy) @@ -0,0 +1,151 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Callable; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.NoMergeScheduler; +import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.util.Version; + +/** + * A {@link ReplicationHandler} for replication of an index. Implements + * {@link #revisionReady} by copying the files pointed by the client resolver to + * the index {@link Directory} and then touches the index with + * {@link IndexWriter} to make sure any unused files are deleted. + *

+ * NOTE: assumes that {@link IndexWriter} is not opened by another + * process on the index directory. In fact, opening an {@link IndexWriter} on + * the same directory to which files are copied can lead to undefined behavior, + * where some or all the files will be deleted, override other files or simply + * create a mess. When you replicate an index, it is best if the index is never + * modified by {@link IndexWriter}, except the one that is open on the source + * index, from which you replicate. + *

+ * This handler notifies the application via a provided {@link Callable} when an + * updated index commit was made available for it. + * + * @lucene.experimental + */ +public class IndexReplicationHandler implements ReplicationHandler { + + private final Directory indexDir; + private final Callable callback; + + private Map currentRevisionFiles; + private String currentVersion; + + /** + * Constructor with the given index directory and callback to notify when the + * indexes were updated. + */ + public IndexReplicationHandler(Directory indexDir, Callable callback) throws IOException { + this.callback = callback; + this.indexDir = indexDir; + if (!DirectoryReader.indexExists(indexDir)) { + currentRevisionFiles = null; + currentVersion = null; + } else { + List commits = DirectoryReader.listCommits(indexDir); + IndexCommit commit = commits.get(commits.size() - 1); + currentRevisionFiles = IndexRevision.revisionFiles(commit); + currentVersion = IndexRevision.revisionVersion(commit); + } + } + + @Override + public String currentVersion() { + return currentVersion; + } + + @Override + public Map currentRevisionFiles() { + return currentRevisionFiles; + } + + @Override + public void revisionReady(String version, Map revisionFiles, + Map sourceDirectory) throws IOException { + if (revisionFiles.size() > 1) { + throw new IllegalArgumentException("this handler handles only a single source; got " + revisionFiles.keySet()); + } + + Entry entry = revisionFiles.entrySet().iterator().next(); + Directory clientDir = sourceDirectory.get(entry.getKey()); + List toSync = new ArrayList(); + if (clientDir.equals(indexDir)) { + // files copied directly to index directory, sync all revision files in + // the order specified by revisionFiles. + for (RevisionFile file : entry.getValue()) { + toSync.add(file.fileName); + } + } else { + // files were copied to a transient directory. copy them to the index + // directory and sync, but do so in the order specified by revisionFiles. 
+ for (RevisionFile file : entry.getValue()) { + if (clientDir.fileExists(file.fileName)) { + clientDir.copy(indexDir, file.fileName, file.fileName, IOContext.READONCE); + toSync.add(file.fileName); + } + } + } + + // sync all copied files. the sync happens in order, and IndexRevision + // ensures that the last file sent is the segments_N file. Still, verify + // that it's indeed the case. The reason why the code fails instead of + // putting segments_N file last is that this indicates an error in the + // Revision implementation. + String lastFile = toSync.get(toSync.size() - 1); + if (!lastFile.startsWith(IndexFileNames.SEGMENTS) || lastFile.equals(IndexFileNames.SEGMENTS_GEN)) { + throw new IllegalStateException("last file to copy+sync must be segments_N but got " + lastFile + + "; check your Revision implementation!"); + } + indexDir.sync(toSync); + + IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_50, null); + conf.setMergeScheduler(NoMergeScheduler.INSTANCE); // prevent any merges from happening + + // touch the index so that it reads the latest commit point and deletes any unused files + new IndexWriter(indexDir, conf).close(); + + currentVersion = version; + currentRevisionFiles = revisionFiles; + + // successfully updated the index, notify the callback that the index is ready. 
+ if (callback != null) { + try { + callback.call(); + } catch (Exception e) { + throw new IOException(e); + } + } + } + +} Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/IndexReplicationHandler.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: lucene/replicator/src/java/org/apache/lucene/replicator/IndexRevision.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/IndexRevision.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/IndexRevision.java (working copy) @@ -0,0 +1,157 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexDeletionPolicy; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.SnapshotDeletionPolicy; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; + +/** + * A {@link Revision} of a single index files which comprises the list of files + * that are part of the current {@link IndexCommit}. To ensure the files are not + * deleted by {@link IndexWriter} for as long as this revision stays alive (i.e. + * until {@link #release()}), the current commit point is snapshotted, using + * {@link SnapshotDeletionPolicy} (this means that the given writer's + * {@link IndexWriterConfig#getIndexDeletionPolicy() config} should return + * {@link SnapshotDeletionPolicy}). + *

+ * When this revision is {@link #release() released}, it releases the obtained + * snapshot as well as calls {@link IndexWriter#deleteUnusedFiles()} so that the + * snapshotted files are deleted (if they are no longer needed). + * + * @lucene.experimental + */ +public class IndexRevision implements Revision { + + private static final int RADIX = 16; + private static final String SOURCE = "index"; + + private final IndexWriter writer; + private final IndexCommit commit; + private final SnapshotDeletionPolicy sdp; + private final String version; + private final Map sourceFiles; + + // returns a RevisionFile with some metadata + private static RevisionFile newRevisionFile(String file, Directory dir) throws IOException { + RevisionFile revFile = new RevisionFile(file); + revFile.size = dir.fileLength(file); + return revFile; + } + + /** Returns a singleton map of the revision files from the given {@link IndexCommit}. */ + public static Map revisionFiles(IndexCommit commit) throws IOException { + Collection commitFiles = commit.getFileNames(); + RevisionFile[] revisionFiles = new RevisionFile[commitFiles.size()]; + String segmentsFile = commit.getSegmentsFileName(); + Directory dir = commit.getDirectory(); + + revisionFiles[revisionFiles.length - 1] = newRevisionFile(segmentsFile, dir); // segments_N must be last + int idx = 0; + for (String file : commitFiles) { + if (!file.equals(segmentsFile)) { + revisionFiles[idx++] = newRevisionFile(file, dir); + } + } + return Collections.singletonMap(SOURCE, revisionFiles); + } + + /** + * Returns a String representation of a revision's version from the given + * {@link IndexCommit}. + */ + public static String revisionVersion(IndexCommit commit) { + return Long.toString(commit.getGeneration(), RADIX); + } + + /** + * Constructor over the given {@link IndexWriter}. Uses the last + * {@link IndexCommit} found in the {@link Directory} managed by the given + * writer. 
+ */ + public IndexRevision(IndexWriter writer) throws IOException { + IndexDeletionPolicy delPolicy = writer.getConfig().getIndexDeletionPolicy(); + if (!(delPolicy instanceof SnapshotDeletionPolicy)) { + throw new IllegalArgumentException("IndexWriter must be created with SnapshotDeletionPolicy"); + } + this.writer = writer; + this.sdp = (SnapshotDeletionPolicy) delPolicy; + this.commit = sdp.snapshot(); + this.version = revisionVersion(commit); + this.sourceFiles = revisionFiles(commit); + } + + @Override + public int compareTo(String version) { + long gen = Long.parseLong(version, RADIX); + long commitGen = commit.getGeneration(); + return commitGen < gen ? -1 : (commitGen > gen ? 1 : 0); + } + + @Override + public int compareTo(Revision o) { + IndexRevision other = (IndexRevision) o; + return commit.compareTo(other.commit); + } + + @Override + public String getVersion() { + return version; + } + + @Override + public Map getSourceFiles() { + return sourceFiles; + } + + @Override + public InputStream open(String source, String fileName) throws IOException { + assert source.equals(SOURCE) : "invalid source; expected=" + SOURCE + " got=" + source; + return new IndexInputInputStream(commit.getDirectory().openInput(fileName, IOContext.READONCE)); + } + + @Override + public void release() throws IOException { + sdp.release(commit); + writer.deleteUnusedFiles(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("IndexRevision [version=").append(version).append(" sourceFiles={"); + for (Entry e : sourceFiles.entrySet()) { + sb.append("source=").append(e.getKey()).append(" files=").append(Arrays.toString(e.getValue())).append(';'); + } + sb.append('}'); + return sb.toString(); + } + +} Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/IndexRevision.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property 
Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: lucene/replicator/src/java/org/apache/lucene/replicator/LocalReplicator.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/LocalReplicator.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/LocalReplicator.java (working copy) @@ -0,0 +1,247 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.lucene.store.AlreadyClosedException; + +/** + * A {@link Replicator} implementation for use by the side that publishes + * {@link Revision}s, as well for clients to {@link #checkForUpdate(String) + * check for updates}. When a client needs to be updated, it is returned a + * {@link SessionToken} through which it can + * {@link #obtainFile(String, String, String) obtain} the files of that + * revision. 
As long as a revision is being replicated, this replicator + * guarantees that it will not be {@link Revision#release() released}. + *

+ * Replication sessions expire by default after + * {@link #DEFAULT_SESSION_EXPIRATION_THRESHOLD}, and the threshold can be + * configured through {@link #setExpirationThreshold(long)}. + * + * @lucene.experimental + */ +public class LocalReplicator implements Replicator { + + private static class RefCountedRevision { + private final AtomicInteger refCount = new AtomicInteger(1); + public final Revision revision; + + public RefCountedRevision(Revision revision) { + this.revision = revision; + } + + public void decRef() throws IOException { + if (refCount.get() <= 0) { + throw new IllegalStateException("this revision is already released"); + } + + final int rc = refCount.decrementAndGet(); + if (rc == 0) { + boolean success = false; + try { + revision.release(); + success = true; + } finally { + if (!success) { + // Put reference back on failure + refCount.incrementAndGet(); + } + } + } else if (rc < 0) { + throw new IllegalStateException("too many decRef calls: refCount is " + rc + " after decrement"); + } + } + + public void incRef() { + refCount.incrementAndGet(); + } + + } + + private static class ReplicationSession { + public final SessionToken session; + public final RefCountedRevision revision; + private volatile long lastAccessTime; + + ReplicationSession(SessionToken session, RefCountedRevision revision) { + this.session = session; + this.revision = revision; + lastAccessTime = System.currentTimeMillis(); + } + + boolean isExpired(long expirationThreshold) { + return lastAccessTime < (System.currentTimeMillis() - expirationThreshold); + } + + void markAccessed() { + lastAccessTime = System.currentTimeMillis(); + } + } + + /** Threshold for expiring inactive sessions. Defaults to 30 minutes. 
*/ + public static final long DEFAULT_SESSION_EXPIRATION_THRESHOLD = 1000 * 60 * 30; + + private long expirationThresholdMilllis = LocalReplicator.DEFAULT_SESSION_EXPIRATION_THRESHOLD; + + private volatile RefCountedRevision currentRevision; + private volatile boolean closed = false; + + private final AtomicInteger sessionToken = new AtomicInteger(0); + private final Map sessions = new HashMap(); + + private void checkExpiredSessions() throws IOException { + // make a "to-delete" list so we don't risk deleting from the map while iterating it + final ArrayList toExpire = new ArrayList(); + for (ReplicationSession token : sessions.values()) { + if (token.isExpired(expirationThresholdMilllis)) { + toExpire.add(token); + } + } + for (ReplicationSession token : toExpire) { + releaseSession(token.session.id); + } + } + + private void releaseSession(String sessionID) throws IOException { + ReplicationSession session = sessions.remove(sessionID); + // if we're called concurrently by close() and release(), could be that one + // thread beats the other to release the session. + if (session != null) { + session.revision.decRef(); + } + } + + /** Ensure that replicator is still open, or throw {@link AlreadyClosedException} otherwise. 
*/ + protected final synchronized void ensureOpen() { + if (closed) { + throw new AlreadyClosedException("This replicator has already been closed"); + } + } + + @Override + public synchronized SessionToken checkForUpdate(String currentVersion) { + ensureOpen(); + if (currentRevision == null) { // no published revisions yet + return null; + } + + if (currentVersion != null && currentRevision.revision.compareTo(currentVersion) <= 0) { + // currentVersion is newer or equal to latest published revision + return null; + } + + // currentVersion is either null or older than latest published revision + currentRevision.incRef(); + final String sessionID = Integer.toString(sessionToken.incrementAndGet()); + final SessionToken sessionToken = new SessionToken(sessionID, currentRevision.revision); + final ReplicationSession timedSessionToken = new ReplicationSession(sessionToken, currentRevision); + sessions.put(sessionID, timedSessionToken); + return sessionToken; + } + + @Override + public synchronized void close() throws IOException { + if (!closed) { + // release all managed revisions + for (ReplicationSession session : sessions.values()) { + session.revision.decRef(); + } + sessions.clear(); + closed = true; + } + } + + /** + * Returns the expiration threshold. 
+ * + * @see #setExpirationThreshold(long) + */ + public long getExpirationThreshold() { + return expirationThresholdMilllis; + } + + @Override + public synchronized InputStream obtainFile(String sessionID, String source, String fileName) throws IOException { + ensureOpen(); + ReplicationSession session = sessions.get(sessionID); + if (session != null && session.isExpired(expirationThresholdMilllis)) { + releaseSession(sessionID); + session = null; + } + // session either previously expired, or we just expired it + if (session == null) { + throw new SessionExpiredException("session (" + sessionID + ") expired while obtaining file: source=" + source + + " file=" + fileName); + } + session.markAccessed(); // reuse the session looked up above; this resets its last-access time + return session.revision.revision.open(source, fileName); + } + + @Override + public synchronized void publish(Revision revision) throws IOException { + ensureOpen(); + if (currentRevision != null) { + int compare = revision.compareTo(currentRevision.revision); + if (compare == 0) { + // same revision published again, ignore but release it + revision.release(); + return; + } + + if (compare < 0) { + revision.release(); + throw new IllegalArgumentException("Cannot publish an older revision: rev=" + revision + " current=" + + currentRevision.revision); // report the wrapped revision, not the ref-counting wrapper + } + } + + // swap revisions + final RefCountedRevision oldRevision = currentRevision; + currentRevision = new RefCountedRevision(revision); + if (oldRevision != null) { + oldRevision.decRef(); + } + + // check for expired sessions + checkExpiredSessions(); + } + + @Override + public synchronized void release(String sessionID) throws IOException { + ensureOpen(); + releaseSession(sessionID); + } + + /** + * Modify session expiration time - if a replication session is inactive that + * long it is automatically expired, and further attempts to operate within + * this session will throw a {@link SessionExpiredException}.
+ */ + public synchronized void setExpirationThreshold(long expirationThreshold) throws IOException { + ensureOpen(); + this.expirationThresholdMilllis = expirationThreshold; + checkExpiredSessions(); + } + +} Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/LocalReplicator.java ___________________________________________________________________ Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: lucene/replicator/src/java/org/apache/lucene/replicator/PerSessionDirectoryFactory.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/PerSessionDirectoryFactory.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/PerSessionDirectoryFactory.java (working copy) @@ -0,0 +1,77 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.File; +import java.io.IOException; + +import org.apache.lucene.replicator.ReplicationClient.SourceDirectoryFactory; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; + +/** + * A {@link SourceDirectoryFactory} which returns {@link FSDirectory} under a + * dedicated session directory. When a session is over, the entire directory is + * deleted. + * + * @lucene.experimental + */ +public class PerSessionDirectoryFactory implements SourceDirectoryFactory { + + private final File workDir; + + /** Constructor with the given working directory, under which a sub-directory is created per session. */ + public PerSessionDirectoryFactory(File workDir) { + this.workDir = workDir; + } + + private void rm(File file) throws IOException { + // listFiles() yields null for a plain file or when the listing fails, so guard against it + final File[] children = file.listFiles(); + for (int i = 0; children != null && i < children.length; i++) { + rm(children[i]); + } + + // This should be either an empty directory, or a file + if (!file.delete() && file.exists()) { + throw new IOException("failed to delete " + file); + } + } + + @Override + public Directory getDirectory(String sessionID, String source) throws IOException { + File sessionDir = new File(workDir, sessionID); + if (!sessionDir.exists() && !sessionDir.mkdirs()) { + throw new IOException("failed to create session directory " + sessionDir); + } + File sourceDir = new File(sessionDir, source); + if (!sourceDir.isDirectory() && !sourceDir.mkdirs()) { // mkdirs() returns false when the directory already exists + throw new IOException("failed to create source directory " + sourceDir); + } + return FSDirectory.open(sourceDir); + } + + @Override + public void cleanupSession(String sessionID) throws IOException { + if (sessionID.isEmpty()) { // protect against deleting workDir entirely!
+ throw new IllegalArgumentException("sessionID cannot be empty"); + } + rm(new File(workDir, sessionID)); + } + +} Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/PerSessionDirectoryFactory.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java (working copy) @@ -0,0 +1,371 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * A client which monitors and obtains new revisions from a {@link Replicator}. + * It can be used to either periodically check for updates by invoking + * {@link #startUpdateThread}, or manually by calling {@link #updateNow()}. + *

+ * Whenever a new revision is available, the {@link #requiredFiles(Map)} are + * copied to the {@link Directory} specified by {@link SourceDirectoryFactory} and + * a handler is notified. + * + * @lucene.experimental + */ +public class ReplicationClient implements Closeable { + + /** Handler for revisions obtained by the client. */ + public static interface ReplicationHandler { + + /** Returns the current revision version held by the handler. */ + public String currentVersion(); + + /** Returns the current revision files held by the handler. */ + public Map currentRevisionFiles(); + + /** + * Called when a new revision was obtained and is available (i.e. all needed + * files were successfully copied). + */ + public void revisionReady(String version, Map revisionFiles, + Map sourceDirectory) throws IOException; + } + + /** + * Resolves a session and source into a {@link Directory} to use for copying + * the session files to. + */ + public static interface SourceDirectoryFactory { + + /** + * Returns the {@link Directory} to use for the given session and source. + * Implementations may e.g. return different directories for different + * sessions, or the same directory for all sessions. In the latter case, it is + * advised to clean the directory before it is used for a new session. + * + * @see #cleanupSession(String) + */ + public Directory getDirectory(String sessionID, String source) throws IOException; + + /** + * Called to denote that the replication actions for this session were finished and the directory is no longer needed.
+ */ + public void cleanupSession(String sessionID) throws IOException; + + } + + private class ReplicationThread extends Thread { + + private final long interval; + + // client uses this to stop us + final CountDownLatch stop = new CountDownLatch(1); + + public ReplicationThread(long interval) { + this.interval = interval; + } + + @SuppressWarnings("synthetic-access") + @Override + public void run() { + while (true) { + long time = System.currentTimeMillis(); + updateLock.lock(); + try { + doUpdate(); + } catch (Throwable t) { + handleUpdateException(t); + } finally { + updateLock.unlock(); + } + time = System.currentTimeMillis() - time; + + // adjust timeout to compensate the time spent doing the replication. + final long timeout = interval - time; + if (timeout > 0 || stop.getCount() == 0) { // also honor a stop request when the update overran the interval + try { + // this will return immediately if we were ordered to stop (count=0) + // or the timeout has elapsed. if it returns true, it means count=0, + // so terminate. + if (stop.await(timeout, TimeUnit.MILLISECONDS)) { + return; + } + } catch (InterruptedException e) { + // if we were interrupted, somebody wants to terminate us, so just + // throw the exception further. + Thread.currentThread().interrupt(); + throw new ThreadInterruptedException(e); + } + } + } + } + + } + + /** The component name to use with {@link InfoStream#isEnabled(String)}. */ + public static final String INFO_STREAM_COMPONENT = "ReplicationThread"; + + private final Replicator replicator; + private final ReplicationHandler handler; + private final SourceDirectoryFactory factory; + private final byte[] copyBuffer = new byte[16384]; + private final Lock updateLock = new ReentrantLock(); + + private volatile ReplicationThread updateThread; + private volatile boolean closed = false; + + /** + * Constructor.
+ * + * @param replicator the {@link Replicator} used for checking for updates + * @param handler notified when new revisions are ready + * @param factory returns a {@link Directory} for a given source and session + */ + public ReplicationClient(Replicator replicator, ReplicationHandler handler, SourceDirectoryFactory factory) { + this.replicator = replicator; + this.handler = handler; + this.factory = factory; + } + + private void copyBytes(IndexOutput out, InputStream in) throws IOException { + int numBytes; + while ((numBytes = in.read(copyBuffer)) > 0) { + out.writeBytes(copyBuffer, 0, numBytes); + } + } + + /** Throws {@link AlreadyClosedException} if the client has already been closed. */ + protected final void ensureOpen() { + if (closed) { + throw new AlreadyClosedException("this update client has already been closed"); + } + } + + private void doUpdate() throws IOException { + SessionToken session = null; + final Map sourceDirectory = new HashMap(); + boolean notify = false; + try { + final String version = handler.currentVersion(); + session = replicator.checkForUpdate(version); + if (session == null) { + // already up to date + return; + } + for (Entry e : requiredFiles(session.sourceFiles).entrySet()) { + String source = e.getKey(); + Directory dir = factory.getDirectory(session.id, source); + sourceDirectory.put(source, dir); + for (RevisionFile file : e.getValue()) { + if (closed) { + // we're either closed, or the update thread was stopped - abort files copy + return; + } + InputStream in = null; + IndexOutput out = null; + try { + in = replicator.obtainFile(session.id, source, file.fileName); + out = dir.createOutput(file.fileName, IOContext.DEFAULT); + copyBytes(out, in); + // TODO add some validation, on size / checksum + } finally { + IOUtils.close(in, out); + } + } + } + // only notify if all needed files were successfully obtained. 
+ notify = true; + } finally { + if (session != null) { + try { + replicator.release(session.id); + } finally { + if (!notify) { // cleanup after ourselves + IOUtils.close(sourceDirectory.values()); + factory.cleanupSession(session.id); + } + } + } + } + + // notify outside the try-finally above, so the session is released sooner. + // the handler may take time to finish acting on the copied files, but the + // session itself is no longer needed. + try { + if (notify && !closed ) { // no use to notify if we are closed already + handler.revisionReady(session.version, session.sourceFiles, sourceDirectory); + } + } finally { + IOUtils.close(sourceDirectory.values()); + if (session != null) { + factory.cleanupSession(session.id); + } + } + } + + /** + * Start the update thread with the specified interval in milliseconds. For + * debugging purposes, you can optionally set the name to set on + * {@link Thread#setName(String)}. If you pass {@code null}, a default name + * will be set. + * + * @throws IllegalStateException if the thread has already been started + */ + public synchronized void startUpdateThread(long intervalMillis, String threadName) { + ensureOpen(); + if (updateThread != null) { + throw new IllegalStateException( + "cannot start an update thread when one is running, must first call 'stopUpdateThread()'"); + } + threadName = threadName == null ? INFO_STREAM_COMPONENT : "ReplicationThread-" + threadName; + updateThread = new ReplicationThread(intervalMillis); + updateThread.setName(threadName); + updateThread.start(); + } + + /** + * Stop the update thread. If the update thread is not running, silently does + * nothing. This method returns after the update thread has stopped. + */ + public synchronized void stopUpdateThread() { + if (updateThread != null) { + // this will trigger the thread to terminate if it awaits the lock. + // otherwise, if it's in the middle of replication, we wait for it to + // stop. 
+ updateThread.stop.countDown(); + try { + updateThread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ThreadInterruptedException(e); + } + updateThread = null; + } + } + + /** + * Executes the update operation immediately, regardless of whether an update + * thread is running or not. + */ + public void updateNow() throws IOException { + ensureOpen(); + updateLock.lock(); + try { + doUpdate(); + } finally { + updateLock.unlock(); + } + } + + @Override + public synchronized void close() { + if (!closed) { + stopUpdateThread(); + closed = true; + } + } + + @Override + public String toString() { + String res = "ReplicationClient"; + if (updateThread != null) { + res += " (" + updateThread.getName() + ")"; + } + return res; + } + + /** + * Called when an exception is hit by the replication thread. The default + * implementation prints the full stacktrace to + * {@link InfoStream#getDefault()} (if {@link InfoStream#isEnabled(String)} + * for {@link #INFO_STREAM_COMPONENT}), and you can override to log the + * exception elsewhere. + *

+ * NOTE: if you override this method to throw the exception further, + * the replication thread will be terminated. The only way to restart it is to + * call {@link #stopUpdateThread()} followed by + * {@link #startUpdateThread(long, String)}. + */ + protected void handleUpdateException(Throwable t) { + final StringWriter sw = new StringWriter(); + t.printStackTrace(new PrintWriter(sw)); + InfoStream stream = InfoStream.getDefault(); + if (stream.isEnabled(INFO_STREAM_COMPONENT)) { + stream.message(INFO_STREAM_COMPONENT, "an error occurred during revision update: " + sw.toString()); + } + } + + /** + * Returns the files required for replication. By default, this method returns + * all files that exist in the new revision, but not in the handler. + */ + protected Map requiredFiles(Map newRevisionFiles) { + Map handlerRevisionFiles = handler.currentRevisionFiles(); + if (handlerRevisionFiles == null) { + return newRevisionFiles; + } + + Map requiredFiles = new HashMap(); + for (Entry e : handlerRevisionFiles.entrySet()) { + // put the handler files in a Set, for faster contains() checks later + Set handlerFiles = new HashSet(); + for (RevisionFile file : e.getValue()) { + handlerFiles.add(file.fileName); + } + + // make sure to preserve revisionFiles order + ArrayList res = new ArrayList(); + String source = e.getKey(); + assert newRevisionFiles.containsKey(source) : "source not found in newRevisionFiles: " + newRevisionFiles; + for (RevisionFile file : newRevisionFiles.get(source)) { + if (!handlerFiles.contains(file.fileName)) { + res.add(file); + } + } + requiredFiles.put(source, res.toArray(new RevisionFile[res.size()])); + } + + return requiredFiles; + } + +} Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of 
property Index: lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java (working copy) @@ -0,0 +1,80 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; + +/** + * An interface for replicating files. Allows a producer to + * {@link #publish(Revision) publish} {@link Revision}s and consumers to + * {@link #checkForUpdate(String) check for updates}. When a client needs to be + * updated, it is given a {@link SessionToken} through which it can + * {@link #obtainFile(String, String, String) obtain} the files of that + * revision. After the client has finished obtaining all the files, it should + * {@link #release(String) release} the given session, so that the files can be + * reclaimed if they are not needed anymore. + *

+ * A client is always updated to the newest revision available. That is, if a + * client is on revision r1 and revisions r2 and r3 + * were published, then the next time the client checks for an update, it will + * receive r3. + * + * @lucene.experimental + */ +public interface Replicator extends Closeable { + + /** + * Publish a new {@link Revision} for consumption by clients. It is the + * caller's responsibility to verify that the revision files exist and can be + * read by clients. When the revision is no longer needed, it will be + * {@link Revision#release() released} by the replicator. + */ + public void publish(Revision revision) throws IOException; + + /** + * Checks whether the given version is up-to-date. If it is not, returns a + * {@link SessionToken} which can be used for fetching the revision files, + * otherwise returns {@code null}. + *

+ * NOTE: when the returned session token is no longer needed, you + * should call {@link #release(String)} so that the session resources can be + * reclaimed, including the revision files. + */ + public SessionToken checkForUpdate(String currVersion) throws IOException; + + /** + * Notify that the specified {@link SessionToken} is no longer needed by the + * caller. + */ + public void release(String sessionID) throws IOException; + + /** + * Returns an {@link InputStream} for the requested file and source in the + * context of the given {@link SessionToken#id session}. + *

+ * NOTE: it is the caller's responsibility to close the returned + * stream. + * + * @throws SessionExpiredException if the specified session has already + * expired + */ + public InputStream obtainFile(String sessionID, String source, String fileName) throws IOException; + +} Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java ___________________________________________________________________ Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java (working copy) @@ -0,0 +1,74 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +import org.apache.lucene.store.IndexInput; + +/** + * A revision comprises lists of files that come from different sources and need + * to be replicated together to e.g. guarantee that all resources are in sync. + * In most cases an application will replicate a single index, and so the + * revision will contain files from a single source. However, some applications + * may need to treat a collection of indexes as a single entity so that the + * files from all sources are replicated together, to guarantee consistency + * between them. For example, an application which indexes facets will need to + * replicate both the search and taxonomy indexes together, to guarantee that + * they match at the client side. + * + * @lucene.experimental + */ +public interface Revision extends Comparable { + + /** + * Compares the revision to the given version string. Behaves like + * {@link Comparable#compareTo(Object)}. + */ + public int compareTo(String version); + + /** + * Returns a string representation of the version of this revision. The + * version is used by {@link #compareTo(String)} as well as to + * serialize/deserialize revision information. Therefore it must be self + * descriptive as well as be able to identify one revision from another. + */ + public String getVersion(); + + /** + * Returns the files that comprise this revision, as a mapping from a source + * to a list of files. + */ + public Map getSourceFiles(); + + /** + * Returns an {@link InputStream} for the given fileName and source. It is the + * caller's responsibility to close the stream (which may be backed by an + * {@link IndexInput}) when it has been consumed. + */ + public InputStream open(String source, String fileName) throws IOException; + + /** + * Called when this revision can be safely released, i.e. when there are no + * more references to it.
+ */ + public void release() throws IOException; + +} Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java (working copy) @@ -0,0 +1,59 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Describes a file in a {@link Revision}. A file has a source, which allows a + * single revision to contain files from multiple sources (e.g. multiple + * indexes). + * + * @lucene.experimental + */ +public class RevisionFile { + + /** The name of the file. */ + public final String fileName; + + /** The size of the file denoted by {@link #fileName}; starts at -1 until a value is assigned. */ + public long size = -1; + + /** Constructor with the given file name.
*/ + public RevisionFile(String fileName) { + if (fileName == null || fileName.isEmpty()) { + throw new IllegalArgumentException("fileName cannot be null or empty"); + } + this.fileName = fileName; + } + + /** Two files are equal iff both are {@link RevisionFile}s with the same name and size. */ + @Override + public boolean equals(Object obj) { + // null and foreign types must compare unequal rather than throw (Object.equals contract) + if (!(obj instanceof RevisionFile)) { + return false; + } + RevisionFile other = (RevisionFile) obj; + return fileName.equals(other.fileName) && size == other.size; + } + + @Override + public int hashCode() { + return fileName.hashCode() ^ (int) (size ^ (size >>> 32)); + } + + @Override + public String toString() { + return "fileName=" + fileName + " size=" + size; + } + +} Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java (working copy) @@ -0,0 +1,54 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +/** + * Exception indicating that a revision update session was expired due to lack + * of activity. + * + * @see LocalReplicator#DEFAULT_SESSION_EXPIRATION_THRESHOLD + * @see LocalReplicator#setExpirationThreshold(long) + * + * @lucene.experimental + */ +public class SessionExpiredException extends IOException { + + /** + * Creates the exception with the given detail message and cause. + * + * @see IOException#IOException(String, Throwable) + */ + public SessionExpiredException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Creates the exception with the given detail message. + * + * @see IOException#IOException(String) + */ + public SessionExpiredException(String message) { + super(message); + } + + /** + * Creates the exception with the given cause. + * + * @see IOException#IOException(Throwable) + */ + public SessionExpiredException(Throwable cause) { + super(cause); + } + +} Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java (working copy) @@ -0,0 +1,111 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + *
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +/** + * Token for a replication session, for guaranteeing that source replicated + * files will be kept safe until the replication completes. + * + * @see Replicator#checkForUpdate(String) + * @see Replicator#release(String) + * @see LocalReplicator#DEFAULT_SESSION_EXPIRATION_THRESHOLD + * + * @lucene.experimental + */ +public final class SessionToken { + + /** + * ID of this session. + * Should be passed when releasing the session, thereby notifying the + * {@link Replicator Replicator} that this session is no longer in use. + * @see Replicator#release(String) + */ + public final String id; + + /** + * @see Revision#getVersion() + */ + public final String version; + + /** + * @see Revision#getSourceFiles() + */ + public final Map<String,RevisionFile[]> sourceFiles; + + /** Constructor which deserializes from the given {@link DataInput}.
*/ + public SessionToken(DataInput in) throws IOException { + this.id = in.readUTF(); + this.version = in.readUTF(); + this.sourceFiles = new HashMap<String,RevisionFile[]>(); + int numSources = in.readInt(); + while (numSources > 0) { + String source = in.readUTF(); + RevisionFile[] files = new RevisionFile[in.readInt()]; + for (int i = 0; i < files.length; i++) { + String fileName = in.readUTF(); + files[i] = new RevisionFile(fileName); + files[i].size = in.readLong(); + } + this.sourceFiles.put(source, files); + --numSources; + } + } + + /** Constructor with the given id and revision. */ + public SessionToken(String id, Revision revision) { + this.id = id; + this.version = revision.getVersion(); + this.sourceFiles = revision.getSourceFiles(); + } + + /** Serialize the token data for communication between server and client. */ + public void serialize(DataOutput out) throws IOException { + out.writeUTF(id); + out.writeUTF(version); + out.writeInt(sourceFiles.size()); + for (Entry<String,RevisionFile[]> e : sourceFiles.entrySet()) { + out.writeUTF(e.getKey()); + RevisionFile[] files = e.getValue(); + out.writeInt(files.length); + for (RevisionFile file : files) { + out.writeUTF(file.fileName); + out.writeLong(file.size); + } + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("session=").append(id).append(" version=").append(version).append(" sourceFiles={"); + for (Entry<String,RevisionFile[]> e : sourceFiles.entrySet()) { + sb.append("source=").append(e.getKey()).append(" files=").append(Arrays.toString(e.getValue())).append(';'); + } + sb.append('}'); + return sb.toString(); + } + +} \ No newline at end of file Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index:
lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java (working copy) @@ -0,0 +1,297 @@ +package org.apache.lucene.replicator.http; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.concurrent.Callable; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.ClientConnectionManager; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.params.HttpConnectionParams; +import org.apache.http.util.EntityUtils; +import org.apache.lucene.store.AlreadyClosedException; + +/** + * Base class for Http clients. + * + * @lucene.experimental + * */ +public abstract class HttpClientBase implements Closeable { + + /** + * Default connection timeout for this client, in milliseconds. + * + * @see #setConnectionTimeout(int) + */ + public static final int DEFAULT_CONNECTION_TIMEOUT = 1000; + + /** + * Default socket timeout for this client, in milliseconds. + * + * @see #setSoTimeout(int) + */ + public static final int DEFAULT_SO_TIMEOUT = 60000; + + // TODO compression? + + /** The URL string to execute requests against. */ + protected final String url; + + private volatile boolean closed = false; + + private final HttpClient httpc; + + /** + * @param conMgr connection manager to use for this http client. + * NOTE: The provided {@link ClientConnectionManager} will not be + * {@link ClientConnectionManager#shutdown()} by this class.
+ */ + protected HttpClientBase(String host, int port, String path, ClientConnectionManager conMgr) { + url = normalizedURL(host, port, path); + httpc = new DefaultHttpClient(conMgr); + setConnectionTimeout(DEFAULT_CONNECTION_TIMEOUT); + setSoTimeout(DEFAULT_SO_TIMEOUT); + } + + /** + * Set the connection timeout for this client, in milliseconds. This setting + * is used to modify {@link HttpConnectionParams#setConnectionTimeout}. + * + * @param timeout timeout to set, in milliseconds + */ + public void setConnectionTimeout(int timeout) { + HttpConnectionParams.setConnectionTimeout(httpc.getParams(), timeout); + } + + /** + * Set the socket timeout for this client, in milliseconds. This setting + * is used to modify {@link HttpConnectionParams#setSoTimeout}. + * + * @param timeout timeout to set, in milliseconds + */ + public void setSoTimeout(int timeout) { + HttpConnectionParams.setSoTimeout(httpc.getParams(), timeout); + } + + /** Throws {@link AlreadyClosedException} if this client is already closed. */ + protected final void ensureOpen() throws AlreadyClosedException { + if (closed) { + throw new AlreadyClosedException("HttpClient already closed"); + } + } + + /** + * Create a URL out of the given parameters, translate an empty/null path to '/' + */ + private static String normalizedURL(String host, int port, String path) { + if (path == null || path.length() == 0) { + path = "/"; + } + return "http://" + host + ":" + port + path; + } + + /** + * Internal: verifies the response status after invocation, and in case of error attempts to read the + * exception sent by the server.
+ */ + protected void verifyStatus(HttpResponse response) throws IOException { + StatusLine statusLine = response.getStatusLine(); + if (statusLine.getStatusCode() != HttpStatus.SC_OK) { + throwKnownError(response, statusLine); + } + } + + protected void throwKnownError(HttpResponse response, StatusLine statusLine) throws IOException { + ObjectInputStream in = null; + try { + in = new ObjectInputStream(response.getEntity().getContent()); + } catch (Exception e) { + // the response stream is not an exception - could be an error in servlet.init(). + throw new RuntimeException("Uknown error: " + statusLine); + } + + Throwable t; + try { + t = (Throwable) in.readObject(); + } catch (Exception e) { + //not likely + throw new RuntimeException("Failed to read exception object: " + statusLine, e); + } finally { + in.close(); + } + if (t instanceof IOException) { + throw (IOException) t; + } + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } + throw new RuntimeException("unknown exception "+statusLine,t); + } + + /** + * internal: execute a request and return its result + * The params argument is treated as: name1,value1,name2,value2,... + */ + protected HttpResponse executePOST(String request, HttpEntity entity, String... params) throws IOException { + ensureOpen(); + HttpPost m = new HttpPost(queryString(request, params)); + m.setEntity(entity); + HttpResponse response = httpc.execute(m); + verifyStatus(response); + return response; + } + + /** + * internal: execute a request and return its result + * The params argument is treated as: name1,value1,name2,value2,... + */ + protected HttpResponse executeGET(String request, String... params) throws IOException { + ensureOpen(); + HttpGet m = new HttpGet(queryString(request, params)); + HttpResponse response = httpc.execute(m); + verifyStatus(response); + return response; + } + + private String queryString(String request, String... 
params) throws UnsupportedEncodingException { + StringBuilder query = new StringBuilder(url).append('/').append(request).append('?'); + if (params != null) { + for (int i = 0; i < params.length; i += 2) { + query.append(params[i]).append('=').append(URLEncoder.encode(params[i+1], "UTF8")).append('&'); + } + } + return query.substring(0, query.length() - 1); + } + + /** Internal utility: input stream of the provided response */ + public InputStream responseInputStream(HttpResponse response) throws IOException { + return responseInputStream(response, false); + } + + // TODO: can we simplify this Consuming !?!?!? + /** + * Internal utility: input stream of the provided response, which optionally + * consumes the response's resources when the input stream is exhausted. + */ + public InputStream responseInputStream(HttpResponse response, boolean consume) throws IOException { + final HttpEntity entity = response.getEntity(); + final InputStream in = entity.getContent(); + if (!consume) { + return in; + } + return new InputStream() { + private boolean consumed = false; + @Override + public int read() throws IOException { + final int res = in.read(); + consume(res); + return res; + } + @Override + public void close() throws IOException { + super.close(); + consume(-1); + } + @Override + public int read(byte[] b) throws IOException { + final int res = super.read(b); + consume(res); + return res; + } + @Override + public int read(byte[] b, int off, int len) throws IOException { + final int res = super.read(b, off, len); + consume(res); + return res; + } + private void consume(int minusOne) { + if (!consumed && minusOne==-1) { + try { + EntityUtils.consume(entity); + } catch (Exception e) { + // ignored on purpose + } + consumed = true; + } + } + }; + } + + /** + * Returns true iff this instance was {@link #close() closed}, otherwise + * returns false. 
Note that if you override {@link #close()}, you must call + * {@code super.close()}, in order for this instance to be properly closed. + */ + protected final boolean isClosed() { + return closed; + } + + /** + * Same as {@link #doAction(HttpResponse, boolean, Callable)} but always do consume at the end. + */ + protected T doAction(HttpResponse response, Callable call) throws IOException { + return doAction(response, true, call); + } + + /** + * Do a specific action and validate after the action that the status is still OK, + * and if not, attempt to extract the actual server side exception. Optionally + * release the response at exit, depending on consume parameter. + */ + protected T doAction(HttpResponse response, boolean consume, Callable call) throws IOException { + IOException error = null; + try { + return call.call(); + } catch (IOException e) { + error = e; + } catch (Exception e) { + error = new IOException(e); + } finally { + try { + verifyStatus(response); + } finally { + if (consume) { + try { + EntityUtils.consume(response.getEntity()); + } catch (Exception e) { + // ignoring on purpose + } + } + } + } + throw error; // should not get here + } + + @Override + public void close() throws IOException { + closed = true; + } + +} Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java ___________________________________________________________________ Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java (working copy) @@ -0,0 +1,105 @@ +package org.apache.lucene.replicator.http; + +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.Callable; + +import org.apache.http.HttpResponse; +import org.apache.http.conn.ClientConnectionManager; +import org.apache.lucene.replicator.Replicator; +import org.apache.lucene.replicator.Revision; +import org.apache.lucene.replicator.SessionToken; +import org.apache.lucene.replicator.http.ReplicationService.ReplicationAction; + +/** + * An HTTP implementation of {@link Replicator}. Assumes the API supported by + * {@link ReplicationService}. + * + * @lucene.experimental + */ +public class HttpReplicator extends HttpClientBase implements Replicator { + + /** Construct with specified connection manager. 
*/ + public HttpReplicator(String host, int port, String path, ClientConnectionManager conMgr) { + super(host, port, path, conMgr); + } + + @Override + public SessionToken checkForUpdate(String currVersion) throws IOException { + String[] params = null; + if (currVersion != null) { + params = new String[] { ReplicationService.REPLICATE_VERSION_PARAM, currVersion }; + } + final HttpResponse response = executeGET(ReplicationAction.UPDATE.name(), params); + return doAction(response, new Callable<SessionToken>() { + @Override + public SessionToken call() throws Exception { + final DataInputStream dis = new DataInputStream(responseInputStream(response)); + try { + // the first byte is the server's marker: 0 means there is no token to read + if (dis.readByte() == 0) { + return null; + } else { + return new SessionToken(dis); + } + } finally { + dis.close(); + } + } + }); + } + + @Override + public InputStream obtainFile(String sessionID, String source, String fileName) throws IOException { + String[] params = new String[] { + ReplicationService.REPLICATE_SESSION_ID_PARAM, sessionID, + ReplicationService.REPLICATE_SOURCE_PARAM, source, + ReplicationService.REPLICATE_FILENAME_PARAM, fileName, + }; + final HttpResponse response = executeGET(ReplicationAction.OBTAIN.name(), params); + return doAction(response, false, new Callable<InputStream>() { + @Override + public InputStream call() throws Exception { + return responseInputStream(response,true); + } + }); + } + + @Override + public void publish(Revision revision) throws IOException { + throw new UnsupportedOperationException( + "this replicator implementation does not support remote publishing of revisions"); + } + + @Override + public void release(String sessionID) throws IOException { + String[] params = new String[] { + ReplicationService.REPLICATE_SESSION_ID_PARAM, sessionID + }; + final HttpResponse response = executeGET(ReplicationAction.RELEASE.name(), params); + doAction(response, new Callable<Object>() { + @Override + public Object call() throws Exception { + return null; // do not remove this call: as it is still
validating for us! + } + }); + } + +} Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: lucene/replicator/src/java/org/apache/lucene/replicator/http/ReplicationService.java =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/http/ReplicationService.java (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/http/ReplicationService.java (working copy) @@ -0,0 +1,198 @@ +package org.apache.lucene.replicator.http; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Locale; +import java.util.Map; +import java.util.StringTokenizer; + +import javax.servlet.ServletException; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.http.HttpStatus; +import org.apache.lucene.replicator.Replicator; +import org.apache.lucene.replicator.SessionToken; + +/** + * A server-side service for handling replication requests. The service assumes + * requests are sent in the format + * /<context>/<shard>/<action> where + *
    + *
  • {@code context} is the servlet context, e.g. {@link #REPLICATION_CONTEXT} + *
  • {@code shard} is the ID of the shard, e.g. "s1" + *
  • {@code action} is one of {@link ReplicationAction} values + *
+ * For example, to check whether there are revision updates for shard "s1" you + * should send the request: http://host:port/replicate/s1/update. + *

+ * This service is written like a servlet, and + * {@link #perform(HttpServletRequest, HttpServletResponse)} takes servlet + * request and response accordingly, so it is quite easy to embed in your + * application's servlet. + * + * @lucene.experimental + */ +public class ReplicationService { + + /** Actions supported by the {@link ReplicationService}. */ + public enum ReplicationAction { + OBTAIN, RELEASE, UPDATE + } + + /** The context path for the servlet. */ + public static final String REPLICATION_CONTEXT = "/replicate"; + + /** Request parameter name for providing the revision version. */ + public final static String REPLICATE_VERSION_PARAM = "version"; + + /** Request parameter name for providing a session ID. */ + public final static String REPLICATE_SESSION_ID_PARAM = "sessionid"; + + /** Request parameter name for providing the file's source. */ + public final static String REPLICATE_SOURCE_PARAM = "source"; + + /** Request parameter name for providing the file's name. */ + public final static String REPLICATE_FILENAME_PARAM = "filename"; + + private static final int SHARD_IDX = 0, ACTION_IDX = 1; + + private final Map<String,Replicator> replicators; + + /** Constructor with the given mapping from shard ID to the {@link Replicator} serving it. */ + public ReplicationService(Map<String,Replicator> replicators) { + super(); + this.replicators = replicators; + } + + /** + * Returns the path elements that were given in the servlet request, excluding + * the servlet's action context. + */ + private String[] getPathElements(HttpServletRequest req) { + String path = req.getServletPath(); + String pathInfo = req.getPathInfo(); + if (pathInfo != null) { + path += pathInfo; + } + int actionLen = REPLICATION_CONTEXT.length(); + int startIdx = actionLen; + if (path.length() > actionLen && path.charAt(actionLen) == '/') { + ++startIdx; + } + + // split the string on '/' and remove any empty elements.
This is better + // than using String.split() since the latter may return empty elements in + // the array + StringTokenizer stok = new StringTokenizer(path.substring(startIdx), "/"); + ArrayList<String> elements = new ArrayList<String>(); + while (stok.hasMoreTokens()) { + elements.add(stok.nextToken()); + } + return elements.toArray(new String[0]); + } + + private static String extractRequestParam(HttpServletRequest req, String paramName) throws ServletException { + String param = req.getParameter(paramName); + if (param == null) { + throw new ServletException("Missing mandatory parameter: " + paramName); + } + return param; + } + + private static void copy(InputStream in, OutputStream out) throws IOException { + byte[] buf = new byte[16384]; + int numRead; + while ((numRead = in.read(buf)) != -1) { + out.write(buf, 0, numRead); + } + } + + /** Executes the replication task. */ + public void perform(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + String[] pathElements = getPathElements(req); + + if (pathElements.length != 2) { + throw new ServletException("invalid path, must contain shard ID and action, e.g.
*/s1/update"); + } + + final ReplicationAction action; + try { + action = ReplicationAction.valueOf(pathElements[ACTION_IDX].toUpperCase(Locale.ENGLISH)); + } catch (IllegalArgumentException e) { + throw new ServletException("Unsupported action provided: " + pathElements[ACTION_IDX]); + } + + final Replicator replicator = replicators.get(pathElements[SHARD_IDX]); + if (replicator == null) { + throw new ServletException("unrecognized shard ID " + pathElements[SHARD_IDX]); + } + + ServletOutputStream resOut = resp.getOutputStream(); + try { + switch (action) { + case OBTAIN: + final String sessionID = extractRequestParam(req, REPLICATE_SESSION_ID_PARAM); + final String fileName = extractRequestParam(req, REPLICATE_FILENAME_PARAM); + final String source = extractRequestParam(req, REPLICATE_SOURCE_PARAM); + InputStream in = replicator.obtainFile(sessionID, source, fileName); + try { + copy(in, resOut); + } finally { + in.close(); + } + break; + case RELEASE: + replicator.release(extractRequestParam(req, REPLICATE_SESSION_ID_PARAM)); + break; + case UPDATE: + String currVersion = req.getParameter(REPLICATE_VERSION_PARAM); + SessionToken token = replicator.checkForUpdate(currVersion); + if (token == null) { + resOut.write(0); // marker for null token + } else { + resOut.write(1); // marker for non-null token + token.serialize(new DataOutputStream(resOut)); + } + break; + } + } catch (Exception e) { + resp.setStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR); // propagate the failure + try { + /* + * Note: it is assumed that "identified exceptions" are thrown before + * anything was written to the stream.
+ */ + ObjectOutputStream oos = new ObjectOutputStream(resOut); + oos.writeObject(e); + oos.flush(); + } catch (Exception e2) { + throw new IOException("Could not serialize", e2); + } + } finally { + resp.flushBuffer(); + } + } + +} Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/http/ReplicationService.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: lucene/replicator/src/java/org/apache/lucene/replicator/http/package.html =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/http/package.html (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/http/package.html (working copy) @@ -0,0 +1,28 @@ + + + + + +HTTP replication implementation + + + +

HTTP replication implementation

+ + + \ No newline at end of file Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/http/package.html ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: lucene/replicator/src/java/org/apache/lucene/replicator/package.html =================================================================== --- lucene/replicator/src/java/org/apache/lucene/replicator/package.html (revision 0) +++ lucene/replicator/src/java/org/apache/lucene/replicator/package.html (working copy) @@ -0,0 +1,81 @@ + + + + + +Files replication framework + + + +

Files replication framework

+ + The + Replicator allows replicating files between a server and client(s). Producers publish + revisions and consumers update to the latest revision available. + ReplicationClient is a helper utility for performing the update operation. It can + be invoked either + manually or periodically by + starting an update thread. + HttpReplicator can be used to replicate revisions by consumers that reside on + a different node than the producer. + +

+ The replication framework supports replicating indexes as well as index and taxonomy pairs. The application + publishes an + IndexRevision (for a single index), or an + IndexAndTaxonomyRevision (for an index and taxonomy pair). It also needs + to set the corresponding + ReplicationHandler on the replication client + (IndexReplicationHandler and + IndexAndTaxonomyReplicationHandler respectively). + +

+ When the replication client detects that there is a newer revision available, it copies the files of this revision from + the server to the client and then invokes the handler to complete the operation (e.g. copy the files to the index + directory, fsync them, reopen an index reader etc.). By default, only files that do not exist in the handler's + current revision files are copied; + however, this can be overridden by extending the client. + +

+ An example usage of the Replicator: + +

+// ++++++++++++++ SERVER SIDE ++++++++++++++ // 
+IndexWriter publishWriter; // the writer used for indexing
+Replicator replicator = new LocalReplicator();
+replicator.publish(new IndexRevision(publishWriter));
+
+// ++++++++++++++ CLIENT SIDE ++++++++++++++ // 
+// either LocalReplicator, or HttpReplicator if client and server are on different nodes
+Replicator replicator;
+
+// callback invoked after handler finished handling the revision and e.g. can reopen the reader.
+Callable<Boolean> callback = null; // can also be null if no callback is needed
+ReplicationHandler handler = new IndexReplicationHandler(indexDir, callback);
+SourceDirectoryFactory factory = new PerSessionDirectoryFactory(workDir);
+ReplicationClient client = new ReplicationClient(replicator, handler, factory);
+
+// invoke client manually
+client.updateNow();
+
+// or, periodically
+client.startUpdateThread(100); // check for update every 100 milliseconds
+
+ + + \ No newline at end of file Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/package.html ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: lucene/replicator/src/java/overview.html =================================================================== --- lucene/replicator/src/java/overview.html (revision 0) +++ lucene/replicator/src/java/overview.html (working copy) @@ -0,0 +1,26 @@ + + + + + replicator + + + + Provides index files replication capabilities. + + Property changes on: lucene/replicator/src/java/overview.html ___________________________________________________________________ Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyReplicationClientTest.java =================================================================== --- lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyReplicationClientTest.java (revision 0) +++ lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyReplicationClientTest.java (working copy) @@ -0,0 +1,267 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.concurrent.Callable; + +import org.apache.lucene.document.Document; +import org.apache.lucene.facet.index.FacetFields; +import org.apache.lucene.facet.taxonomy.CategoryPath; +import org.apache.lucene.facet.taxonomy.TaxonomyReader; +import org.apache.lucene.facet.taxonomy.TaxonomyWriter; +import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyReader; +import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.SnapshotDeletionPolicy; +import org.apache.lucene.replicator.IndexAndTaxonomyRevision.SnapshotDirectoryTaxonomyWriter; +import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler; +import org.apache.lucene.replicator.ReplicationClient.SourceDirectoryFactory; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util._TestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class IndexAndTaxonomyReplicationClientTest extends ReplicatorTestCase { + + private static class IndexAndTaxonomyReadyCallback implements Callable, Closeable { + + private final Directory indexDir, taxoDir; + private DirectoryReader indexReader; + private DirectoryTaxonomyReader taxoReader; + private long 
lastIndexGeneration = -1; + private int lastTaxoSize = 0; + + public IndexAndTaxonomyReadyCallback(Directory indexDir, Directory taxoDir) throws IOException { + this.indexDir = indexDir; + this.taxoDir = taxoDir; + if (DirectoryReader.indexExists(indexDir)) { + indexReader = DirectoryReader.open(indexDir); + lastIndexGeneration = indexReader.getIndexCommit().getGeneration(); + taxoReader = new DirectoryTaxonomyReader(taxoDir); + lastTaxoSize = taxoReader.getSize(); + } + } + + @Override + public Boolean call() throws Exception { + if (indexReader == null) { + indexReader = DirectoryReader.open(indexDir); + lastIndexGeneration = indexReader.getIndexCommit().getGeneration(); + taxoReader = new DirectoryTaxonomyReader(taxoDir); + lastTaxoSize = taxoReader.getSize(); + } else { + // verify search index + DirectoryReader newReader = DirectoryReader.openIfChanged(indexReader); + assertNotNull("should not have reached here if no changes were made to the index", newReader); + long newGeneration = newReader.getIndexCommit().getGeneration(); + assertTrue("expected newer generation; current=" + lastIndexGeneration + " new=" + newGeneration, newGeneration > lastIndexGeneration); + indexReader.close(); + indexReader = newReader; + lastIndexGeneration = newGeneration; + _TestUtil.checkIndex(indexDir); + + // verify taxonomy index + DirectoryTaxonomyReader newTaxoReader = TaxonomyReader.openIfChanged(taxoReader); + if (newTaxoReader != null) { + int newTaxoSize = newTaxoReader.getSize(); + assertTrue("expected at least the same number of categories; current=" + lastTaxoSize + " new=" + newTaxoSize, newTaxoSize >= lastTaxoSize); + lastTaxoSize = newTaxoSize; + _TestUtil.checkIndex(taxoDir); + } + } + return null; + } + + @Override + public void close() throws IOException { + IOUtils.close(indexReader, taxoReader); + } + } + + /** Fail after waiting that long for client to update to expected version. 
*/ + private static final int CLIENT_WAIT_THRESHOLD = 20000; + + private Directory publishIndexDir, publishTaxoDir, handlerIndexDir, handlerTaxoDir; + private Replicator replicator; + private SourceDirectoryFactory sourceDirFactory; + private ReplicationClient client; + private ReplicationHandler handler; + private IndexWriter publishIndexWriter; + private SnapshotDirectoryTaxonomyWriter publishTaxoWriter; + private IndexAndTaxonomyReadyCallback callback; + private File clientWorkDir; + + private static final String VERSION_ID = "version"; + + private void assertHandlerRevision(int expectedID) throws IOException { + final long failTime = System.currentTimeMillis() + CLIENT_WAIT_THRESHOLD; + while (failTime >= System.currentTimeMillis()) { + try { + DirectoryReader reader = DirectoryReader.open(handlerIndexDir); + try { + int handlerID = Integer.parseInt(reader.getIndexCommit().getUserData().get(VERSION_ID), 16); + if (expectedID == handlerID) { + return; + } + } finally { + reader.close(); + } + Thread.sleep(100); // give client a chance to update + } catch (Exception e) { + // we can hit IndexNotFoundException or e.g. EOFException (on + // segments_N) because it is being copied at the same time it is read by + // DirectoryReader.open(). 
+ } + } + fail("waited " + CLIENT_WAIT_THRESHOLD + " millis but expected revision was not obtained"); + } + + private Revision createRevision(final int id) throws IOException { + publishIndexWriter.addDocument(newDocument(publishTaxoWriter)); + publishIndexWriter.setCommitData(new HashMap() {{ + put(VERSION_ID, Integer.toString(id, 16)); + }}); + publishIndexWriter.commit(); + publishTaxoWriter.commit(); + return new IndexAndTaxonomyRevision(publishIndexWriter, publishTaxoWriter); + } + + private Document newDocument(TaxonomyWriter taxoWriter) throws IOException { + Document doc = new Document(); + FacetFields facetFields = new FacetFields(taxoWriter); + facetFields.addFields(doc, Collections.singleton(new CategoryPath("A"))); + return doc; + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + publishIndexDir = newDirectory(); + publishTaxoDir = newDirectory(); + handlerIndexDir = newDirectory(); + handlerTaxoDir = newDirectory(); + clientWorkDir = _TestUtil.getTempDir("replicationClientTest"); + sourceDirFactory = new PerSessionDirectoryFactory(clientWorkDir); + replicator = new LocalReplicator(); + callback = new IndexAndTaxonomyReadyCallback(handlerIndexDir, handlerTaxoDir); + handler = new IndexAndTaxonomyReplicationHandler(handlerIndexDir, handlerTaxoDir, callback); + client = new ReplicationClient(replicator, handler, sourceDirFactory); + + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, null); + conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy())); + publishIndexWriter = new IndexWriter(publishIndexDir, conf); + publishTaxoWriter = new SnapshotDirectoryTaxonomyWriter(publishTaxoDir); + } + + @After + @Override + public void tearDown() throws Exception { + IOUtils.close(client, callback, publishIndexWriter, publishTaxoWriter, replicator, publishIndexDir, publishTaxoDir, + handlerIndexDir, handlerTaxoDir); + super.tearDown(); + } + + @Test + public void testNoUpdateThread() 
throws Exception { + assertNull("no version expected at start", handler.currentVersion()); + + // Callback validates the replicated index + replicator.publish(createRevision(1)); + client.updateNow(); + + // make sure updating twice, when in fact there's nothing to update, works + client.updateNow(); + + replicator.publish(createRevision(2)); + client.updateNow(); + + // Publish two revisions without update, handler should be upgraded to latest + replicator.publish(createRevision(3)); + replicator.publish(createRevision(4)); + client.updateNow(); + } + + @Test + public void testRestart() throws Exception { + replicator.publish(createRevision(1)); + client.updateNow(); + + replicator.publish(createRevision(2)); + client.updateNow(); + + client.stopUpdateThread(); + client.close(); + client = new ReplicationClient(replicator, handler, sourceDirFactory); + + // Publish two revisions without update, handler should be upgraded to latest + replicator.publish(createRevision(3)); + replicator.publish(createRevision(4)); + client.updateNow(); + } + + @Test + public void testUpdateThread() throws Exception { + client.startUpdateThread(10, "indexTaxo"); + + replicator.publish(createRevision(1)); + assertHandlerRevision(1); + + replicator.publish(createRevision(2)); + assertHandlerRevision(2); + + // Publish two revisions without update, handler should be upgraded to latest + replicator.publish(createRevision(3)); + replicator.publish(createRevision(4)); + assertHandlerRevision(4); + } + + @Test + public void testRecreateTaxonomy() throws Exception { + replicator.publish(createRevision(1)); + client.updateNow(); + + // recreate index and taxonomy + Directory newTaxo = newDirectory(); + new DirectoryTaxonomyWriter(newTaxo).close(); + publishIndexWriter.deleteAll(); + publishIndexWriter.commit(); + publishTaxoWriter.replaceTaxonomy(newTaxo); + publishTaxoWriter.commit(); + + replicator.publish(new IndexAndTaxonomyRevision(publishIndexWriter, publishTaxoWriter)); + try { + 
client.updateNow(); + fail("should have hit IllegalStateException updating a recreated taxonomy"); + } catch (IllegalStateException e) { + // ok + } finally { + newTaxo.close(); + } + } + +} Property changes on: lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyReplicationClientTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyRevisionTest.java =================================================================== --- lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyRevisionTest.java (revision 0) +++ lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyRevisionTest.java (working copy) @@ -0,0 +1,170 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.lucene.document.Document; +import org.apache.lucene.facet.index.FacetFields; +import org.apache.lucene.facet.taxonomy.CategoryPath; +import org.apache.lucene.facet.taxonomy.TaxonomyWriter; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.SnapshotDeletionPolicy; +import org.apache.lucene.replicator.IndexAndTaxonomyRevision.SnapshotDirectoryTaxonomyWriter; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.IOUtils; +import org.junit.Test; + +public class IndexAndTaxonomyRevisionTest extends ReplicatorTestCase { + + private Document newDocument(TaxonomyWriter taxoWriter) throws IOException { + Document doc = new Document(); + FacetFields ff = new FacetFields(taxoWriter); + ff.addFields(doc, Collections.singleton(new CategoryPath("A"))); + return doc; + } + + @Test + public void testNoCommit() throws Exception { + Directory indexDir = newDirectory(); + IndexWriterConfig conf = new IndexWriterConfig(TEST_VERSION_CURRENT, null); + conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy())); + IndexWriter indexWriter = new IndexWriter(indexDir, conf); + + Directory taxoDir = newDirectory(); + SnapshotDirectoryTaxonomyWriter taxoWriter = new SnapshotDirectoryTaxonomyWriter(taxoDir); + try { + assertNotNull(new IndexAndTaxonomyRevision(indexWriter, taxoWriter)); + fail("should have failed when there are no commits to snapshot"); + } catch (IllegalStateException e) { + // expected + } finally { + IOUtils.close(indexWriter, taxoWriter, taxoDir, indexDir); + } + } + + @Test + public void testRevisionRelease() throws Exception { + Directory indexDir = 
newDirectory(); + IndexWriterConfig conf = new IndexWriterConfig(TEST_VERSION_CURRENT, null); + conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy())); + IndexWriter indexWriter = new IndexWriter(indexDir, conf); + + Directory taxoDir = newDirectory(); + SnapshotDirectoryTaxonomyWriter taxoWriter = new SnapshotDirectoryTaxonomyWriter(taxoDir); + try { + indexWriter.addDocument(newDocument(taxoWriter)); + indexWriter.commit(); + taxoWriter.commit(); + Revision rev1 = new IndexAndTaxonomyRevision(indexWriter, taxoWriter); + // releasing that revision should not delete the files + rev1.release(); + assertTrue(indexDir.fileExists(IndexFileNames.SEGMENTS + "_1")); + assertTrue(taxoDir.fileExists(IndexFileNames.SEGMENTS + "_1")); + + rev1 = new IndexAndTaxonomyRevision(indexWriter, taxoWriter); // create revision again, so the files are snapshotted + indexWriter.addDocument(newDocument(taxoWriter)); + indexWriter.commit(); + taxoWriter.commit(); + assertNotNull(new IndexAndTaxonomyRevision(indexWriter, taxoWriter)); + rev1.release(); // this release should trigger the delete of segments_1 + assertFalse(indexDir.fileExists(IndexFileNames.SEGMENTS + "_1")); + } finally { + IOUtils.close(indexWriter, taxoWriter, taxoDir, indexDir); + } + } + + @Test + public void testSegmentsFileLast() throws Exception { + Directory indexDir = newDirectory(); + IndexWriterConfig conf = new IndexWriterConfig(TEST_VERSION_CURRENT, null); + conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy())); + IndexWriter indexWriter = new IndexWriter(indexDir, conf); + + Directory taxoDir = newDirectory(); + SnapshotDirectoryTaxonomyWriter taxoWriter = new SnapshotDirectoryTaxonomyWriter(taxoDir); + try { + indexWriter.addDocument(newDocument(taxoWriter)); + indexWriter.commit(); + taxoWriter.commit(); + Revision rev = new IndexAndTaxonomyRevision(indexWriter, taxoWriter); + Map sourceFiles = rev.getSourceFiles(); + assertEquals(2, 
sourceFiles.size()); + for (RevisionFile[] files : sourceFiles.values()) { + String lastFile = files[files.length - 1].fileName; + assertTrue(lastFile.startsWith(IndexFileNames.SEGMENTS) && !lastFile.equals(IndexFileNames.SEGMENTS_GEN)); + } + } finally { + IOUtils.close(indexWriter, taxoWriter, taxoDir, indexDir); + } + } + + @Test + public void testOpen() throws Exception { + Directory indexDir = newDirectory(); + IndexWriterConfig conf = new IndexWriterConfig(TEST_VERSION_CURRENT, null); + conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy())); + IndexWriter indexWriter = new IndexWriter(indexDir, conf); + + Directory taxoDir = newDirectory(); + SnapshotDirectoryTaxonomyWriter taxoWriter = new SnapshotDirectoryTaxonomyWriter(taxoDir); + try { + indexWriter.addDocument(newDocument(taxoWriter)); + indexWriter.commit(); + taxoWriter.commit(); + Revision rev = new IndexAndTaxonomyRevision(indexWriter, taxoWriter); + for (Entry e : rev.getSourceFiles().entrySet()) { + String source = e.getKey(); + Directory dir = source.equals(IndexAndTaxonomyRevision.INDEX_SOURCE) ? 
indexDir : taxoDir; + RevisionFile[] files = e.getValue(); + for (RevisionFile file : files) { + IndexInput src = dir.openInput(file.fileName, IOContext.READONCE); + InputStream in = rev.open(source, file.fileName); + assertEquals(src.length(), in.available()); + byte[] srcBytes = new byte[(int) src.length()]; + byte[] inBytes = new byte[(int) src.length()]; + int offset = 0; + if (random().nextBoolean()) { + int skip = random().nextInt(10); + if (skip >= src.length()) { + skip = 0; + } + in.skip(skip); + src.seek(skip); + offset = skip; + } + src.readBytes(srcBytes, offset, srcBytes.length - offset); + in.read(inBytes, offset, inBytes.length - offset); + assertArrayEquals(srcBytes, inBytes); + IOUtils.close(src, in); + } + } + } finally { + IOUtils.close(indexWriter, taxoWriter, taxoDir, indexDir); + } + } + +} Property changes on: lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyRevisionTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: lucene/replicator/src/test/org/apache/lucene/replicator/IndexReplicationClientTest.java =================================================================== --- lucene/replicator/src/test/org/apache/lucene/replicator/IndexReplicationClientTest.java (revision 0) +++ lucene/replicator/src/test/org/apache/lucene/replicator/IndexReplicationClientTest.java (working copy) @@ -0,0 +1,199 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.Callable; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.SnapshotDeletionPolicy; +import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler; +import org.apache.lucene.replicator.ReplicationClient.SourceDirectoryFactory; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util._TestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class IndexReplicationClientTest extends ReplicatorTestCase { + + /** Fail after waiting that long for client to update to expected version. 
*/ + private static final int CLIENT_WAIT_THRESHOLD = 20000; + + private static class IndexReadyCallback implements Callable, Closeable { + + private final Directory indexDir; + private DirectoryReader reader; + private long lastGeneration = -1; + + public IndexReadyCallback(Directory indexDir) throws IOException { + this.indexDir = indexDir; + if (DirectoryReader.indexExists(indexDir)) { + reader = DirectoryReader.open(indexDir); + lastGeneration = reader.getIndexCommit().getGeneration(); + } + } + + @Override + public Boolean call() throws Exception { + if (reader == null) { + reader = DirectoryReader.open(indexDir); + lastGeneration = reader.getIndexCommit().getGeneration(); + } else { + DirectoryReader newReader = DirectoryReader.openIfChanged(reader); + assertNotNull("should not have reached here if no changes were made to the index", newReader); + long newGeneration = newReader.getIndexCommit().getGeneration(); + assertTrue("expected newer generation; current=" + lastGeneration + " new=" + newGeneration, newGeneration > lastGeneration); + reader.close(); + reader = newReader; + lastGeneration = newGeneration; + _TestUtil.checkIndex(indexDir); + } + return null; + } + + @Override + public void close() throws IOException { + IOUtils.close(reader); + } + } + + private Directory publishDir, handlerDir; + private Replicator replicator; + private SourceDirectoryFactory sourceDirFactory; + private ReplicationClient client; + private ReplicationHandler handler; + private IndexWriter publishWriter; + private IndexReadyCallback callback; + + private static final String VERSION_ID = "version"; + + private void assertHandlerRevision(int expectedID) throws IOException { + final long failTime = System.currentTimeMillis() + CLIENT_WAIT_THRESHOLD; + while (failTime >= System.currentTimeMillis()) { + try { + DirectoryReader reader = DirectoryReader.open(handlerDir); + try { + int handlerID = Integer.parseInt(reader.getIndexCommit().getUserData().get(VERSION_ID), 16); + if 
(expectedID == handlerID) { + return; + } + } finally { + reader.close(); + } + Thread.sleep(100); // give client a chance to update + } catch (Exception e) { + // we can hit IndexNotFoundException or e.g. EOFException (on + // segments_N) because it is being copied at the same time it is read by + // DirectoryReader.open(). + } + } + fail("waited " + CLIENT_WAIT_THRESHOLD + " millis but expected revision was not obtained"); + } + + private Revision createRevision(final int id) throws IOException { + publishWriter.addDocument(new Document()); + publishWriter.setCommitData(new HashMap() {{ + put(VERSION_ID, Integer.toString(id, 16)); + }}); + publishWriter.commit(); + return new IndexRevision(publishWriter); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + publishDir = newDirectory(); + handlerDir = newDirectory(); + sourceDirFactory = new PerSessionDirectoryFactory(_TestUtil.getTempDir("replicationClientTest")); + replicator = new LocalReplicator(); + callback = new IndexReadyCallback(handlerDir); + handler = new IndexReplicationHandler(handlerDir, callback); + client = new ReplicationClient(replicator, handler, sourceDirFactory); + + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, null); + conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy())); + publishWriter = new IndexWriter(publishDir, conf); + } + + @After + @Override + public void tearDown() throws Exception { + IOUtils.close(client, callback, publishWriter, replicator, publishDir, handlerDir); + super.tearDown(); + } + + @Test + public void testNoUpdateThread() throws Exception { + assertNull("no version expected at start", handler.currentVersion()); + + // Callback validates the replicated index + replicator.publish(createRevision(1)); + client.updateNow(); + + replicator.publish(createRevision(2)); + client.updateNow(); + + // Publish two revisions without update, handler should be upgraded to latest + 
replicator.publish(createRevision(3)); + replicator.publish(createRevision(4)); + client.updateNow(); + } + + @Test + public void testUpdateThread() throws Exception { + client.startUpdateThread(10, "index"); + + replicator.publish(createRevision(1)); + assertHandlerRevision(1); + + replicator.publish(createRevision(2)); + assertHandlerRevision(2); + + // Publish two revisions without update, handler should be upgraded to latest + replicator.publish(createRevision(3)); + replicator.publish(createRevision(4)); + assertHandlerRevision(4); + } + + @Test + public void testRestart() throws Exception { + replicator.publish(createRevision(1)); + client.updateNow(); + + replicator.publish(createRevision(2)); + client.updateNow(); + + client.stopUpdateThread(); + client.close(); + client = new ReplicationClient(replicator, handler, sourceDirFactory); + + // Publish two revisions without update, handler should be upgraded to latest + replicator.publish(createRevision(3)); + replicator.publish(createRevision(4)); + client.updateNow(); + } + +} Property changes on: lucene/replicator/src/test/org/apache/lucene/replicator/IndexReplicationClientTest.java ___________________________________________________________________ Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: lucene/replicator/src/test/org/apache/lucene/replicator/IndexRevisionTest.java =================================================================== --- lucene/replicator/src/test/org/apache/lucene/replicator/IndexRevisionTest.java (revision 0) +++ lucene/replicator/src/test/org/apache/lucene/replicator/IndexRevisionTest.java (working copy) @@ -0,0 +1,155 @@ +package org.apache.lucene.replicator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.IOUtils;
+import org.junit.Test;
+
+public class IndexRevisionTest extends ReplicatorTestCase {
+
+  @Test
+  public void testNoSnapshotDeletionPolicy() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig conf = new IndexWriterConfig(TEST_VERSION_CURRENT, null);
+    conf.setIndexDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
+    IndexWriter writer = new IndexWriter(dir, conf);
+    try {
+      assertNotNull(new IndexRevision(writer));
+      fail("should have failed when IndexDeletionPolicy is not Snapshot");
+    } catch (IllegalArgumentException e) {
+      // expected: the writer was configured with KeepOnlyLastCommitDeletionPolicy, not a SnapshotDeletionPolicy
+    } finally {
+      IOUtils.close(writer, dir);
+    }
+  }
+
+  @Test
+  public void testNoCommit() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig conf = new IndexWriterConfig(TEST_VERSION_CURRENT, null);
+    conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
+    IndexWriter writer = new IndexWriter(dir, conf);
+    try {
+      assertNotNull(new IndexRevision(writer));
+      fail("should have failed when there are no commits to snapshot");
+    } catch (IllegalStateException e) {
+      // expected: nothing was committed before the revision was requested
+    } finally {
+      IOUtils.close(writer, dir);
+    }
+  }
+
+  @Test
+  public void testRevisionRelease() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig conf = new IndexWriterConfig(TEST_VERSION_CURRENT, null);
+    conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
+    IndexWriter writer = new IndexWriter(dir, conf);
+    try {
+      writer.addDocument(new Document());
+      writer.commit();
+      Revision rev1 = new IndexRevision(writer);
+      // releasing that revision should not delete the files (it is still the only commit)
+      rev1.release();
+      assertTrue(dir.fileExists(IndexFileNames.SEGMENTS + "_1"));
+
+      rev1 = new IndexRevision(writer); // create revision again, so the files are snapshotted
+      writer.addDocument(new Document());
+      writer.commit();
+      assertNotNull(new IndexRevision(writer));
+      rev1.release(); // this release should trigger the delete of segments_1
+      assertFalse(dir.fileExists(IndexFileNames.SEGMENTS + "_1"));
+    } finally {
+      IOUtils.close(writer, dir);
+    }
+  }
+
+  @Test
+  public void testSegmentsFileLast() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig conf = new IndexWriterConfig(TEST_VERSION_CURRENT, null);
+    conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
+    IndexWriter writer = new IndexWriter(dir, conf);
+    try {
+      writer.addDocument(new Document());
+      writer.commit();
+      Revision rev = new IndexRevision(writer);
+      @SuppressWarnings("unchecked")
+      Map<String,RevisionFile[]> sourceFiles = rev.getSourceFiles();
+      assertEquals(1, sourceFiles.size());
+      RevisionFile[] files = sourceFiles.values().iterator().next();
+      String lastFile = files[files.length - 1].fileName; // the last listed file must be the segments_N file
+      assertTrue(lastFile.startsWith(IndexFileNames.SEGMENTS) && !lastFile.equals(IndexFileNames.SEGMENTS_GEN));
+    } finally {
+      IOUtils.close(writer, dir);
+    }
+  }
+
+  @Test
+  public void testOpen() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig conf = new IndexWriterConfig(TEST_VERSION_CURRENT, null);
+    conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
+    IndexWriter writer = new IndexWriter(dir, conf);
+    try {
+      writer.addDocument(new Document());
+      writer.commit();
+      Revision rev = new IndexRevision(writer);
+      @SuppressWarnings("unchecked")
+      Map<String,RevisionFile[]> sourceFiles = rev.getSourceFiles();
+      String source = sourceFiles.keySet().iterator().next();
+      RevisionFile[] files = sourceFiles.values().iterator().next();
+      for (RevisionFile file : files) { // compare each file as read from the directory with the stream the revision opens
+        IndexInput src = dir.openInput(file.fileName, IOContext.READONCE);
+        InputStream in = rev.open(source, file.fileName);
+        assertEquals(src.length(), in.available());
+        byte[] srcBytes = new byte[(int) src.length()];
+        byte[] inBytes = new byte[(int) src.length()];
+        int offset = 0;
+        if (random().nextBoolean()) { // randomly also exercise skip()/seek() before reading
+          int skip = random().nextInt(10);
+          if (skip >= src.length()) {
+            skip = 0;
+          }
+          in.skip(skip);
+          src.seek(skip);
+          offset = skip;
+        }
+        src.readBytes(srcBytes, offset, srcBytes.length - offset);
+        in.read(inBytes, offset, inBytes.length - offset); // NOTE(review): return value is ignored; confirm a short read cannot occur here
+        assertArrayEquals(srcBytes, inBytes);
+        IOUtils.close(src, in);
+      }
+    } finally {
+      IOUtils.close(writer, dir);
+    }
+  }
+
+}
Property changes on: lucene/replicator/src/test/org/apache/lucene/replicator/IndexRevisionTest.java
___________________________________________________________________
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: lucene/replicator/src/test/org/apache/lucene/replicator/LocalReplicatorTest.java
===================================================================
---
lucene/replicator/src/test/org/apache/lucene/replicator/LocalReplicatorTest.java (revision 0)
+++ lucene/replicator/src/test/org/apache/lucene/replicator/LocalReplicatorTest.java (working copy)
@@ -0,0 +1,195 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.IOUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LocalReplicatorTest extends ReplicatorTestCase {
+
+  private static final String VERSION_ID = "version"; // commit-data key under which createRevision() stores the revision id
+
+  private LocalReplicator replicator;
+  private Directory sourceDir;
+  private IndexWriter sourceWriter;
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    sourceDir = newDirectory();
+    IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, null);
+    conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
+    sourceWriter = new IndexWriter(sourceDir, conf);
+    replicator = new LocalReplicator();
+  }
+
+  @After
+  @Override
+  public void tearDown() throws Exception {
+    IOUtils.close(replicator, sourceWriter, sourceDir);
+    super.tearDown();
+  }
+
+  private Revision createRevision(final int id) throws IOException { // adds a document, commits with the id (base 16) as commit data, returns a new IndexRevision
+    sourceWriter.addDocument(new Document());
+    sourceWriter.setCommitData(new HashMap<String,String>() {{
+      put(VERSION_ID, Integer.toString(id, 16));
+    }});
+    sourceWriter.commit();
+    return new IndexRevision(sourceWriter);
+  }
+
+  @Test
+  public void testCheckForUpdateNoRevisions() throws Exception {
+    assertNull(replicator.checkForUpdate(null));
+  }
+
+  @Test
+  public void testObtainFileAlreadyClosed() throws IOException {
+    replicator.publish(createRevision(1));
+    SessionToken res = replicator.checkForUpdate(null);
+    assertNotNull(res);
+    assertEquals(1, res.sourceFiles.size());
+    Entry<String,RevisionFile[]> entry = res.sourceFiles.entrySet().iterator().next();
+    replicator.close();
+    try {
+      replicator.obtainFile(res.id, entry.getKey(), entry.getValue()[0].fileName);
+      fail("should have failed on AlreadyClosedException");
+    } catch (AlreadyClosedException e) {
+      // expected: the replicator was closed before the file was requested
+    }
+  }
+
+  @Test
+  public void testPublishAlreadyClosed() throws IOException {
+    replicator.close();
+    try {
+      replicator.publish(createRevision(2));
+      fail("should have failed on AlreadyClosedException");
+    } catch (AlreadyClosedException e) {
+      // expected: publishing to a closed replicator
+    }
+  }
+
+  @Test
+  public void testUpdateAlreadyClosed() throws IOException {
+    replicator.close();
+    try {
+      replicator.checkForUpdate(null);
+      fail("should have failed on AlreadyClosedException");
+    } catch (AlreadyClosedException e) {
+      // expected: checking for updates on a closed replicator
+    }
+  }
+
+  @Test
+  public void testPublishSameRevision() throws IOException {
+    Revision rev = createRevision(1);
+    replicator.publish(rev);
+    SessionToken res = replicator.checkForUpdate(null);
+    assertNotNull(res);
+    assertEquals(rev.getVersion(), res.version);
+    replicator.release(res.id);
+    replicator.publish(new IndexRevision(sourceWriter));
+    res = replicator.checkForUpdate(res.version);
+    assertNull(res);
+
+    // now make sure that publishing same revision doesn't leave revisions
+    // "locked", i.e. that replicator releases revisions even when they are not
+    // kept
+    replicator.publish(createRevision(2));
+    assertEquals(1, DirectoryReader.listCommits(sourceDir).size());
+  }
+
+  @Test
+  public void testPublishOlderRev() throws IOException {
+    replicator.publish(createRevision(1));
+    Revision old = new IndexRevision(sourceWriter);
+    replicator.publish(createRevision(2));
+    try {
+      replicator.publish(old);
+      fail("should have failed to publish an older revision");
+    } catch (IllegalArgumentException e) {
+      // expected: 'old' was created before revision 2 was published
+    }
+    assertEquals(1, DirectoryReader.listCommits(sourceDir).size());
+  }
+
+  @Test
+  public void testObtainMissingFile() throws IOException {
+    replicator.publish(createRevision(1));
+    SessionToken res = replicator.checkForUpdate(null);
+    try {
+      replicator.obtainFile(res.id, res.sourceFiles.keySet().iterator().next(), "madeUpFile");
+      fail("should have failed obtaining an unrecognized file");
+    } catch (FileNotFoundException e) {
+      // expected: "madeUpFile" is not one of the revision's files
+    }
+  }
+
+  @Test
+  public void testSessionExpiration() throws IOException, InterruptedException {
+    replicator.publish(createRevision(1));
+    SessionToken session = replicator.checkForUpdate(null);
+    replicator.setExpirationThreshold(5); // expire quickly
+    Thread.sleep(50); // sufficient for expiration
+    try {
+      replicator.obtainFile(session.id, session.sourceFiles.keySet().iterator().next(), session.sourceFiles.values().iterator().next()[0].fileName);
+      fail("should have failed to obtain a file for an expired session");
+    } catch (SessionExpiredException e) {
+      // expected: the session outlived the 5 threshold set above
+    }
+  }
+
+  @Test
+  public void testUpdateToLatest() throws IOException {
+    replicator.publish(createRevision(1));
+    Revision rev = createRevision(2);
+    replicator.publish(rev);
+    SessionToken res = replicator.checkForUpdate(null);
+    assertNotNull(res);
+    assertEquals(0, rev.compareTo(res.version));
+  }
+
+  @Test
+  public void testRevisionRelease() throws Exception {
+    replicator.publish(createRevision(1));
+    assertTrue(sourceDir.fileExists(IndexFileNames.SEGMENTS + "_1"));
+    replicator.publish(createRevision(2));
+    // now the files of revision 1 can be deleted
+    assertTrue(sourceDir.fileExists(IndexFileNames.SEGMENTS + "_2"));
+    assertFalse("segments_1 should not be found in index directory after revision is released", sourceDir.fileExists(IndexFileNames.SEGMENTS + "_1"));
+  }
+
+}
Property changes on: lucene/replicator/src/test/org/apache/lucene/replicator/LocalReplicatorTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: lucene/replicator/src/test/org/apache/lucene/replicator/ReplicatorTestCase.java
===================================================================
--- lucene/replicator/src/test/org/apache/lucene/replicator/ReplicatorTestCase.java (revision 0)
+++ lucene/replicator/src/test/org/apache/lucene/replicator/ReplicatorTestCase.java (working copy)
@@ -0,0 +1,118 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.net.SocketException;
+
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.impl.conn.PoolingClientConnectionManager;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.junit.AfterClass;
+
+@SuppressCodecs("Lucene3x")
+public class ReplicatorTestCase extends LuceneTestCase {
+
+  private static final int BASE_PORT = 7000; // first port newHttpServer() tries
+
+  // if a test calls newHttpServer() multiple times, or some ports already failed,
+  // don't start from BASE_PORT again
+  private static int lastPortUsed = -1;
+
+  private static ClientConnectionManager clientConnectionManager; // created lazily by getClientConnectionManager()
+
+  @AfterClass
+  public static void afterClassReplicatorTestCase() throws Exception {
+    if (clientConnectionManager != null) {
+      clientConnectionManager.shutdown();
+      clientConnectionManager = null;
+    }
+  }
+
+  /**
+   * Returns a new, already started {@link Server HTTP Server} instance. To obtain its port, use
+   * {@link #serverPort(Server)}. Ports are probed upwards until one can be bound.
+   */
+  public static synchronized Server newHttpServer(Handler handler) throws Exception {
+    int port = lastPortUsed == -1 ? BASE_PORT : lastPortUsed + 1;
+    Server server = null;
+    while (true) {
+      try {
+        server = new Server(port);
+
+        server.setHandler(handler);
+
+        QueuedThreadPool threadPool = new QueuedThreadPool();
+        threadPool.setDaemon(true);
+        threadPool.setMaxIdleTimeMs(0);
+        server.setThreadPool(threadPool);
+
+        // this will test the port
+        server.start();
+
+        // if here, port is available
+        lastPortUsed = port;
+        return server;
+      } catch (SocketException e) {
+        stopHttpServer(server);
+        // this is ok, we'll try the next port until successful.
+        ++port;
+      }
+    }
+  }
+
+  /**
+   * Returns a {@link Server}'s port. This method assumes that no
+   * {@link Connector}s were added to the Server besides the default one.
+   */
+  public static int serverPort(Server httpServer) {
+    return httpServer.getConnectors()[0].getPort();
+  }
+
+  /**
+   * Stops the given HTTP Server instance. This method does its best to guarantee
+   * that no threads will be left running following this method.
+   */
+  public static void stopHttpServer(Server httpServer) throws Exception {
+    httpServer.stop();
+    httpServer.join();
+  }
+
+  /**
+   * Returns a {@link ClientConnectionManager}.
+   * <p>
+   * <b>NOTE:</b> do not {@link ClientConnectionManager#shutdown()} this
+   * connection manager, it will be shutdown automatically after all tests have
+   * finished.
+   */
+  public static synchronized ClientConnectionManager getClientConnectionManager() {
+    if (clientConnectionManager == null) {
+      PoolingClientConnectionManager ccm = new PoolingClientConnectionManager();
+      ccm.setDefaultMaxPerRoute(128);
+      ccm.setMaxTotal(128);
+      clientConnectionManager = ccm;
+    }
+
+    return clientConnectionManager;
+  }
+
+}
Property changes on: lucene/replicator/src/test/org/apache/lucene/replicator/ReplicatorTestCase.java
___________________________________________________________________
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: lucene/replicator/src/test/org/apache/lucene/replicator/SessionTokenTest.java
===================================================================
--- lucene/replicator/src/test/org/apache/lucene/replicator/SessionTokenTest.java (revision 0)
+++ lucene/replicator/src/test/org/apache/lucene/replicator/SessionTokenTest.java (working copy)
@@ -0,0 +1,63 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.IOUtils;
+import org.junit.Test;
+
+public class SessionTokenTest extends ReplicatorTestCase {
+
+  @Test
+  public void testSerialization() throws IOException { // a SessionToken must survive a serialize/deserialize round trip
+    Directory dir = newDirectory();
+    IndexWriterConfig conf = new IndexWriterConfig(TEST_VERSION_CURRENT, null);
+    conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
+    IndexWriter writer = new IndexWriter(dir, conf);
+    writer.addDocument(new Document());
+    writer.commit();
+    Revision rev = new IndexRevision(writer);
+
+    SessionToken session1 = new SessionToken("17", rev);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    session1.serialize(new DataOutputStream(baos)); // write the token into an in-memory buffer
+    byte[] b = baos.toByteArray();
+    SessionToken session2 = new SessionToken(new DataInputStream(new ByteArrayInputStream(b))); // read it back from those bytes
+    assertEquals(session1.id, session2.id);
+    assertEquals(session1.version, session2.version);
+    assertEquals(1, session2.sourceFiles.size());
+    assertEquals(session1.sourceFiles.size(), session2.sourceFiles.size());
+    assertEquals(session1.sourceFiles.keySet(), session2.sourceFiles.keySet());
+    RevisionFile[] arr1 = session1.sourceFiles.values().iterator().next();
+    RevisionFile[] arr2 = session2.sourceFiles.values().iterator().next();
+    assertArrayEquals(arr1, arr2);
+
+    IOUtils.close(writer, dir); // NOTE(review): not reached if an assertion above fails; the other tests close in a finally block
+  }
+
+}
Property changes on: lucene/replicator/src/test/org/apache/lucene/replicator/SessionTokenTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: lucene/replicator/src/test/org/apache/lucene/replicator/http/HttpReplicatorTest.java
===================================================================
--- lucene/replicator/src/test/org/apache/lucene/replicator/http/HttpReplicatorTest.java (revision 0)
+++ lucene/replicator/src/test/org/apache/lucene/replicator/http/HttpReplicatorTest.java (working copy)
@@ -0,0 +1,120 @@
+package org.apache.lucene.replicator.http;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.replicator.IndexReplicationHandler;
+import org.apache.lucene.replicator.IndexRevision;
+import org.apache.lucene.replicator.LocalReplicator;
+import org.apache.lucene.replicator.PerSessionDirectoryFactory;
+import org.apache.lucene.replicator.ReplicationClient;
+import org.apache.lucene.replicator.Replicator;
+import org.apache.lucene.replicator.ReplicatorTestCase;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util._TestUtil;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HttpReplicatorTest extends ReplicatorTestCase {
+
+  private File clientWorkDir;
+  private Replicator serverReplicator;
+  private IndexWriter writer;
+  private DirectoryReader reader;
+  private Server server;
+  private int port;
+  private Directory serverIndexDir, handlerIndexDir;
+
+  private void startServer() throws Exception { // exposes serverReplicator as shard "s1" through a ReplicationServlet
+    ServletHandler replicationHandler = new ServletHandler();
+    ReplicationService service = new ReplicationService(Collections.singletonMap("s1", serverReplicator));
+    ServletHolder servlet = new ServletHolder(new ReplicationServlet(service));
+    replicationHandler.addServletWithMapping(servlet, ReplicationService.REPLICATION_CONTEXT + "/*");
+    server = newHttpServer(replicationHandler);
+    port = serverPort(server);
+  }
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    clientWorkDir = _TestUtil.getTempDir("httpReplicatorTest");
+    handlerIndexDir = newDirectory();
+    serverIndexDir = newDirectory();
+    serverReplicator = new LocalReplicator();
+    startServer();
+
+    IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, null);
+    conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
+    writer = new IndexWriter(serverIndexDir, conf);
+    reader = DirectoryReader.open(writer, false);
+  }
+
+  @Override // NOTE(review): setUp carries @Before but tearDown has no @After -- confirm the inherited annotation is what runs this
+  public void tearDown() throws Exception {
+    stopHttpServer(server);
+    IOUtils.close(reader, writer, handlerIndexDir, serverIndexDir);
+    super.tearDown();
+  }
+
+  private void publishRevision(int id) throws IOException { // commits one document with the id (base 16) under "ID" and publishes the revision
+    Document doc = new Document();
+    writer.addDocument(doc);
+    writer.setCommitData(Collections.singletonMap("ID", Integer.toString(id, 16)));
+    writer.commit();
+    serverReplicator.publish(new IndexRevision(writer));
+  }
+
+  private void reopenReader() throws IOException { // swaps 'reader' for a refreshed one; fails if nothing changed
+    DirectoryReader newReader = DirectoryReader.openIfChanged(reader);
+    assertNotNull(newReader);
+    reader.close();
+    reader = newReader;
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    Replicator replicator = new HttpReplicator("localhost", port, ReplicationService.REPLICATION_CONTEXT + "/s1",
+        getClientConnectionManager());
+    ReplicationClient client = new ReplicationClient(replicator, new IndexReplicationHandler(handlerIndexDir, null),
+        new PerSessionDirectoryFactory(clientWorkDir));
+
+    publishRevision(1);
+    client.updateNow();
+    reopenReader();
+    assertEquals(1, Integer.parseInt(reader.getIndexCommit().getUserData().get("ID"), 16));
+
+    publishRevision(2);
+    client.updateNow();
+    reopenReader();
+    assertEquals(2, Integer.parseInt(reader.getIndexCommit().getUserData().get("ID"), 16));
+  }
+
+}
Property changes on: lucene/replicator/src/test/org/apache/lucene/replicator/http/HttpReplicatorTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline
at end of property Index: lucene/replicator/src/test/org/apache/lucene/replicator/http/ReplicationServlet.java =================================================================== --- lucene/replicator/src/test/org/apache/lucene/replicator/http/ReplicationServlet.java (revision 0) +++ lucene/replicator/src/test/org/apache/lucene/replicator/http/ReplicationServlet.java (working copy) @@ -0,0 +1,41 @@ +package org.apache.lucene.replicator.http; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+public class ReplicationServlet extends HttpServlet { // servlet wrapper that forwards requests to a ReplicationService
+
+  private final ReplicationService service;
+
+  public ReplicationServlet(ReplicationService service) {
+    super();
+    this.service = service;
+  }
+
+  @Override
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+    service.perform(req, resp); // every GET is handed unchanged to the service
+  }
+
+}
Property changes on: lucene/replicator/src/test/org/apache/lucene/replicator/http/ReplicationServlet.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: dev-tools/maven/lucene/replicator/pom.xml.template
===================================================================
--- dev-tools/maven/lucene/replicator/pom.xml.template (revision 0)
+++ dev-tools/maven/lucene/replicator/pom.xml.template (working copy)
@@ -0,0 +1,75 @@
+ + + 4.0.0 + + org.apache.lucene + lucene-parent + @version@ + ../pom.xml + + org.apache.lucene + lucene-replicator + jar + Lucene Replicator + Lucene Replicator Module + + lucene/replicator + ../../..
+ ${relative-top-level}/${module-directory} + + + scm:svn:${vc-anonymous-base-url}/${module-directory} + scm:svn:${vc-dev-base-url}/${module-directory} + ${vc-browse-base-url}/${module-directory} + + + + + ${project.groupId} + lucene-test-framework + ${project.version} + test + + + ${project.groupId} + lucene-core + ${project.version} + + + ${project.groupId} + lucene-facet + ${project.version} + + + + ${module-path}/src/java + ${module-path}/src/test + + + ${project.build.testSourceDirectory} + + **/*.java + + + + + Property changes on: dev-tools/maven/lucene/replicator/pom.xml.template ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: lucene/build.xml =================================================================== --- lucene/build.xml (revision 1478496) +++ lucene/build.xml (working copy) @@ -160,7 +160,13 @@ - + + + + + + + Index: lucene/module-build.xml =================================================================== --- lucene/module-build.xml (revision 1478496) +++ lucene/module-build.xml (working copy) @@ -220,7 +220,29 @@ - + + + + + + + + + + + + + + + + + + + + + + +