Index: lucene/replicator/build.xml
===================================================================
--- lucene/replicator/build.xml (revision 0)
+++ lucene/replicator/build.xml (working copy)
@@ -0,0 +1,49 @@
+
+
+
+
+
+ Files replication utility
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Property changes on: lucene/replicator/build.xml
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: lucene/replicator/ivy.xml
===================================================================
--- lucene/replicator/ivy.xml (revision 0)
+++ lucene/replicator/ivy.xml (working copy)
@@ -0,0 +1,50 @@
+
+
+]>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Property changes on: lucene/replicator/ivy.xml
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: lucene/replicator/lib
===================================================================
--- lucene/replicator/lib (revision 0)
+++ lucene/replicator/lib (working copy)
Property changes on: lucene/replicator/lib
___________________________________________________________________
Added: svn:ignore
## -0,0 +1 ##
+*.jar
Index: lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyReplicationHandler.java
===================================================================
--- lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyReplicationHandler.java (revision 0)
+++ lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyReplicationHandler.java (working copy)
@@ -0,0 +1,200 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.index.NoMergeScheduler;
+import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.util.Version;
+
+/**
+ * A {@link ReplicationHandler} for replication of an index and taxonomy pair.
+ * See {@link IndexReplicationHandler} for more detail, but this handler ensures
+ * that the search and taxonomy indexes are replicated in a consistent way.
+ * Taxonomy indexes that recreated either via {@link OpenMode#CREATE} or
+ * {@link DirectoryTaxonomyWriter#replaceTaxonomy(Directory)} are not supported
+ * by this handler.
+ *
+ * @see IndexReplicationHandler
+ *
+ * @lucene.experimental
+ */
+public class IndexAndTaxonomyReplicationHandler implements ReplicationHandler {
+
+ private final Directory indexDir;
+ private final Directory taxoDir;
+ private final Callable callback;
+
+ private long taxoEpoch;
+ private Map currentRevisionFiles;
+ private String currentVersion;
+
+ /**
+ * Constructor with the given index directory and callback to notify when the
+ * indexes were updated.
+ */
+ public IndexAndTaxonomyReplicationHandler(Directory indexDir, Directory taxoDir, Callable callback)
+ throws IOException {
+ final boolean indexExists = DirectoryReader.indexExists(indexDir);
+ final boolean taxoExists = DirectoryReader.indexExists(taxoDir);
+ if (!indexExists && !taxoExists) {
+ currentRevisionFiles = null;
+ currentVersion = null;
+ taxoEpoch = -1;
+ } else if (indexExists != taxoExists) {
+ throw new IllegalArgumentException("both search and taxonomy indexes must exist: indexExists=" + indexExists
+ + " taxoExists=" + taxoExists);
+ } else {
+ List commits = DirectoryReader.listCommits(indexDir);
+ IndexCommit indexCommit = commits.get(commits.size() - 1);
+ commits = DirectoryReader.listCommits(taxoDir);
+ IndexCommit taxoCommit = commits.get(commits.size() - 1);
+ currentRevisionFiles = IndexAndTaxonomyRevision.revisionFiles(indexCommit, taxoCommit);
+ currentVersion = IndexAndTaxonomyRevision.revisionVersion(indexCommit, taxoCommit);
+ taxoEpoch = Long.parseLong(taxoCommit.getUserData().get(DirectoryTaxonomyWriter.INDEX_EPOCH));
+ }
+ this.callback = callback;
+ this.indexDir = indexDir;
+ this.taxoDir = taxoDir;
+ }
+
+ @Override
+ public String currentVersion() {
+ return currentVersion;
+ }
+
+ @Override
+ public Map currentRevisionFiles() {
+ return currentRevisionFiles;
+ }
+
+ private List processFiles(RevisionFile[] files, Directory clientDir, Directory targetDir)
+ throws IOException {
+ List toSync = new ArrayList();
+ if (clientDir.equals(targetDir)) {
+ // files copied directly to index directory, sync all revision files in
+ // the order specified by revisionFiles.
+ for (RevisionFile file : files) {
+ toSync.add(file.fileName);
+ }
+ } else {
+ // files were copied to a transient directory. copy them to the index
+ // directory and sync, but do so in the same order they are given.
+ for (RevisionFile file : files) {
+ if (clientDir.fileExists(file.fileName)) {
+ clientDir.copy(targetDir, file.fileName, file.fileName, IOContext.READONCE);
+ toSync.add(file.fileName);
+ }
+ }
+ }
+
+ // make sure that the last file to sync is the segments_N file and fail if
+ // not. the reason why the code fails instead of putting segments_N file
+ // last is that this indicates an error in the Revision implementation.
+ if (!toSync.isEmpty()) {
+ String lastFile = toSync.get(toSync.size() - 1);
+ if (!lastFile.startsWith(IndexFileNames.SEGMENTS) || lastFile.equals(IndexFileNames.SEGMENTS_GEN)) {
+ throw new IllegalStateException("last file to copy+sync must be segments_N but got " + lastFile
+ + "; check your Revision implementation!");
+ }
+ }
+ return toSync;
+ }
+
+ @Override
+ public void revisionReady(String version, Map revisionFiles,
+ Map sourceDirectory) throws IOException {
+ // copy taxonomy index files first. If anything goes wrong between this line
+ // and the next, it is ok if the taxonomy index is more 'advanced' than the
+ // search index, as it only means it contains more categories.
+ List taxoFiles = processFiles(revisionFiles.get(IndexAndTaxonomyRevision.TAXONOMY_SOURCE),
+ sourceDirectory.get(IndexAndTaxonomyRevision.TAXONOMY_SOURCE), taxoDir);
+ List indexFiles = processFiles(revisionFiles.get(IndexAndTaxonomyRevision.INDEX_SOURCE),
+ sourceDirectory.get(IndexAndTaxonomyRevision.INDEX_SOURCE), indexDir);
+
+ // sync all copied files in the order they are given except the segments_N
+ // file. We sync both the taxonomy and index segments files in the end, so
+ // the sync is more close to atomically.
+ indexDir.sync(indexFiles.subList(0, indexFiles.size() - 1));
+ if (!taxoFiles.isEmpty()) {
+ taxoDir.sync(taxoFiles.subList(0, taxoFiles.size() - 1));
+ }
+
+ // sync taxo segments before index segments because if is ok if the taxonomy
+ // contains more categories than the search index knows about, but not ok
+ // the other way around. That way, if the process crashes after taxoSegments
+ // was synced, but before indexSegments, a searcher-taxonomy pair will be
+ // valid.
+ if (!taxoFiles.isEmpty()) {
+ taxoDir.sync(Collections.singletonList(taxoFiles.get(taxoFiles.size() - 1)));
+ }
+ indexDir.sync(Collections.singletonList(indexFiles.get(indexFiles.size() - 1)));
+
+ // verify that the replicated taxonomy index is of the same epoch as the one
+ // this handler was created with
+ List commits = DirectoryReader.listCommits(taxoDir);
+ IndexCommit ic = commits.get(commits.size() - 1);
+ long replicatedEpoch = Long.parseLong(ic.getUserData().get(DirectoryTaxonomyWriter.INDEX_EPOCH));
+ if (taxoEpoch == -1) {
+ // handler was created when taxonomy index didn't exist, so just assign
+ // the epoch for the first time.
+ taxoEpoch = replicatedEpoch;
+ } else if (taxoEpoch != replicatedEpoch) {
+ throw new IllegalStateException("handler created with taxonomy epoch " + taxoEpoch
+ + " however replicated taxonomy epoch is " + replicatedEpoch + "; this is not supported!");
+ }
+
+ // touch the taxonomy index so that it reads the latest commit point and deletes any unused files
+ IndexWriterConfig taxoConf = new IndexWriterConfig(Version.LUCENE_50, null);
+ taxoConf.setMergeScheduler(NoMergeScheduler.INSTANCE); // prevent any merges from happening
+ new IndexWriter(taxoDir, taxoConf).close();
+
+ // touch the index so that it reads the latest commit point and deletes any unused files
+ IndexWriterConfig indexConf = new IndexWriterConfig(Version.LUCENE_50, null);
+ indexConf.setMergeScheduler(NoMergeScheduler.INSTANCE); // prevent any merges from happening
+ new IndexWriter(indexDir, indexConf).close();
+
+ currentVersion = version;
+ currentRevisionFiles = revisionFiles;
+
+ // successfully updated the index, notify the callback that the index is ready.
+ if (callback != null) {
+ try {
+ callback.call();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+}
Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyReplicationHandler.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyRevision.java
===================================================================
--- lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyRevision.java (revision 0)
+++ lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyRevision.java (working copy)
@@ -0,0 +1,226 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter;
+import org.apache.lucene.facet.taxonomy.writercache.TaxonomyWriterCache;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+
+/**
+ * A {@link Revision} of a single index and taxonomy index files which comprises
+ * the list of files from both indexes. This revision should be used whenever a
+ * pair of search and taxonomy indexes need to be replicated together to
+ * guarantee consistency of both on the replicating (client) side.
+ *
+ * @see IndexRevision
+ *
+ * @lucene.experimental
+ */
+public class IndexAndTaxonomyRevision implements Revision {
+
+ /**
+ * A {@link DirectoryTaxonomyWriter} which sets the underlying
+ * {@link IndexWriter}'s {@link IndexDeletionPolicy} to
+ * {@link SnapshotDeletionPolicy}.
+ */
+ public static final class SnapshotDirectoryTaxonomyWriter extends DirectoryTaxonomyWriter {
+
+ private SnapshotDeletionPolicy sdp;
+ private IndexWriter writer;
+
+ /**
+ * @see DirectoryTaxonomyWriter#DirectoryTaxonomyWriter(Directory,
+ * IndexWriterConfig.OpenMode, TaxonomyWriterCache)
+ */
+ public SnapshotDirectoryTaxonomyWriter(Directory directory, OpenMode openMode, TaxonomyWriterCache cache)
+ throws IOException {
+ super(directory, openMode, cache);
+ }
+
+ /** @see DirectoryTaxonomyWriter#DirectoryTaxonomyWriter(Directory, IndexWriterConfig.OpenMode) */
+ public SnapshotDirectoryTaxonomyWriter(Directory directory, OpenMode openMode) throws IOException {
+ super(directory, openMode);
+ }
+
+ /** @see DirectoryTaxonomyWriter#DirectoryTaxonomyWriter(Directory) */
+ public SnapshotDirectoryTaxonomyWriter(Directory d) throws IOException {
+ super(d);
+ }
+
+ @Override
+ protected IndexWriterConfig createIndexWriterConfig(OpenMode openMode) {
+ IndexWriterConfig conf = super.createIndexWriterConfig(openMode);
+ conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
+ return conf;
+ }
+
+ @Override
+ protected IndexWriter openIndexWriter(Directory directory, IndexWriterConfig config) throws IOException {
+ writer = super.openIndexWriter(directory, config);
+ // must set it here because IndexWriter clones the config
+ sdp = (SnapshotDeletionPolicy) writer.getConfig().getIndexDeletionPolicy();
+ return writer;
+ }
+
+ /** Returns the {@link SnapshotDeletionPolicy} used by the underlying {@link IndexWriter}. */
+ public SnapshotDeletionPolicy getDeletionPolicy() {
+ return sdp;
+ }
+
+ /** Returns the {@link IndexWriter} used by this {@link DirectoryTaxonomyWriter}. */
+ public IndexWriter getIndexWriter() {
+ return writer;
+ }
+
+ }
+
+ private static final int RADIX = 16;
+
+ public static final String INDEX_SOURCE = "index";
+ public static final String TAXONOMY_SOURCE = "taxo";
+
+ private final IndexWriter indexWriter;
+ private final SnapshotDirectoryTaxonomyWriter taxoWriter;
+ private final IndexCommit indexCommit, taxoCommit;
+ private final SnapshotDeletionPolicy indexSDP, taxoSDP;
+ private final String version;
+ private final Map sourceFiles;
+
+ /** Returns a singleton map of the revision files from the given {@link IndexCommit}. */
+ public static Map revisionFiles(IndexCommit indexCommit, IndexCommit taxoCommit)
+ throws IOException {
+ HashMap files = new HashMap();
+ files.put(INDEX_SOURCE, IndexRevision.revisionFiles(indexCommit).values().iterator().next());
+ files.put(TAXONOMY_SOURCE, IndexRevision.revisionFiles(taxoCommit).values().iterator().next());
+ return files;
+ }
+
+ /**
+ * Returns a String representation of a revision's version from the given
+ * {@link IndexCommit}s of the search and taxonomy indexes.
+ */
+ public static String revisionVersion(IndexCommit indexCommit, IndexCommit taxoCommit) {
+ return Long.toString(indexCommit.getGeneration(), RADIX) + ":" + Long.toString(taxoCommit.getGeneration(), RADIX);
+ }
+
+ /**
+ * Constructor over the given {@link IndexWriter}. Uses the last
+ * {@link IndexCommit} found in the {@link Directory} managed by the given
+ * writer.
+ */
+ public IndexAndTaxonomyRevision(IndexWriter indexWriter, SnapshotDirectoryTaxonomyWriter taxoWriter)
+ throws IOException {
+ IndexDeletionPolicy delPolicy = indexWriter.getConfig().getIndexDeletionPolicy();
+ if (!(delPolicy instanceof SnapshotDeletionPolicy)) {
+ throw new IllegalArgumentException("IndexWriter must be created with SnapshotDeletionPolicy");
+ }
+ this.indexWriter = indexWriter;
+ this.taxoWriter = taxoWriter;
+ this.indexSDP = (SnapshotDeletionPolicy) delPolicy;
+ this.taxoSDP = taxoWriter.getDeletionPolicy();
+ this.indexCommit = indexSDP.snapshot();
+ this.taxoCommit = taxoSDP.snapshot();
+ this.version = revisionVersion(indexCommit, taxoCommit);
+ this.sourceFiles = revisionFiles(indexCommit, taxoCommit);
+ }
+
+ @Override
+ public int compareTo(String version) {
+ final String[] parts = version.split(":");
+ final long indexGen = Long.parseLong(parts[0], RADIX);
+ final long taxoGen = Long.parseLong(parts[1], RADIX);
+ final long indexCommitGen = indexCommit.getGeneration();
+ final long taxoCommitGen = taxoCommit.getGeneration();
+
+ // if the index generation is not the same as this commit's generation,
+ // compare by it. Otherwise, compare by the taxonomy generation.
+ if (indexCommitGen < indexGen) {
+ return -1;
+ } else if (indexCommitGen > indexGen) {
+ return 1;
+ } else {
+ return taxoCommitGen < taxoGen ? -1 : (taxoCommitGen > taxoGen ? 1 : 0);
+ }
+ }
+
+ @Override
+ public int compareTo(Revision o) {
+ IndexAndTaxonomyRevision other = (IndexAndTaxonomyRevision) o;
+ int cmp = indexCommit.compareTo(other.indexCommit);
+ return cmp != 0 ? cmp : taxoCommit.compareTo(other.taxoCommit);
+ }
+
+ @Override
+ public String getVersion() {
+ return version;
+ }
+
+ @Override
+ public Map getSourceFiles() {
+ return sourceFiles;
+ }
+
+ @Override
+ public InputStream open(String source, String fileName) throws IOException {
+ assert source.equals(INDEX_SOURCE) || source.equals(TAXONOMY_SOURCE) : "invalid source; expected=(" + INDEX_SOURCE
+ + " or " + TAXONOMY_SOURCE + ") got=" + source;
+ IndexCommit ic = source.equals(INDEX_SOURCE) ? indexCommit : taxoCommit;
+ return new IndexInputInputStream(ic.getDirectory().openInput(fileName, IOContext.READONCE));
+ }
+
+ @Override
+ public void release() throws IOException {
+ try {
+ indexSDP.release(indexCommit);
+ } finally {
+ taxoSDP.release(taxoCommit);
+ }
+
+ try {
+ indexWriter.deleteUnusedFiles();
+ } finally {
+ taxoWriter.getIndexWriter().deleteUnusedFiles();
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("IndexAndTaxonomyRevision [version=").append(version).append(" sourceFiles={");
+ for (Entry e : sourceFiles.entrySet()) {
+ sb.append("source=").append(e.getKey()).append(" files=").append(Arrays.toString(e.getValue())).append(';');
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+
+}
Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyRevision.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: lucene/replicator/src/java/org/apache/lucene/replicator/IndexInputInputStream.java
===================================================================
--- lucene/replicator/src/java/org/apache/lucene/replicator/IndexInputInputStream.java (revision 0)
+++ lucene/replicator/src/java/org/apache/lucene/replicator/IndexInputInputStream.java (working copy)
@@ -0,0 +1,92 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.lucene.store.IndexInput;
+
+/**
+ * An {@link InputStream} which wraps an {@link IndexInput}.
+ *
+ * @lucene.experimental
+ */
+public final class IndexInputInputStream extends InputStream {
+
+ private final IndexInput in;
+
+ private long remaining;
+
+ public IndexInputInputStream(IndexInput in) {
+ this.in = in;
+ remaining = in.length();
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (remaining == 0) {
+ return -1;
+ } else {
+ --remaining;
+ return in.readByte();
+ }
+ }
+
+ @Override
+ public int available() throws IOException {
+ return (int) in.length();
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (remaining == 0) {
+ return -1;
+ }
+ if (remaining < len) {
+ len = (int) remaining;
+ }
+ in.readBytes(b, off, len);
+ remaining -= len;
+ return len;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (remaining == 0) {
+ return -1;
+ }
+ if (remaining < n) {
+ n = remaining;
+ }
+ in.seek(in.getFilePointer() + n);
+ remaining -= n;
+ return n;
+ }
+
+}
\ No newline at end of file
Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/IndexInputInputStream.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: lucene/replicator/src/java/org/apache/lucene/replicator/IndexReplicationHandler.java
===================================================================
--- lucene/replicator/src/java/org/apache/lucene/replicator/IndexReplicationHandler.java (revision 0)
+++ lucene/replicator/src/java/org/apache/lucene/replicator/IndexReplicationHandler.java (working copy)
@@ -0,0 +1,151 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.NoMergeScheduler;
+import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.util.Version;
+
+/**
+ * A {@link ReplicationHandler} for replication of an index. Implements
+ * {@link #revisionReady} by copying the files pointed by the client resolver to
+ * the index {@link Directory} and then touches the index with
+ * {@link IndexWriter} to make sure any unused files are deleted.
+ *
+ * NOTE: assumes that {@link IndexWriter} is not opened by another
+ * process on the index directory. In fact, opening an {@link IndexWriter} on
+ * the same directory to which files are copied can lead to undefined behavior,
+ * where some or all the files will be deleted, override other files or simply
+ * create a mess. When you replicate an index, it is best if the index is never
+ * modified by {@link IndexWriter}, except the one that is open on the source
+ * index, from which you replicate.
+ *
+ * This handler notifies the application via a provided {@link Callable} when an
+ * updated index commit was made available for it.
+ *
+ * @lucene.experimental
+ */
+public class IndexReplicationHandler implements ReplicationHandler {
+
+ private final Directory indexDir;
+ private final Callable callback;
+
+ private Map currentRevisionFiles;
+ private String currentVersion;
+
+ /**
+ * Constructor with the given index directory and callback to notify when the
+ * indexes were updated.
+ */
+ public IndexReplicationHandler(Directory indexDir, Callable callback) throws IOException {
+ this.callback = callback;
+ this.indexDir = indexDir;
+ if (!DirectoryReader.indexExists(indexDir)) {
+ currentRevisionFiles = null;
+ currentVersion = null;
+ } else {
+ List commits = DirectoryReader.listCommits(indexDir);
+ IndexCommit commit = commits.get(commits.size() - 1);
+ currentRevisionFiles = IndexRevision.revisionFiles(commit);
+ currentVersion = IndexRevision.revisionVersion(commit);
+ }
+ }
+
+ @Override
+ public String currentVersion() {
+ return currentVersion;
+ }
+
+ @Override
+ public Map currentRevisionFiles() {
+ return currentRevisionFiles;
+ }
+
+ @Override
+ public void revisionReady(String version, Map revisionFiles,
+ Map sourceDirectory) throws IOException {
+ if (revisionFiles.size() > 1) {
+ throw new IllegalArgumentException("this handler handles only a single source; got " + revisionFiles.keySet());
+ }
+
+ Entry entry = revisionFiles.entrySet().iterator().next();
+ Directory clientDir = sourceDirectory.get(entry.getKey());
+ List toSync = new ArrayList();
+ if (clientDir.equals(indexDir)) {
+ // files copied directly to index directory, sync all revision files in
+ // the order specified by revisionFiles.
+ for (RevisionFile file : entry.getValue()) {
+ toSync.add(file.fileName);
+ }
+ } else {
+ // files were copied to a transient directory. copy them to the index
+ // directory and sync, but do so in the order specified by revisionFiles.
+ for (RevisionFile file : entry.getValue()) {
+ if (clientDir.fileExists(file.fileName)) {
+ clientDir.copy(indexDir, file.fileName, file.fileName, IOContext.READONCE);
+ toSync.add(file.fileName);
+ }
+ }
+ }
+
+ // sync all copied files. the sync happens in order, and IndexRevision
+ // ensures that the last file sent is the segments_N file. Still, verify
+ // that it's indeed the case. The reason why the code fails instead of
+ // putting segments_N file last is that this indicates an error in the
+ // Revision implementation.
+ String lastFile = toSync.get(toSync.size() - 1);
+ if (!lastFile.startsWith(IndexFileNames.SEGMENTS) || lastFile.equals(IndexFileNames.SEGMENTS_GEN)) {
+ throw new IllegalStateException("last file to copy+sync must be segments_N but got " + lastFile
+ + "; check your Revision implementation!");
+ }
+ indexDir.sync(toSync);
+
+ IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_50, null);
+ conf.setMergeScheduler(NoMergeScheduler.INSTANCE); // prevent any merges from happening
+
+ // touch the index so that it reads the latest commit point and deletes any unused files
+ new IndexWriter(indexDir, conf).close();
+
+ currentVersion = version;
+ currentRevisionFiles = revisionFiles;
+
+ // successfully updated the index, notify the callback that the index is ready.
+ if (callback != null) {
+ try {
+ callback.call();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+}
Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/IndexReplicationHandler.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: lucene/replicator/src/java/org/apache/lucene/replicator/IndexRevision.java
===================================================================
--- lucene/replicator/src/java/org/apache/lucene/replicator/IndexRevision.java (revision 0)
+++ lucene/replicator/src/java/org/apache/lucene/replicator/IndexRevision.java (working copy)
@@ -0,0 +1,157 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+
+/**
+ * A {@link Revision} of a single index files which comprises the list of files
+ * that are part of the current {@link IndexCommit}. To ensure the files are not
+ * deleted by {@link IndexWriter} for as long as this revision stays alive (i.e.
+ * until {@link #release()}), the current commit point is snapshotted, using
+ * {@link SnapshotDeletionPolicy} (this means that the given writer's
+ * {@link IndexWriterConfig#getIndexDeletionPolicy() config} should return
+ * {@link SnapshotDeletionPolicy}).
+ *
+ * When this revision is {@link #release() released}, it releases the obtained
+ * snapshot as well as calls {@link IndexWriter#deleteUnusedFiles()} so that the
+ * snapshotted files are deleted (if they are no longer needed).
+ *
+ * @lucene.experimental
+ */
+public class IndexRevision implements Revision {
+
+ private static final int RADIX = 16;
+ private static final String SOURCE = "index";
+
+ private final IndexWriter writer;
+ private final IndexCommit commit;
+ private final SnapshotDeletionPolicy sdp;
+ private final String version;
+ private final Map sourceFiles;
+
+ // returns a RevisionFile with some metadata
+ private static RevisionFile newRevisionFile(String file, Directory dir) throws IOException {
+ RevisionFile revFile = new RevisionFile(file);
+ revFile.size = dir.fileLength(file);
+ return revFile;
+ }
+
+ /** Returns a singleton map of the revision files from the given {@link IndexCommit}. */
+ public static Map revisionFiles(IndexCommit commit) throws IOException {
+ Collection commitFiles = commit.getFileNames();
+ RevisionFile[] revisionFiles = new RevisionFile[commitFiles.size()];
+ String segmentsFile = commit.getSegmentsFileName();
+ Directory dir = commit.getDirectory();
+
+ revisionFiles[revisionFiles.length - 1] = newRevisionFile(segmentsFile, dir); // segments_N must be last
+ int idx = 0;
+ for (String file : commitFiles) {
+ if (!file.equals(segmentsFile)) {
+ revisionFiles[idx++] = newRevisionFile(file, dir);
+ }
+ }
+ return Collections.singletonMap(SOURCE, revisionFiles);
+ }
+
+ /**
+ * Returns a String representation of a revision's version from the given
+ * {@link IndexCommit}.
+ */
+ public static String revisionVersion(IndexCommit commit) {
+ return Long.toString(commit.getGeneration(), RADIX);
+ }
+
+ /**
+ * Constructor over the given {@link IndexWriter}. Uses the last
+ * {@link IndexCommit} found in the {@link Directory} managed by the given
+ * writer.
+ */
+ public IndexRevision(IndexWriter writer) throws IOException {
+ IndexDeletionPolicy delPolicy = writer.getConfig().getIndexDeletionPolicy();
+ if (!(delPolicy instanceof SnapshotDeletionPolicy)) {
+ throw new IllegalArgumentException("IndexWriter must be created with SnapshotDeletionPolicy");
+ }
+ this.writer = writer;
+ this.sdp = (SnapshotDeletionPolicy) delPolicy;
+ this.commit = sdp.snapshot();
+ this.version = revisionVersion(commit);
+ this.sourceFiles = revisionFiles(commit);
+ }
+
+ @Override
+ public int compareTo(String version) {
+ long gen = Long.parseLong(version, RADIX);
+ long commitGen = commit.getGeneration();
+ return commitGen < gen ? -1 : (commitGen > gen ? 1 : 0);
+ }
+
+ @Override
+ public int compareTo(Revision o) {
+ IndexRevision other = (IndexRevision) o;
+ return commit.compareTo(other.commit);
+ }
+
+ @Override
+ public String getVersion() {
+ return version;
+ }
+
+ @Override
+ public Map getSourceFiles() {
+ return sourceFiles;
+ }
+
+ @Override
+ public InputStream open(String source, String fileName) throws IOException {
+ assert source.equals(SOURCE) : "invalid source; expected=" + SOURCE + " got=" + source;
+ return new IndexInputInputStream(commit.getDirectory().openInput(fileName, IOContext.READONCE));
+ }
+
+ @Override
+ public void release() throws IOException {
+ sdp.release(commit);
+ writer.deleteUnusedFiles();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("IndexRevision [version=").append(version).append(" sourceFiles={");
+ for (Entry e : sourceFiles.entrySet()) {
+ sb.append("source=").append(e.getKey()).append(" files=").append(Arrays.toString(e.getValue())).append(';');
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+
+}
Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/IndexRevision.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: lucene/replicator/src/java/org/apache/lucene/replicator/LocalReplicator.java
===================================================================
--- lucene/replicator/src/java/org/apache/lucene/replicator/LocalReplicator.java (revision 0)
+++ lucene/replicator/src/java/org/apache/lucene/replicator/LocalReplicator.java (working copy)
@@ -0,0 +1,247 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.store.AlreadyClosedException;
+
+/**
+ * A {@link Replicator} implementation for use by the side that publishes
+ * {@link Revision}s, as well for clients to {@link #checkForUpdate(String)
+ * check for updates}. When a client needs to be updated, it is returned a
+ * {@link SessionToken} through which it can
+ * {@link #obtainFile(String, String, String) obtain} the files of that
+ * revision. As long as a revision is being replicated, this replicator
+ * guarantees that it will not be {@link Revision#release() released}.
+ *
+ * Replication sessions expire by default after
+ * {@link #DEFAULT_SESSION_EXPIRATION_THRESHOLD}, and the threshold can be
+ * configured through {@link #setExpirationThreshold(long)}.
+ *
+ * @lucene.experimental
+ */
+public class LocalReplicator implements Replicator {
+
+ private static class RefCountedRevision {
+ private final AtomicInteger refCount = new AtomicInteger(1);
+ public final Revision revision;
+
+ public RefCountedRevision(Revision revision) {
+ this.revision = revision;
+ }
+
+ public void decRef() throws IOException {
+ if (refCount.get() <= 0) {
+ throw new IllegalStateException("this revision is already released");
+ }
+
+ final int rc = refCount.decrementAndGet();
+ if (rc == 0) {
+ boolean success = false;
+ try {
+ revision.release();
+ success = true;
+ } finally {
+ if (!success) {
+ // Put reference back on failure
+ refCount.incrementAndGet();
+ }
+ }
+ } else if (rc < 0) {
+ throw new IllegalStateException("too many decRef calls: refCount is " + rc + " after decrement");
+ }
+ }
+
+ public void incRef() {
+ refCount.incrementAndGet();
+ }
+
+ }
+
+ private static class ReplicationSession {
+ public final SessionToken session;
+ public final RefCountedRevision revision;
+ private volatile long lastAccessTime;
+
+ ReplicationSession(SessionToken session, RefCountedRevision revision) {
+ this.session = session;
+ this.revision = revision;
+ lastAccessTime = System.currentTimeMillis();
+ }
+
+ boolean isExpired(long expirationThreshold) {
+ return lastAccessTime < (System.currentTimeMillis() - expirationThreshold);
+ }
+
+ void markAccessed() {
+ lastAccessTime = System.currentTimeMillis();
+ }
+ }
+
+ /** Threshold for expiring inactive sessions. Defaults to 30 minutes. */
+ public static final long DEFAULT_SESSION_EXPIRATION_THRESHOLD = 1000 * 60 * 30;
+
+ private long expirationThresholdMilllis = LocalReplicator.DEFAULT_SESSION_EXPIRATION_THRESHOLD;
+
+ private volatile RefCountedRevision currentRevision;
+ private volatile boolean closed = false;
+
+ private final AtomicInteger sessionToken = new AtomicInteger(0);
+ private final Map sessions = new HashMap();
+
+ private void checkExpiredSessions() throws IOException {
+ // make a "to-delete" list so we don't risk deleting from the map while iterating it
+ final ArrayList toExpire = new ArrayList();
+ for (ReplicationSession token : sessions.values()) {
+ if (token.isExpired(expirationThresholdMilllis)) {
+ toExpire.add(token);
+ }
+ }
+ for (ReplicationSession token : toExpire) {
+ releaseSession(token.session.id);
+ }
+ }
+
+ private void releaseSession(String sessionID) throws IOException {
+ ReplicationSession session = sessions.remove(sessionID);
+ // if we're called concurrently by close() and release(), could be that one
+ // thread beats the other to release the session.
+ if (session != null) {
+ session.revision.decRef();
+ }
+ }
+
+ /** Ensure that replicator is still open, or throw {@link AlreadyClosedException} otherwise. */
+ protected final synchronized void ensureOpen() {
+ if (closed) {
+ throw new AlreadyClosedException("This replicator has already been closed");
+ }
+ }
+
+ @Override
+ public synchronized SessionToken checkForUpdate(String currentVersion) {
+ ensureOpen();
+ if (currentRevision == null) { // no published revisions yet
+ return null;
+ }
+
+ if (currentVersion != null && currentRevision.revision.compareTo(currentVersion) <= 0) {
+ // currentVersion is newer or equal to latest published revision
+ return null;
+ }
+
+ // currentVersion is either null or older than latest published revision
+ currentRevision.incRef();
+ final String sessionID = Integer.toString(sessionToken.incrementAndGet());
+ final SessionToken sessionToken = new SessionToken(sessionID, currentRevision.revision);
+ final ReplicationSession timedSessionToken = new ReplicationSession(sessionToken, currentRevision);
+ sessions.put(sessionID, timedSessionToken);
+ return sessionToken;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (!closed) {
+ // release all managed revisions
+ for (ReplicationSession session : sessions.values()) {
+ session.revision.decRef();
+ }
+ sessions.clear();
+ closed = true;
+ }
+ }
+
+ /**
+ * Returns the expiration threshold.
+ *
+ * @see #setExpirationThreshold(long)
+ */
+ public long getExpirationThreshold() {
+ return expirationThresholdMilllis;
+ }
+
+ @Override
+ public synchronized InputStream obtainFile(String sessionID, String source, String fileName) throws IOException {
+ ensureOpen();
+ ReplicationSession session = sessions.get(sessionID);
+ if (session != null && session.isExpired(expirationThresholdMilllis)) {
+ releaseSession(sessionID);
+ session = null;
+ }
+ // session either previously expired, or we just expired it
+ if (session == null) {
+ throw new SessionExpiredException("session (" + sessionID + ") expired while obtaining file: source=" + source
+ + " file=" + fileName);
+ }
+ sessions.get(sessionID).markAccessed();
+ return session.revision.revision.open(source, fileName);
+ }
+
+ @Override
+ public synchronized void publish(Revision revision) throws IOException {
+ ensureOpen();
+ if (currentRevision != null) {
+ int compare = revision.compareTo(currentRevision.revision);
+ if (compare == 0) {
+ // same revision published again, ignore but release it
+ revision.release();
+ return;
+ }
+
+ if (compare < 0) {
+ revision.release();
+ throw new IllegalArgumentException("Cannot publish an older revision: rev=" + revision + " current="
+ + currentRevision);
+ }
+ }
+
+ // swap revisions
+ final RefCountedRevision oldRevision = currentRevision;
+ currentRevision = new RefCountedRevision(revision);
+ if (oldRevision != null) {
+ oldRevision.decRef();
+ }
+
+ // check for expired sessions
+ checkExpiredSessions();
+ }
+
+ @Override
+ public synchronized void release(String sessionID) throws IOException {
+ ensureOpen();
+ releaseSession(sessionID);
+ }
+
+ /**
+ * Modify session expiration time - if a replication session is inactive that
+ * long it is automatically expired, and further attempts to operate within
+ * this session will throw a {@link SessionExpiredException}.
+ */
+ public synchronized void setExpirationThreshold(long expirationThreshold) throws IOException {
+ ensureOpen();
+ this.expirationThresholdMilllis = expirationThreshold;
+ checkExpiredSessions();
+ }
+
+}
Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/LocalReplicator.java
___________________________________________________________________
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: lucene/replicator/src/java/org/apache/lucene/replicator/PerSessionDirectoryFactory.java
===================================================================
--- lucene/replicator/src/java/org/apache/lucene/replicator/PerSessionDirectoryFactory.java (revision 0)
+++ lucene/replicator/src/java/org/apache/lucene/replicator/PerSessionDirectoryFactory.java (working copy)
@@ -0,0 +1,77 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.lucene.replicator.ReplicationClient.SourceDirectoryFactory;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+
+/**
+ * A {@link SourceDirectoryFactory} which returns {@link FSDirectory} under a
+ * dedicated session directory. When a session is over, the entire directory is
+ * deleted.
+ *
+ * @lucene.experimental
+ */
+public class PerSessionDirectoryFactory implements SourceDirectoryFactory {
+
+ private final File workDir;
+
+ /** Constructor with the given sources mapping. */
+ public PerSessionDirectoryFactory(File workDir) {
+ this.workDir = workDir;
+ }
+
+ private void rm(File file) throws IOException {
+ if (file.isDirectory()) {
+ for (File f : file.listFiles()) {
+ rm(f);
+ }
+ }
+
+ // This should be either an empty directory, or a file
+ if (!file.delete() && file.exists()) {
+ throw new IOException("failed to delete " + file);
+ }
+ }
+
+ @Override
+ public Directory getDirectory(String sessionID, String source) throws IOException {
+ File sessionDir = new File(workDir, sessionID);
+ if (!sessionDir.exists() && !sessionDir.mkdirs()) {
+ throw new IOException("failed to create session directory " + sessionDir);
+ }
+ File sourceDir = new File(sessionDir, source);
+ if (!sourceDir.mkdirs()) {
+ throw new IOException("failed to create source directory " + sourceDir);
+ }
+ return FSDirectory.open(sourceDir);
+ }
+
+ @Override
+ public void cleanupSession(String sessionID) throws IOException {
+ if (sessionID.isEmpty()) { // protect against deleting workDir entirely!
+ throw new IllegalArgumentException("sessionID cannot be empty");
+ }
+ rm(new File(workDir, sessionID));
+ }
+
+}
Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/PerSessionDirectoryFactory.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java
===================================================================
--- lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java (revision 0)
+++ lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java (working copy)
@@ -0,0 +1,371 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/**
+ * A client which monitors and obtains new revisions from a {@link Replicator}.
+ * It can be used to either periodically check for updates by invoking
+ * {@link #startUpdateThread}, or manually by calling {@link #updateNow()}.
+ *
+ * Whenever a new revision is available, the {@link #requiredFiles(Map)} are
+ * copied to the {@link Directory} specified by {@link PerSessionDirectoryFactory} and
+ * a handler is notified.
+ *
+ * @lucene.experimental
+ */
+public class ReplicationClient implements Closeable {
+
+ /** Handler for revisions obtained by the client. */
+ public static interface ReplicationHandler {
+
+ /** Returns the current revision version held by the handler. */
+ public String currentVersion();
+
+ /** Returns the current revision files held by the handler. */
+ public Map currentRevisionFiles();
+
+ /**
+ * Called when a new revision was obtained and is available (i.e. all needed
+ * files were successfully copied).
+ */
+ public void revisionReady(String version, Map revisionFiles,
+ Map sourceDirectory) throws IOException;
+ }
+
+ /**
+ * Resolves a session and source into a {@link Directory} to use for copying
+ * the session files to.
+ */
+ public static interface SourceDirectoryFactory {
+
+ /**
+ * Returns the {@link Directory} to use for the given session and source.
+ * Implementations may e.g. return different directories for different
+ * sessions, or the same directory for all sessions. In that case, it is
+ * advised to clean the directory before it is used for a new session.
+ *
+ * @see #cleanupSession(String)
+ */
+ public Directory getDirectory(String sessionID, String source) throws IOException;
+
+ /**
+ * Called to denote that the replication actions for this session were finished and the directory is no longer needed.
+ */
+ public void cleanupSession(String sessionID) throws IOException;
+
+ }
+
+ private class ReplicationThread extends Thread {
+
+ private final long interval;
+
+ // client uses this to stop us
+ final CountDownLatch stop = new CountDownLatch(1);
+
+ public ReplicationThread(long interval) {
+ this.interval = interval;
+ }
+
+ @SuppressWarnings("synthetic-access")
+ @Override
+ public void run() {
+ while (true) {
+ long time = System.currentTimeMillis();
+ updateLock.lock();
+ try {
+ doUpdate();
+ } catch (Throwable t) {
+ handleUpdateException(t);
+ } finally {
+ updateLock.unlock();
+ }
+ time = System.currentTimeMillis() - time;
+
+ // adjust timeout to compensate the time spent doing the replication.
+ final long timeout = interval - time;
+ if (timeout > 0) {
+ try {
+ // this will return immediately if we were ordered to stop (count=0)
+ // or the timeout has elapsed. if it returns true, it means count=0,
+ // so terminate.
+ if (stop.await(timeout, TimeUnit.MILLISECONDS)) {
+ return;
+ }
+ } catch (InterruptedException e) {
+ // if we were interruted, somebody wants to terminate us, so just
+ // throw the exception further.
+ Thread.currentThread().interrupt();
+ throw new ThreadInterruptedException(e);
+ }
+ }
+ }
+ }
+
+ }
+
+ /** The component name to use with {@link InfoStream#isEnabled(String)}. */
+ public static final String INFO_STREAM_COMPONENT = "ReplicationThread";
+
+ private final Replicator replicator;
+ private final ReplicationHandler handler;
+ private final SourceDirectoryFactory factory;
+ private final byte[] copyBuffer = new byte[16384];
+ private final Lock updateLock = new ReentrantLock();
+
+ private volatile ReplicationThread updateThread;
+ private volatile boolean closed = false;
+
+ /**
+ * Constructor.
+ *
+ * @param replicator the {@link Replicator} used for checking for updates
+ * @param handler notified when new revisions are ready
+ * @param factory returns a {@link Directory} for a given source and session
+ */
+ public ReplicationClient(Replicator replicator, ReplicationHandler handler, SourceDirectoryFactory factory) {
+ this.replicator = replicator;
+ this.handler = handler;
+ this.factory = factory;
+ }
+
+ private void copyBytes(IndexOutput out, InputStream in) throws IOException {
+ int numBytes;
+ while ((numBytes = in.read(copyBuffer)) > 0) {
+ out.writeBytes(copyBuffer, 0, numBytes);
+ }
+ }
+
+ /** Throws {@link AlreadyClosedException} if the client has already been closed. */
+ protected final void ensureOpen() {
+ if (closed) {
+ throw new AlreadyClosedException("this update client has already been closed");
+ }
+ }
+
+ private void doUpdate() throws IOException {
+ SessionToken session = null;
+ final Map sourceDirectory = new HashMap();
+ boolean notify = false;
+ try {
+ final String version = handler.currentVersion();
+ session = replicator.checkForUpdate(version);
+ if (session == null) {
+ // already up to date
+ return;
+ }
+ for (Entry e : requiredFiles(session.sourceFiles).entrySet()) {
+ String source = e.getKey();
+ Directory dir = factory.getDirectory(session.id, source);
+ sourceDirectory.put(source, dir);
+ for (RevisionFile file : e.getValue()) {
+ if (closed) {
+ // we're either closed, or the update thread was stopped - abort files copy
+ return;
+ }
+ InputStream in = null;
+ IndexOutput out = null;
+ try {
+ in = replicator.obtainFile(session.id, source, file.fileName);
+ out = dir.createOutput(file.fileName, IOContext.DEFAULT);
+ copyBytes(out, in);
+ // TODO add some validation, on size / checksum
+ } finally {
+ IOUtils.close(in, out);
+ }
+ }
+ }
+ // only notify if all needed files were successfully obtained.
+ notify = true;
+ } finally {
+ if (session != null) {
+ try {
+ replicator.release(session.id);
+ } finally {
+ if (!notify) { // cleanup after ourselves
+ IOUtils.close(sourceDirectory.values());
+ factory.cleanupSession(session.id);
+ }
+ }
+ }
+ }
+
+ // notify outside the try-finally above, so the session is released sooner.
+ // the handler may take time to finish acting on the copied files, but the
+ // session itself is no longer needed.
+ try {
+ if (notify && !closed ) { // no use to notify if we are closed already
+ handler.revisionReady(session.version, session.sourceFiles, sourceDirectory);
+ }
+ } finally {
+ IOUtils.close(sourceDirectory.values());
+ if (session != null) {
+ factory.cleanupSession(session.id);
+ }
+ }
+ }
+
+ /**
+ * Start the update thread with the specified interval in milliseconds. For
+ * debugging purposes, you can optionally set the name to set on
+ * {@link Thread#setName(String)}. If you pass {@code null}, a default name
+ * will be set.
+ *
+ * @throws IllegalStateException if the thread has already been started
+ */
+ public synchronized void startUpdateThread(long intervalMillis, String threadName) {
+ ensureOpen();
+ if (updateThread != null) {
+ throw new IllegalStateException(
+ "cannot start an update thread when one is running, must first call 'stopUpdateThread()'");
+ }
+ threadName = threadName == null ? INFO_STREAM_COMPONENT : "ReplicationThread-" + threadName;
+ updateThread = new ReplicationThread(intervalMillis);
+ updateThread.setName(threadName);
+ updateThread.start();
+ }
+
+ /**
+ * Stop the update thread. If the update thread is not running, silently does
+ * nothing. This method returns after the update thread has stopped.
+ */
+ public synchronized void stopUpdateThread() {
+ if (updateThread != null) {
+ // this will trigger the thread to terminate if it awaits the lock.
+ // otherwise, if it's in the middle of replication, we wait for it to
+ // stop.
+ updateThread.stop.countDown();
+ try {
+ updateThread.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ThreadInterruptedException(e);
+ }
+ updateThread = null;
+ }
+ }
+
+ /**
+ * Executes the update operation immediately, irregardess if an update thread
+ * is running or not.
+ */
+ public void updateNow() throws IOException {
+ ensureOpen();
+ updateLock.lock();
+ try {
+ doUpdate();
+ } finally {
+ updateLock.unlock();
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ if (!closed) {
+ stopUpdateThread();
+ closed = true;
+ }
+ }
+
+ @Override
+ public String toString() {
+ String res = "ReplicationClient";
+ if (updateThread != null) {
+ res += " (" + updateThread.getName() + ")";
+ }
+ return res;
+ }
+
+ /**
+ * Called when an exception is hit by the replication thread. The default
+ * implementation prints the full stacktrace to
+ * {@link InfoStream#getDefault()} (if {@link InfoStream#isEnabled(String)}
+ * for {@link #INFO_STREAM_COMPONENT}), and you can override to log the
+ * exception elswhere.
+ *
+ * NOTE: if you override this method to throw the exception further,
+ * the replication thread will be terminated. The only way to restart it is to
+ * call {@link #stopUpdateThread()} followed by
+ * {@link #startUpdateThread(long, String)}.
+ */
+ protected void handleUpdateException(Throwable t) {
+ final StringWriter sw = new StringWriter();
+ t.printStackTrace(new PrintWriter(sw));
+ InfoStream stream = InfoStream.getDefault();
+ if (stream.isEnabled(INFO_STREAM_COMPONENT)) {
+ stream.message(INFO_STREAM_COMPONENT, "an error occurred during revision update: " + sw.toString());
+ }
+ }
+
+ /**
+ * Returns the files required for replication. By default, this method returns
+ * all files that exist in the new revision, but not in the handler.
+ */
+ protected Map requiredFiles(Map newRevisionFiles) {
+ Map handlerRevisionFiles = handler.currentRevisionFiles();
+ if (handlerRevisionFiles == null) {
+ return newRevisionFiles;
+ }
+
+ Map requiredFiles = new HashMap();
+ for (Entry e : handlerRevisionFiles.entrySet()) {
+ // put the handler files in a Set, for faster contains() checks later
+ Set handlerFiles = new HashSet();
+ for (RevisionFile file : e.getValue()) {
+ handlerFiles.add(file.fileName);
+ }
+
+ // make sure to preserve revisionFiles order
+ ArrayList res = new ArrayList();
+ String source = e.getKey();
+ assert newRevisionFiles.containsKey(source) : "source not found in newRevisionFiles: " + newRevisionFiles;
+ for (RevisionFile file : newRevisionFiles.get(source)) {
+ if (!handlerFiles.contains(file.fileName)) {
+ res.add(file);
+ }
+ }
+ requiredFiles.put(source, res.toArray(new RevisionFile[res.size()]));
+ }
+
+ return requiredFiles;
+ }
+
+}
Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java
===================================================================
--- lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java (revision 0)
+++ lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java (working copy)
@@ -0,0 +1,80 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An interface for replicating files. Allows a producer to
+ * {@link #publish(Revision) publish} {@link Revision}s and consumers to
+ * {@link #checkForUpdate(String) check for updates}. When a client needs to be
+ * updated, it is given a {@link SessionToken} through which it can
+ * {@link #obtainFile(String, String, String) obtain} the files of that
+ * revision. After the client has finished obtaining all the files, it should
+ * {@link #release(String) release} the given session, so that the files can be
+ * reclaimed if they are not needed anymore.
+ *
+ * A client is always updated to the newest revision available. That is, if a
+ * client is on revision r1 and revisions r2 and r3
+ * were published, then when the cllient will next check for update, it will
+ * receive r3.
+ *
+ * @lucene.experimental
+ */
+public interface Replicator extends Closeable {
+
+ /**
+ * Publish a new {@link Revision} for consumption by clients. It is the
+ * caller's responsibility to verify that the revision files exist and can be
+ * read by clients. When the revision is no longer needed, it will be
+ * {@link Revision#release() released} by the replicator.
+ */
+ public void publish(Revision revision) throws IOException;
+
+ /**
+ * Check whether the given version is up-to-date and returns a
+ * {@link SessionToken} which can be used for fetching the revision files,
+ * otherwise returns {@code null}.
+ *
+ * NOTE: when the returned session token is no longer needed, you
+ * should call {@link #release(String)} so that the session resources can be
+ * reclaimed, including the revision files.
+ */
+ public SessionToken checkForUpdate(String currVersion) throws IOException;
+
+ /**
+ * Notify that the specified {@link SessionToken} is no longer needed by the
+ * caller.
+ */
+ public void release(String sessionID) throws IOException;
+
+ /**
+ * Returns an {@link InputStream} for the requested file and source in the
+ * context of the given {@link SessionToken#id session}.
+ *
+ * NOTE: it is the caller's responsibility to close the returned
+ * stream.
+ *
+ * @throws SessionExpiredException if the specified session has already
+ * expired
+ */
+ public InputStream obtainFile(String sessionID, String source, String fileName) throws IOException;
+
+}
Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java
___________________________________________________________________
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java
===================================================================
--- lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java (revision 0)
+++ lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java (working copy)
@@ -0,0 +1,74 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.lucene.store.IndexInput;
+
+/**
+ * A revision comprises lists of files that come from different sources and need
+ * to be replicated together to e.g. guarantee that all resources are in sync.
+ * In most cases an application will replicate a single index, and so the
+ * revision will contain files from a single source. However, some applications
+ * may require to treat a collection of indexes as a single entity so that the
+ * files from all sources are replicated together, to guarantee consistency
+ * beween them. For example, an application which indexes facets will need to
+ * replicate both the search and taxonomy indexes together, to guarantee that
+ * they match at the client side.
+ *
+ * @lucene.experimental
+ */
+public interface Revision extends Comparable {
+
+ /**
+ * Compares the revision to the given version string. Behaves like
+ * {@link Comparable#compareTo(Object)}.
+ */
+ public int compareTo(String version);
+
+ /**
+ * Returns a string representation of the version of this revision. The
+ * version is used by {@link #compareTo(String)} as well as to
+ * serialize/deserialize revision information. Therefore it must be self
+ * descriptive as well as be able to identify one revision from another.
+ */
+ public String getVersion();
+
+ /**
+ * Returns the files that comprise this revision, as a mapping from a source
+ * to a list of files.
+ */
+ public Map getSourceFiles();
+
+ /**
+ * Returns an {@link IndexInput} for the given fileName and source. It is the
+ * caller's respnsibility to close the {@link IndexInput} when it has been
+ * consumed.
+ */
+ public InputStream open(String source, String fileName) throws IOException;
+
+ /**
+ * Called when this revision can be safely released, i.e. where there are no
+ * more references to it.
+ */
+ public void release() throws IOException;
+
+}
Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java
===================================================================
--- lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java (revision 0)
+++ lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java (working copy)
@@ -0,0 +1,59 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Describes a file in a {@link Revision}. A file has a source, which allows a
+ * single revision to contain files from multiple sources (e.g. multiple
+ * indexes).
+ *
+ * @lucene.experimental
+ */
+public class RevisionFile {
+
+ /** The name of the file. */
+ public final String fileName;
+
+ /** The size of the file denoted by {@link #fileName}. */
+ public long size = -1;
+
+ /** Constructor with the given file name. */
+ public RevisionFile(String fileName) {
+ if (fileName == null || fileName.isEmpty()) {
+ throw new IllegalArgumentException("fileName cannot be null or empty");
+ }
+ this.fileName = fileName;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ RevisionFile other = (RevisionFile) obj;
+ return fileName.equals(other.fileName) && size == other.size;
+ }
+
+ @Override
+ public int hashCode() {
+ return fileName.hashCode() ^ (int) (size ^ (size >>> 32));
+ }
+
+ @Override
+ public String toString() {
+ return "fileName=" + fileName + " size=" + size;
+ }
+
+}
Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java
===================================================================
--- lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java (revision 0)
+++ lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java (working copy)
@@ -0,0 +1,54 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+/**
+ * Exception indicating that a revision update session was expired due to lack
+ * of activity.
+ *
+ * @see LocalReplicator#DEFAULT_SESSION_EXPIRATION_THRESHOLD
+ * @see LocalReplicator#setExpirationThreshold(long)
+ *
+ * @lucene.experimental
+ */
+public class SessionExpiredException extends IOException {
+
+ /**
+ * @see IOException#IOException(String, Throwable)
+ */
+ public SessionExpiredException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * @see IOException#IOException(String)
+ */
+ public SessionExpiredException(String message) {
+ super(message);
+ }
+
+ /**
+ * @see IOException#IOException(Throwable)
+ */
+ public SessionExpiredException(Throwable cause) {
+ super(cause);
+ }
+
+}
Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java
===================================================================
--- lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java (revision 0)
+++ lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java (working copy)
@@ -0,0 +1,111 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Token for a replication session, for guaranteeing that source replicated
+ * files will be kept safe until the replication completes.
+ *
+ * @see Replicator#checkForUpdate(String)
+ * @see Replicator#release(String)
+ * @see LocalReplicator#DEFAULT_SESSION_EXPIRATION_THRESHOLD
+ *
+ * @lucene.experimental
+ */
+public final class SessionToken {
+
+ /**
+ * ID of this session.
+ * Should be passed when releasing the session, thereby acknowledging the
+ * {@link Replicator Replicator} that this session is no longer in use.
+ * @see Replicator#release(String)
+ */
+ public final String id;
+
+ /**
+ * @see Revision#getVersion()
+ */
+ public final String version;
+
+ /**
+ * @see Revision#getSourceFiles()
+ */
+ public final Map sourceFiles;
+
+ /** Constructor which deserializes from the given {@link DataInput}. */
+ public SessionToken(DataInput in) throws IOException {
+ this.id = in.readUTF();
+ this.version = in.readUTF();
+ this.sourceFiles = new HashMap();
+ int numSources = in.readInt();
+ while (numSources > 0) {
+ String source = in.readUTF();
+ RevisionFile[] files = new RevisionFile[in.readInt()];
+ for (int i = 0; i < files.length; i++) {
+ String fileName = in.readUTF();
+ files[i] = new RevisionFile(fileName);
+ files[i].size = in.readLong();
+ }
+ this.sourceFiles.put(source, files);
+ --numSources;
+ }
+ }
+
+ /** Constructor with the given id and revision. */
+ public SessionToken(String id, Revision revision) {
+ this.id = id;
+ this.version = revision.getVersion();
+ this.sourceFiles = revision.getSourceFiles();
+ }
+
+ /** Serialize the token data for communication between server and client. */
+ public void serialize(DataOutput out) throws IOException {
+ out.writeUTF(id);
+ out.writeUTF(version);
+ out.writeInt(sourceFiles.size());
+ for (Entry e : sourceFiles.entrySet()) {
+ out.writeUTF(e.getKey());
+ RevisionFile[] files = e.getValue();
+ out.writeInt(files.length);
+ for (RevisionFile file : files) {
+ out.writeUTF(file.fileName);
+ out.writeLong(file.size);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("session=").append(id).append(" version=").append(version).append(" sourceFiles={");
+ for (Entry e : sourceFiles.entrySet()) {
+ sb.append("source=").append(e.getKey()).append(" files=").append(Arrays.toString(e.getValue())).append(';');
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+
+}
\ No newline at end of file
Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java
===================================================================
--- lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java (revision 0)
+++ lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java (working copy)
@@ -0,0 +1,297 @@
+package org.apache.lucene.replicator.http;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.concurrent.Callable;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.util.EntityUtils;
+import org.apache.lucene.store.AlreadyClosedException;
+
+/**
+ * Base class for Http clients.
+ *
+ * @lucene.experimental
+ * */
+public abstract class HttpClientBase implements Closeable {
+
+ /**
+ * Default connection timeout for this client, in milliseconds.
+ *
+ * @see #setConnectionTimeout(int)
+ */
+ public static final int DEFAULT_CONNECTION_TIMEOUT = 1000;
+
+ /**
+ * Default socket timeout for this client, in milliseconds.
+ *
+ * @see #setSoTimeout(int)
+ */
+ public static final int DEFAULT_SO_TIMEOUT = 60000;
+
+ // TODO compression?
+
+ /** The URL stting to execute requests against. */
+ protected final String url;
+
+ private volatile boolean closed = false;
+
+ private final HttpClient httpc;
+
+ /**
+ * @param conMgr connection manager to use for this http client.
+ * NOTE:The provided {@link ClientConnectionManager} will not be
+ * {@link ClientConnectionManager#shutdown()} by this class.
+ */
+ protected HttpClientBase(String host, int port, String path, ClientConnectionManager conMgr) {
+ url = normalizedURL(host, port, path);
+ httpc = new DefaultHttpClient(conMgr);
+ setConnectionTimeout(DEFAULT_CONNECTION_TIMEOUT);
+ setSoTimeout(DEFAULT_SO_TIMEOUT);
+ }
+
+ /**
+ * Set the connection timeout for this client, in milliseconds. This setting
+ * is used to modify {@link HttpConnectionParams#setConnectionTimeout}.
+ *
+ * @param timeout timeout to set, in millisecopnds
+ */
+ public void setConnectionTimeout(int timeout) {
+ HttpConnectionParams.setConnectionTimeout(httpc.getParams(), timeout);
+ }
+
+ /**
+ * Set the socket timeout for this client, in milliseconds. This setting
+ * is used to modify {@link HttpConnectionParams#setSoTimeout}.
+ *
+ * @param timeout timeout to set, in millisecopnds
+ */
+ public void setSoTimeout(int timeout) {
+ HttpConnectionParams.setSoTimeout(httpc.getParams(), timeout);
+ }
+
+ /** Throws {@link AlreadyClosedException} if this client is already closed. */
+ protected final void ensureOpen() throws AlreadyClosedException {
+ if (closed) {
+ throw new AlreadyClosedException("HttpClient already closed");
+ }
+ }
+
+ /**
+ * Create a URL out of the given parameters, translate an empty/null path to '/'
+ */
+ private static String normalizedURL(String host, int port, String path) {
+ if (path == null || path.length() == 0) {
+ path = "/";
+ }
+ return "http://" + host + ":" + port + path;
+ }
+
+ /**
+ * Internal: response status after invocation, and in case or error attempt to read the
+ * exception sent by the server.
+ */
+ protected void verifyStatus(HttpResponse response) throws IOException {
+ StatusLine statusLine = response.getStatusLine();
+ if (statusLine.getStatusCode() != HttpStatus.SC_OK) {
+ throwKnownError(response, statusLine);
+ }
+ }
+
+ protected void throwKnownError(HttpResponse response, StatusLine statusLine) throws IOException {
+ ObjectInputStream in = null;
+ try {
+ in = new ObjectInputStream(response.getEntity().getContent());
+ } catch (Exception e) {
+ // the response stream is not an exception - could be an error in servlet.init().
+ throw new RuntimeException("Uknown error: " + statusLine);
+ }
+
+ Throwable t;
+ try {
+ t = (Throwable) in.readObject();
+ } catch (Exception e) {
+ //not likely
+ throw new RuntimeException("Failed to read exception object: " + statusLine, e);
+ } finally {
+ in.close();
+ }
+ if (t instanceof IOException) {
+ throw (IOException) t;
+ }
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ }
+ throw new RuntimeException("unknown exception "+statusLine,t);
+ }
+
+ /**
+ * internal: execute a request and return its result
+ * The params argument is treated as: name1,value1,name2,value2,...
+ */
+ protected HttpResponse executePOST(String request, HttpEntity entity, String... params) throws IOException {
+ ensureOpen();
+ HttpPost m = new HttpPost(queryString(request, params));
+ m.setEntity(entity);
+ HttpResponse response = httpc.execute(m);
+ verifyStatus(response);
+ return response;
+ }
+
+ /**
+ * internal: execute a request and return its result
+ * The params argument is treated as: name1,value1,name2,value2,...
+ */
+ protected HttpResponse executeGET(String request, String... params) throws IOException {
+ ensureOpen();
+ HttpGet m = new HttpGet(queryString(request, params));
+ HttpResponse response = httpc.execute(m);
+ verifyStatus(response);
+ return response;
+ }
+
+ private String queryString(String request, String... params) throws UnsupportedEncodingException {
+ StringBuilder query = new StringBuilder(url).append('/').append(request).append('?');
+ if (params != null) {
+ for (int i = 0; i < params.length; i += 2) {
+ query.append(params[i]).append('=').append(URLEncoder.encode(params[i+1], "UTF8")).append('&');
+ }
+ }
+ return query.substring(0, query.length() - 1);
+ }
+
+ /** Internal utility: input stream of the provided response */
+ public InputStream responseInputStream(HttpResponse response) throws IOException {
+ return responseInputStream(response, false);
+ }
+
+ // TODO: can we simplify this Consuming !?!?!?
+ /**
+ * Internal utility: input stream of the provided response, which optionally
+ * consumes the response's resources when the input stream is exhausted.
+ */
+ public InputStream responseInputStream(HttpResponse response, boolean consume) throws IOException {
+ final HttpEntity entity = response.getEntity();
+ final InputStream in = entity.getContent();
+ if (!consume) {
+ return in;
+ }
+ return new InputStream() {
+ private boolean consumed = false;
+ @Override
+ public int read() throws IOException {
+ final int res = in.read();
+ consume(res);
+ return res;
+ }
+ @Override
+ public void close() throws IOException {
+ super.close();
+ consume(-1);
+ }
+ @Override
+ public int read(byte[] b) throws IOException {
+ final int res = super.read(b);
+ consume(res);
+ return res;
+ }
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ final int res = super.read(b, off, len);
+ consume(res);
+ return res;
+ }
+ private void consume(int minusOne) {
+ if (!consumed && minusOne==-1) {
+ try {
+ EntityUtils.consume(entity);
+ } catch (Exception e) {
+ // ignored on purpose
+ }
+ consumed = true;
+ }
+ }
+ };
+ }
+
+ /**
+ * Returns true iff this instance was {@link #close() closed}, otherwise
+ * returns false. Note that if you override {@link #close()}, you must call
+ * {@code super.close()}, in order for this instance to be properly closed.
+ */
+ protected final boolean isClosed() {
+ return closed;
+ }
+
+ /**
+ * Same as {@link #doAction(HttpResponse, boolean, Callable)} but always do consume at the end.
+ */
+ protected T doAction(HttpResponse response, Callable call) throws IOException {
+ return doAction(response, true, call);
+ }
+
+ /**
+ * Do a specific action and validate after the action that the status is still OK,
+ * and if not, attempt to extract the actual server side exception. Optionally
+ * release the response at exit, depending on consume parameter.
+ */
+ protected T doAction(HttpResponse response, boolean consume, Callable call) throws IOException {
+ IOException error = null;
+ try {
+ return call.call();
+ } catch (IOException e) {
+ error = e;
+ } catch (Exception e) {
+ error = new IOException(e);
+ } finally {
+ try {
+ verifyStatus(response);
+ } finally {
+ if (consume) {
+ try {
+ EntityUtils.consume(response.getEntity());
+ } catch (Exception e) {
+ // ignoring on purpose
+ }
+ }
+ }
+ }
+ throw error; // should not get here
+ }
+
+ @Override
+ public void close() throws IOException {
+ closed = true;
+ }
+
+}
Property changes on: lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java
___________________________________________________________________
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java
===================================================================
--- lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java (revision 0)
+++ lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java (working copy)
@@ -0,0 +1,105 @@
+package org.apache.lucene.replicator.http;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.Callable;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.lucene.replicator.Replicator;
+import org.apache.lucene.replicator.Revision;
+import org.apache.lucene.replicator.SessionToken;
+import org.apache.lucene.replicator.http.ReplicationService.ReplicationAction;
+
+/**
+ * An HTTP implementation of {@link Replicator}. Assumes the API supported by
+ * {@link ReplicationService}.
+ *
+ * @lucene.experimental
+ */
+public class HttpReplicator extends HttpClientBase implements Replicator {
+
+ /** Construct with specified connection manager. */
+ public HttpReplicator(String host, int port, String path, ClientConnectionManager conMgr) {
+ super(host, port, path, conMgr);
+ }
+
+ @Override
+ public SessionToken checkForUpdate(String currVersion) throws IOException {
+ String[] params = null;
+ if (currVersion != null) {
+ params = new String[] { ReplicationService.REPLICATE_VERSION_PARAM, currVersion };
+ }
+ final HttpResponse response = executeGET(ReplicationAction.UPDATE.name(), params);
+ return doAction(response, new Callable() {
+ @Override
+ public SessionToken call() throws Exception {
+ final DataInputStream dis = new DataInputStream(responseInputStream(response));
+ try {
+ if (dis.readByte() == 0) {
+ return null;
+ } else {
+ return new SessionToken(dis);
+ }
+ } finally {
+ dis.close();
+ }
+ }
+ });
+ }
+
+ @Override
+ public InputStream obtainFile(String sessionID, String source, String fileName) throws IOException {
+ String[] params = new String[] {
+ ReplicationService.REPLICATE_SESSION_ID_PARAM, sessionID,
+ ReplicationService.REPLICATE_SOURCE_PARAM, source,
+ ReplicationService.REPLICATE_FILENAME_PARAM, fileName,
+ };
+ final HttpResponse response = executeGET(ReplicationAction.OBTAIN.name(), params);
+ return doAction(response, false, new Callable() {
+ @Override
+ public InputStream call() throws Exception {
+ return responseInputStream(response,true);
+ }
+ });
+ }
+
+ @Override
+ public void publish(Revision revision) throws IOException {
+ throw new UnsupportedOperationException(
+ "this replicator implementation does not support remote publishing of revisions");
+ }
+
+ @Override
+ public void release(String sessionID) throws IOException {
+ String[] params = new String[] {
+ ReplicationService.REPLICATE_SESSION_ID_PARAM, sessionID
+ };
+ final HttpResponse response = executeGET(ReplicationAction.RELEASE.name(), params);
+ doAction(response, new Callable