Index: src/test/org/apache/lucene/store/MockRAMDirectory.java
===================================================================
--- src/test/org/apache/lucene/store/MockRAMDirectory.java	(revision 580648)
+++ src/test/org/apache/lucene/store/MockRAMDirectory.java	(working copy)
@@ -135,16 +135,13 @@
     super.deleteFile(name);
   }
 
-  public IndexOutput createOutput(String name) {
+  public IndexOutput createOutput(String name) throws IOException {
     if (openFiles == null) {
       openFiles = new HashMap();
     }
     synchronized(openFiles) {
-      if (noDeleteOpenFile && openFiles.containsKey(name)) {
-        // RuntimeException instead of IOException because
-        // super() does not throw IOException currently:
-        throw new RuntimeException("MockRAMDirectory: file \"" + name + "\" is still open: cannot overwrite");
-      }
+      if (noDeleteOpenFile && openFiles.containsKey(name))
+        throw new IOException("MockRAMDirectory: file \"" + name + "\" is still open: cannot overwrite");
     }
     RAMFile file = new RAMFile(this);
     synchronized (this) {
Index: src/test/org/apache/lucene/index/TestFieldInfos.java
===================================================================
--- src/test/org/apache/lucene/index/TestFieldInfos.java	(revision 580648)
+++ src/test/org/apache/lucene/index/TestFieldInfos.java	(working copy)
@@ -41,7 +41,7 @@
   protected void tearDown() {
   }
 
-  public void test() {
+  public void test() throws IOException {
     //Positive test of FieldInfos
     assertTrue(testDoc != null);
     FieldInfos fieldInfos = new FieldInfos();
Index: src/test/org/apache/lucene/index/TestMultiSegmentReader.java
===================================================================
--- src/test/org/apache/lucene/index/TestMultiSegmentReader.java	(revision 580648)
+++ src/test/org/apache/lucene/index/TestMultiSegmentReader.java	(working copy)
@@ -41,14 +41,14 @@
 
   protected void setUp() throws IOException {
     dir = new RAMDirectory();
-    sis = new SegmentInfos();
     doc1 = new Document();
     doc2 = new Document();
     DocHelper.setupDoc(doc1);
     DocHelper.setupDoc(doc2);
     SegmentInfo info1 = DocHelper.writeDoc(dir, doc1);
     SegmentInfo info2 = DocHelper.writeDoc(dir, doc2);
-    sis.write(dir);
+    sis = new SegmentInfos();
+    sis.read(dir);
   }
 
   protected IndexReader openReader() throws IOException {
@@ -97,6 +97,12 @@
     // Ensure undeleteAll survives commit/close/reopen:
     reader.commit();
     reader.close();
+
+    if (reader instanceof MultiReader)
+      // MultiReader does not "own" the directory so it does
+      // not write the changes to sis on commit:
+      sis.write(dir);
+
     sis.read(dir);
     reader = openReader();
     assertEquals( 2, reader.numDocs() );
@@ -105,6 +111,10 @@
     assertEquals( 1, reader.numDocs() );
     reader.commit();
     reader.close();
+    if (reader instanceof MultiReader)
+      // MultiReader does not "own" the directory so it does
+      // not write the changes to sis on commit:
+      sis.write(dir);
     sis.read(dir);
     reader = openReader();
     assertEquals( 1, reader.numDocs() );
Index: src/test/org/apache/lucene/index/TestDeletionPolicy.java
===================================================================
--- src/test/org/apache/lucene/index/TestDeletionPolicy.java	(revision 580648)
+++ src/test/org/apache/lucene/index/TestDeletionPolicy.java	(working copy)
@@ -220,6 +220,7 @@
     String fileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, "", gen);
+    dir.deleteFile(IndexFileNames.SEGMENTS_GEN);
     while(gen > 0) {
       try {
         IndexReader reader = IndexReader.open(dir);
@@ -279,6 +280,7 @@
     // Simplistic check: just verify all segments_N's still
     // exist, and, I can open a reader on each:
+    dir.deleteFile(IndexFileNames.SEGMENTS_GEN);
     long gen = SegmentInfos.getCurrentSegmentGeneration(dir);
     while(gen > 0) {
       IndexReader reader = IndexReader.open(dir);
@@ -386,6 +388,7 @@
     // Simplistic check: just verify only the past N segments_N's still
     // exist, and, I can open a reader on each:
+    dir.deleteFile(IndexFileNames.SEGMENTS_GEN);
     long gen = SegmentInfos.getCurrentSegmentGeneration(dir);
     for(int i=0;i
Index: src/java/org/apache/lucene/index/SegmentInfos.java
===================================================================
--- src/java/org/apache/lucene/index/SegmentInfos.java	(revision 580648)
+++ src/java/org/apache/lucene/index/SegmentInfos.java	(working copy)
 gen) {
-              message("fallback to '" + IndexFileNames.SEGMENTS_GEN + "' check: now try generation " + gen0 + " > " + gen);
-              gen = gen0;
+              genB = gen0;
               break;
             }
           }
@@ -567,15 +564,35 @@
             // will retry
           }
         }
+
+        message(IndexFileNames.SEGMENTS_GEN + " check: genB=" + genB);
+
+        // Pick the larger of the two gen's:
+        if (genA > genB)
+          gen = genA;
+        else
+          gen = genB;
+
+        if (gen == -1) {
+          // Neither approach found a generation
+          String s;
+          if (files != null) {
+            s = "";
+            for(int i=0;i
Index: src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- src/java/org/apache/lucene/index/IndexWriter.java	(revision 580648)
+++ src/java/org/apache/lucene/index/IndexWriter.java	(working copy)
 0.0 MB");
     docWriter.setRAMBufferSizeMB(mb);
+    if (infoStream != null)
+      message("setRAMBufferSizeMB " + mb);
   }
 
   /**
@@ -904,6 +933,8 @@
   public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
     ensureOpen();
     docWriter.setMaxBufferedDeleteTerms(maxBufferedDeleteTerms);
+    if (infoStream != null)
+      message("setMaxBufferedDeleteTerms " + maxBufferedDeleteTerms);
   }
 
   /**
@@ -975,10 +1006,25 @@
   public void setInfoStream(PrintStream infoStream) {
     ensureOpen();
     this.infoStream = infoStream;
+    setMessageID();
     docWriter.setInfoStream(infoStream);
     deleter.setInfoStream(infoStream);
+    if (infoStream != null)
+      messageState();
   }
 
+  private void messageState() {
+    message("setInfoStream: dir=" + directory +
+            " autoCommit=" + autoCommit +
+            " mergePolicy=" + mergePolicy +
+            " mergeScheduler=" + mergeScheduler +
+            " ramBufferSizeMB=" + docWriter.getRAMBufferSizeMB() +
+            " maxBufferedDocs=" + docWriter.getMaxBufferedDocs() +
+            " maxBufferedDeleteTerms=" + docWriter.getMaxBufferedDeleteTerms() +
+            " maxFieldLength=" + maxFieldLength +
+            " index=" + segString());
+  }
+
   /**
    * Returns the current infoStream in use by this writer.
    * @see #setInfoStream
@@ -1100,6 +1146,8 @@
   private void closeInternal(boolean waitForMerges) throws CorruptIndexException, IOException {
     try {
+      if (infoStream != null)
+        message("now flush at close");
       flush(true, true);
@@ -1578,11 +1626,12 @@
    * background threads. */
   public void optimize(boolean doWait) throws CorruptIndexException, IOException {
     ensureOpen();
-    flush();
 
     if (infoStream != null)
       message("optimize: index now " + segString());
 
+    flush();
+
     synchronized(this) {
       resetMergeExceptions();
       segmentsToOptimize = new HashSet();
@@ -1735,6 +1784,10 @@
     localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone();
     localAutoCommit = autoCommit;
     if (localAutoCommit) {
+
+      if (infoStream != null)
+        message("flush at startTransaction");
+
       flush();
       // Turn off auto-commit during our local transaction:
       autoCommit = false;
@@ -1967,6 +2020,8 @@
     throws CorruptIndexException, IOException {
     ensureOpen();
+    if (infoStream != null)
+      message("flush at addIndexes");
     flush();
 
     int start = segmentInfos.size();
@@ -2022,6 +2077,8 @@
     throws CorruptIndexException, IOException {
     ensureOpen();
+    if (infoStream != null)
+      message("flush at addIndexesNoOptimize");
     flush();
 
     /* new merge policy
@@ -2813,6 +2870,8 @@
       // TODO: if we know we are about to merge away these
       // newly flushed doc store files then we should not
       // make compound file out of them...
+      if (infoStream != null)
+        message("flush at merge");
       flush(false, true);
     }
Index: src/java/org/apache/lucene/index/IndexFileDeleter.java
===================================================================
--- src/java/org/apache/lucene/index/IndexFileDeleter.java	(revision 580648)
+++ src/java/org/apache/lucene/index/IndexFileDeleter.java	(working copy)
@@ -102,10 +102,12 @@
 
   void setInfoStream(PrintStream infoStream) {
     this.infoStream = infoStream;
+    if (infoStream != null)
+      message("setInfoStream deletionPolicy=" + policy);
   }
 
   private void message(String message) {
-    infoStream.println("Deleter [" + Thread.currentThread().getName() + "]: " + message);
+    infoStream.println("IFD [" + Thread.currentThread().getName() + "]: " + message);
   }
 
   /**
@@ -125,7 +127,7 @@
     this.infoStream = infoStream;
 
     if (infoStream != null)
-      message("init: current segments file is \"" + segmentInfos.getCurrentSegmentFileName() + "\"");
+      message("init: current segments file is \"" + segmentInfos.getCurrentSegmentFileName() + "\"; deletionPolicy=" + policy);
 
     this.policy = policy;
     this.directory = directory;
@@ -189,7 +191,24 @@
     }
 
     if (currentCommitPoint == null) {
-      throw new CorruptIndexException("failed to locate current segments_N file");
+      // We did not in fact see the segments_N file
+      // corresponding to the segmentInfos that was passed
+      // in. Yet, it must exist, because our caller holds
+      // the write lock. This can happen when the directory
+      // listing was stale (eg when index accessed via NFS
+      // client with stale directory listing cache). So we
+      // try now to explicitly open this commit point:
+      SegmentInfos sis = new SegmentInfos();
+      try {
+        sis.read(directory, segmentInfos.getCurrentSegmentFileName());
+      } catch (IOException e) {
+        throw new CorruptIndexException("failed to locate current segments_N file");
+      }
+      if (infoStream != null)
+        message("forced open of current segments file " + segmentInfos.getCurrentSegmentFileName());
+      currentCommitPoint = new CommitPoint(sis);
+      commits.add(currentCommitPoint);
+      incRef(sis, true);
     }
 
     // We keep commits list in sorted order (oldest to newest):
Index: src/java/org/apache/lucene/store/VerifyingLockFactory.java
===================================================================
--- src/java/org/apache/lucene/store/VerifyingLockFactory.java	(revision 0)
+++ src/java/org/apache/lucene/store/VerifyingLockFactory.java	(revision 0)
@@ -0,0 +1,106 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.net.Socket;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * A LockFactory that wraps another LockFactory and verifies
+ * that each lock obtain/release is "correct". It does this
+ * by contacting an external server to assert that at most
+ * one process holds the lock at a time. To use this, you
+ * should also run LockVerifyServer on the host & port
+ * matching what you init this LockFactory with.
+ */
+
+public class VerifyingLockFactory extends LockFactory {
+
+  LockFactory lf;
+  byte id;
+  String host;
+  int port;
+
+  private class CheckedLock extends Lock {
+    private Lock lock;
+
+    public CheckedLock(Lock lock) {
+      this.lock = lock;
+    }
+
+    private void verify(byte message) {
+      try {
+        Socket s = new Socket(host, port);
+        OutputStream out = s.getOutputStream();
+        out.write(id);
+        out.write(message);
+        InputStream in = s.getInputStream();
+        int result = in.read();
+        in.close();
+        out.close();
+        s.close();
+        if (result != 0)
+          throw new RuntimeException("lock was double acquired");
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public synchronized boolean obtain(long lockWaitTimeout)
+      throws LockObtainFailedException, IOException {
+      boolean obtained = lock.obtain(lockWaitTimeout);
+      if (obtained)
+        verify((byte) 1);
+      return obtained;
+    }
+
+    public synchronized boolean obtain()
+      throws LockObtainFailedException, IOException {
+      return lock.obtain();
+    }
+
+    public synchronized boolean isLocked() {
+      return lock.isLocked();
+    }
+
+    public synchronized void release() {
+      if (isLocked()) {
+        verify((byte) 0);
+        lock.release();
+      }
+    }
+  }
+
+  public VerifyingLockFactory(byte id, LockFactory lf, String host, int port) throws IOException {
+    this.id = id;
+    this.lf = lf;
+    this.host = host;
+    this.port = port;
+  }
+
+  public synchronized Lock makeLock(String lockName) {
+    return new CheckedLock(lf.makeLock(lockName));
+  }
+
+  public synchronized void clearLock(String lockName)
+    throws IOException {
+    lf.clearLock(lockName);
+  }
+}

Property changes on: src/java/org/apache/lucene/store/VerifyingLockFactory.java
___________________________________________________________________
Name: svn:eol-style
   + native

Index: src/java/org/apache/lucene/store/RAMDirectory.java
===================================================================
--- src/java/org/apache/lucene/store/RAMDirectory.java	(revision 580648)
+++ src/java/org/apache/lucene/store/RAMDirectory.java	(working copy)
@@ -209,7 +209,7 @@
   }
 
   /** Creates a new, empty file in the directory with the given name. Returns a stream writing this file. */
-  public IndexOutput createOutput(String name) {
+  public IndexOutput createOutput(String name) throws IOException {
    ensureOpen();
     RAMFile file = new RAMFile(this);
     synchronized (this) {
Index: src/java/org/apache/lucene/store/LockVerifyServer.java
===================================================================
--- src/java/org/apache/lucene/store/LockVerifyServer.java	(revision 0)
+++ src/java/org/apache/lucene/store/LockVerifyServer.java	(revision 0)
@@ -0,0 +1,96 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.io.OutputStream;
+import java.io.InputStream;
+
+/**
+ * Simple standalone server that must be running when you
+ * use VerifyingLockFactory. This server is the "central
+ * authority" that verifies that at most one process holds
+ * the lock at once.
+ */
+
+import java.io.IOException;
+
+public class LockVerifyServer {
+
+  private static String getTime(long startTime) {
+    return "[" + ((System.currentTimeMillis()-startTime)/1000) + "s] ";
+  }
+
+  public static void main(String[] args) throws IOException {
+
+    if (args.length != 1) {
+      System.out.println("\nUsage: java org.apache.lucene.store.LockVerifyServer port\n");
+      System.exit(1);
+    }
+
+    final int port = Integer.parseInt(args[0]);
+
+    ServerSocket s = new ServerSocket(port);
+    s.setReuseAddress(true);
+    System.out.println("\nReady on port " + port + "...");
+
+    int lockedID = 0;
+    long startTime = System.currentTimeMillis();
+
+    char[] message = new char[3];
+
+    while(true) {
+      Socket cs = s.accept();
+      OutputStream out = cs.getOutputStream();
+      InputStream in = cs.getInputStream();
+
+      int id = in.read();
+      int command = in.read();
+
+      boolean err = false;
+
+      if (command == 1) {
+        // Locked
+        if (lockedID != 0) {
+          err = true;
+          System.out.println(getTime(startTime) + " ERROR: id " + id + " got lock, but " + lockedID + " already holds the lock");
+        }
+        lockedID = id;
+      } else if (command == 0) {
+        if (lockedID != id) {
+          err = true;
+          System.out.println(getTime(startTime) + " ERROR: id " + id + " released the lock, but " + lockedID + " is the one holding the lock");
+        }
+        lockedID = 0;
+      } else
+        throw new RuntimeException("unrecognized command " + command);
+
+      System.out.print(".");
+
+      if (err)
+        out.write(1);
+      else
+        out.write(0);
+
+      out.close();
+      in.close();
+      cs.close();
+    }
+  }
+}

Property changes on: src/java/org/apache/lucene/store/LockVerifyServer.java
___________________________________________________________________
Name: svn:eol-style
   + native

Index: src/java/org/apache/lucene/store/NativeFSLockFactory.java
===================================================================
--- src/java/org/apache/lucene/store/NativeFSLockFactory.java	(revision 580648)
+++ src/java/org/apache/lucene/store/NativeFSLockFactory.java	(working copy)
@@ -317,7 +317,7 @@
     }
   }
 
-  public boolean isLocked() {
+  public synchronized boolean isLocked() {
     return lock != null;
   }
 
Index: src/java/org/apache/lucene/store/LockStressTest.java
===================================================================
--- src/java/org/apache/lucene/store/LockStressTest.java	(revision 0)
+++ src/java/org/apache/lucene/store/LockStressTest.java	(revision 0)
@@ -0,0 +1,114 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.File;
+
+/**
+ * Simple standalone tool that forever acquires & releases a
+ * lock using a specific LockFactory. Run without any args
+ * to see usage.
+ */
+
+public class LockStressTest {
+
+  public static void main(String[] args) throws Exception {
+
+    if (args.length != 6) {
+      System.out.println("\nUsage: java org.apache.lucene.store.LockStressTest myID verifierHostOrIP verifierPort lockFactoryClassName lockDirName sleepTimeMS\n" +
+                         "\n" +
+                         "  myID = int from 0 .. 255 (should be unique for test process)\n" +
+                         "  verifierHostOrIP = host name or IP address where LockVerifyServer is running\n" +
+                         "  verifierPort = port that LockVerifyServer is listening on\n" +
+                         "  lockFactoryClassName = primary LockFactory class that we will use\n" +
+                         "  lockDirName = path to the lock directory (only set for Simple/NativeFSLockFactory)\n" +
+                         "  sleepTimeMS = milliseconds to pause between each lock obtain/release\n" +
+                         "\n" +
+                         "You should run multiple instances of this process, each with its own\n" +
+                         "unique ID, and each pointing to the same lock directory, to verify\n" +
+                         "that locking is working correctly.\n" +
+                         "\n" +
+                         "Make sure you are first running LockVerifyServer.\n" +
+                         "\n");
+      System.exit(1);
+    }
+
+    final int myID = Integer.parseInt(args[0]);
+
+    if (myID < 0 || myID > 255) {
+      System.out.println("myID must be a unique int 0..255");
+      System.exit(1);
+    }
+
+    final String verifierHost = args[1];
+    final int verifierPort = Integer.parseInt(args[2]);
+    final String lockFactoryClassName = args[3];
+    final String lockDirName = args[4];
+    final int sleepTimeMS = Integer.parseInt(args[5]);
+
+    Class c;
+    try {
+      c = Class.forName(lockFactoryClassName);
+    } catch (ClassNotFoundException e) {
+      throw new IOException("unable to find LockClass " + lockFactoryClassName);
+    }
+
+    LockFactory lockFactory;
+    try {
+      lockFactory = (LockFactory) c.newInstance();
+    } catch (IllegalAccessException e) {
+      throw new IOException("IllegalAccessException when instantiating LockClass " + lockFactoryClassName);
+    } catch (InstantiationException e) {
+      throw new IOException("InstantiationException when instantiating LockClass " + lockFactoryClassName);
+    } catch (ClassCastException e) {
+      throw new IOException("unable to cast LockClass " + lockFactoryClassName + " instance to a LockFactory");
+    }
+
+    File lockDir = new File(lockDirName);
+
+    if (lockFactory instanceof NativeFSLockFactory) {
+      ((NativeFSLockFactory) lockFactory).setLockDir(lockDir);
+    } else if (lockFactory instanceof SimpleFSLockFactory) {
+      ((SimpleFSLockFactory) lockFactory).setLockDir(lockDir);
+    }
+
+    lockFactory.setLockPrefix("test");
+
+    LockFactory verifyLF = new VerifyingLockFactory((byte) myID, lockFactory, verifierHost, verifierPort);
+
+    Lock l = verifyLF.makeLock("test.lock");
+
+    while(true) {
+
+      boolean obtained = false;
+
+      try {
+        obtained = l.obtain(10);
+      } catch (LockObtainFailedException e) {
+        System.out.print("x");
+      }
+
+      if (obtained) {
+        System.out.print("l");
+        l.release();
+      }
+      Thread.sleep(sleepTimeMS);
+    }
+  }
+}

Property changes on: src/java/org/apache/lucene/store/LockStressTest.java
___________________________________________________________________
Name: svn:eol-style
   + native
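
For reference, here is roughly how the new lock-verification classes fit together. This is only a sketch and is not part of the patch: it assumes a LockVerifyServer is already listening on localhost port 7777 (started with "java org.apache.lucene.store.LockVerifyServer 7777"), and the lock directory /tmp/locktest and the class name LockVerifyExample are placeholders chosen for illustration.

import java.io.File;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.VerifyingLockFactory;

public class LockVerifyExample {
  public static void main(String[] args) throws Exception {
    // The "real" factory whose locking we want to verify:
    NativeFSLockFactory fsLockFactory = new NativeFSLockFactory();
    fsLockFactory.setLockDir(new File("/tmp/locktest"));   // placeholder lock dir
    fsLockFactory.setLockPrefix("test");

    // Wrap it so every obtain/release is reported to the central
    // LockVerifyServer on localhost:7777:
    LockFactory verifyingLF = new VerifyingLockFactory((byte) 1, fsLockFactory, "localhost", 7777);

    Lock lock = verifyingLF.makeLock("test.lock");
    boolean obtained = false;
    try {
      // On success the wrapper contacts the server, which flags an
      // error if another process already holds the lock:
      obtained = lock.obtain(10);
    } catch (LockObtainFailedException e) {
      System.out.println("lock is held by another process");
    }
    if (obtained) {
      // ... do work that requires the lock ...
      lock.release();   // reported to the server before the underlying release
    }
  }
}

Running several such processes (or LockStressTest itself) against the same lock directory and the same server is what actually exercises the locking.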