Index: lucene/facet/src/test/org/apache/lucene/facet/taxonomy/directory/TestDirectoryTaxonomyWriter.java =================================================================== --- lucene/facet/src/test/org/apache/lucene/facet/taxonomy/directory/TestDirectoryTaxonomyWriter.java (revision 1339138) +++ lucene/facet/src/test/org/apache/lucene/facet/taxonomy/directory/TestDirectoryTaxonomyWriter.java (working copy) @@ -3,11 +3,16 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; -import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.facet.taxonomy.CategoryPath; import org.apache.lucene.facet.taxonomy.InconsistentTaxonomyException; import org.apache.lucene.facet.taxonomy.writercache.TaxonomyWriterCache; +import org.apache.lucene.facet.taxonomy.writercache.cl2o.Cl2oTaxonomyWriterCache; +import org.apache.lucene.facet.taxonomy.writercache.lru.LruTaxonomyWriterCache; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; @@ -42,11 +47,17 @@ NoOpCache() { } + @Override public void close() {} + @Override public int get(CategoryPath categoryPath) { return -1; } + @Override public int get(CategoryPath categoryPath, int length) { return get(categoryPath); } + @Override public boolean put(CategoryPath categoryPath, int ordinal) { return true; } + @Override public boolean put(CategoryPath categoryPath, int prefixLen, int ordinal) { return true; } + @Override public boolean hasRoom(int numberOfEntries) { return false; } } @@ -201,5 +212,48 @@ dir.close(); } + + public void testConcurrency() throws Exception { + int ncats = atLeast(100000); // add many categories + final int range = ncats * 3; // affects the categories selection + final AtomicInteger numCats = new AtomicInteger(ncats); + Directory dir = newDirectory(); + final ConcurrentHashMap values = new ConcurrentHashMap(); + TaxonomyWriterCache cache = random().nextBoolean() + ? new Cl2oTaxonomyWriterCache(1024, 0.15f, 3) + : new LruTaxonomyWriterCache(ncats / 10); + final DirectoryTaxonomyWriter tw = new DirectoryTaxonomyWriter(dir, OpenMode.CREATE, cache); + Thread[] addThreads = new Thread[atLeast(4)]; + for (int z = 0; z < addThreads.length; z++) { + addThreads[z] = new Thread() { + @Override + public void run() { + Random random = random(); + while (numCats.decrementAndGet() > 0) { + try { + int value = random.nextInt(range); + tw.addCategory(new CategoryPath("a", Integer.toString(value))); + values.put(value, value); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + }; + } + + for (Thread t : addThreads) t.start(); + for (Thread t : addThreads) t.join(); + tw.close(); + + DirectoryTaxonomyReader dtr = new DirectoryTaxonomyReader(dir); + assertEquals(values.size() + 2, dtr.getSize()); // +2 for root category + "a" + for (Integer value : values.keySet()) { + assertTrue("category not found a/" + value, dtr.getOrdinal(new CategoryPath("a", value.toString())) > 0); + } + dtr.close(); + + dir.close(); + } } Index: lucene/facet/src/test/org/apache/lucene/facet/taxonomy/directory/TestAddTaxonomy.java =================================================================== --- lucene/facet/src/test/org/apache/lucene/facet/taxonomy/directory/TestAddTaxonomy.java (revision 1339138) +++ lucene/facet/src/test/org/apache/lucene/facet/taxonomy/directory/TestAddTaxonomy.java (working copy) @@ -3,6 +3,7 @@ import java.io.IOException; import java.util.HashSet; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.facet.taxonomy.CategoryPath; import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.DiskOrdinalMap; @@ -32,16 +33,32 @@ public class TestAddTaxonomy extends LuceneTestCase { - private void dotest(int ncats, int range) throws Exception { + private void dotest(int ncats, final int range) throws Exception { + final AtomicInteger numCats = new AtomicInteger(ncats); Directory dirs[] = new Directory[2]; - Random random = random(); for (int i = 0; i < dirs.length; i++) { dirs[i] = newDirectory(); - DirectoryTaxonomyWriter tw = new DirectoryTaxonomyWriter(dirs[i]); - for (int j = 0; j < ncats; j++) { - String cat = Integer.toString(random.nextInt(range)); - tw.addCategory(new CategoryPath("a", cat)); + final DirectoryTaxonomyWriter tw = new DirectoryTaxonomyWriter(dirs[i]); + Thread[] addThreads = new Thread[4]; + for (int j = 0; j < addThreads.length; j++) { + addThreads[j] = new Thread() { + @Override + public void run() { + Random random = random(); + while (numCats.decrementAndGet() > 0) { + String cat = Integer.toString(random.nextInt(range)); + try { + tw.addCategory(new CategoryPath("a", cat)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + }; } + + for (Thread t : addThreads) t.start(); + for (Thread t : addThreads) t.join(); tw.close(); } @@ -133,11 +150,9 @@ } // A more comprehensive and big random test. - @Nightly public void testBig() throws Exception { dotest(200, 10000); dotest(1000, 20000); - // really big dotest(400000, 1000000); } Index: lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/cl2o/Cl2oTaxonomyWriterCache.java =================================================================== --- lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/cl2o/Cl2oTaxonomyWriterCache.java (revision 1339138) +++ lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/cl2o/Cl2oTaxonomyWriterCache.java (working copy) @@ -1,5 +1,8 @@ package org.apache.lucene.facet.taxonomy.writercache.cl2o; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.lucene.facet.taxonomy.CategoryPath; import org.apache.lucene.facet.taxonomy.writercache.TaxonomyWriterCache; @@ -30,44 +33,71 @@ */ public class Cl2oTaxonomyWriterCache implements TaxonomyWriterCache { + private final ReadWriteLock lock = new ReentrantReadWriteLock(); private CompactLabelToOrdinal cache; public Cl2oTaxonomyWriterCache(int initialCapcity, float loadFactor, int numHashArrays) { this.cache = new CompactLabelToOrdinal(initialCapcity, loadFactor, numHashArrays); } - public void close() { - cache=null; + @Override + public synchronized void close() { + cache = null; } + @Override public boolean hasRoom(int n) { // This cache is unlimited, so we always have room for remembering more: return true; } + @Override public int get(CategoryPath categoryPath) { - return cache.getOrdinal(categoryPath); + lock.readLock().lock(); + try { + return cache.getOrdinal(categoryPath); + } finally { + lock.readLock().unlock(); + } } + @Override public int get(CategoryPath categoryPath, int length) { - if (length<0 || length>categoryPath.length()) { + if (length < 0 || length > categoryPath.length()) { length = categoryPath.length(); } - return cache.getOrdinal(categoryPath, length); + lock.readLock().lock(); + try { + return cache.getOrdinal(categoryPath, length); + } finally { + lock.readLock().unlock(); + } } + @Override public boolean put(CategoryPath categoryPath, int ordinal) { - cache.addLabel(categoryPath, ordinal); - // Tell the caller we didn't clear part of the cache, so it doesn't - // have to flush its on-disk index now - return false; + lock.writeLock().lock(); + try { + cache.addLabel(categoryPath, ordinal); + // Tell the caller we didn't clear part of the cache, so it doesn't + // have to flush its on-disk index now + return false; + } finally { + lock.writeLock().unlock(); + } } + @Override public boolean put(CategoryPath categoryPath, int prefixLen, int ordinal) { - cache.addLabel(categoryPath, prefixLen, ordinal); - // Tell the caller we didn't clear part of the cache, so it doesn't - // have to flush its on-disk index now - return false; + lock.writeLock().lock(); + try { + cache.addLabel(categoryPath, prefixLen, ordinal); + // Tell the caller we didn't clear part of the cache, so it doesn't + // have to flush its on-disk index now + return false; + } finally { + lock.writeLock().unlock(); + } } /** @@ -75,8 +105,7 @@ * @return Number of bytes in memory used by this object. */ public int getMemoryUsage() { - int memoryUsage = (this.cache == null) ? 0 : this.cache.getMemoryUsage(); - return memoryUsage; + return cache == null ? 0 : cache.getMemoryUsage(); } } Index: lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/lru/LruTaxonomyWriterCache.java =================================================================== --- lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/lru/LruTaxonomyWriterCache.java (revision 1339138) +++ lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/lru/LruTaxonomyWriterCache.java (working copy) @@ -60,16 +60,19 @@ } } - public boolean hasRoom(int n) { - return n<=(cache.getMaxSize()-cache.getSize()); + @Override + public synchronized boolean hasRoom(int n) { + return n <= (cache.getMaxSize() - cache.getSize()); } - public void close() { + @Override + public synchronized void close() { cache.clear(); - cache=null; + cache = null; } - public int get(CategoryPath categoryPath) { + @Override + public synchronized int get(CategoryPath categoryPath) { Integer res = cache.get(categoryPath); if (res == null) { return -1; @@ -78,7 +81,8 @@ return res.intValue(); } - public int get(CategoryPath categoryPath, int length) { + @Override + public synchronized int get(CategoryPath categoryPath, int length) { if (length<0 || length>categoryPath.length()) { length = categoryPath.length(); } @@ -94,7 +98,8 @@ return res.intValue(); } - public boolean put(CategoryPath categoryPath, int ordinal) { + @Override + public synchronized boolean put(CategoryPath categoryPath, int ordinal) { boolean ret = cache.put(categoryPath, new Integer(ordinal)); // If the cache is full, we need to clear one or more old entries // from the cache. However, if we delete from the cache a recent @@ -109,7 +114,8 @@ return ret; } - public boolean put(CategoryPath categoryPath, int prefixLen, int ordinal) { + @Override + public synchronized boolean put(CategoryPath categoryPath, int prefixLen, int ordinal) { boolean ret = cache.put(categoryPath, prefixLen, new Integer(ordinal)); // If the cache is full, we need to clear one or more old entries // from the cache. However, if we delete from the cache a recent @@ -125,4 +131,3 @@ } } - Index: lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java =================================================================== --- lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java (revision 1339138) +++ lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java (working copy) @@ -27,6 +27,7 @@ import org.apache.lucene.facet.taxonomy.writercache.TaxonomyWriterCache; import org.apache.lucene.facet.taxonomy.writercache.cl2o.Cl2oTaxonomyWriterCache; import org.apache.lucene.facet.taxonomy.writercache.lru.LruTaxonomyWriterCache; +import org.apache.lucene.index.AtomicReader; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.DocsEnum; @@ -40,7 +41,6 @@ import org.apache.lucene.index.Terms; import org.apache.lucene.index.TermsEnum; import org.apache.lucene.index.TieredMergePolicy; -import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.LockObtainFailedException; @@ -348,18 +348,6 @@ } /** - * Returns the number of memory bytes used by the cache. - * @return Number of cache bytes in memory, for CL2O only; zero otherwise. - */ - public int getCacheMemoryUsage() { - ensureOpen(); - if (this.cache == null || !(this.cache instanceof Cl2oTaxonomyWriterCache)) { - return 0; - } - return ((Cl2oTaxonomyWriterCache)this.cache).getMemoryUsage(); - } - - /** * A hook for extending classes to close additional resources that were used. * The default implementation closes the {@link IndexReader} as well as the * {@link TaxonomyWriterCache} instances that were used.
@@ -413,21 +401,26 @@ reader = openReader(); } - // TODO (Facet): avoid Multi*? - Bits liveDocs = MultiFields.getLiveDocs(reader); - DocsEnum docs = MultiFields.getTermDocsEnum(reader, liveDocs, Consts.FULL, - new BytesRef(categoryPath.toString(delimiter)), - false); - if (docs == null || docs.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) { - return -1; // category does not exist in taxonomy + int base = 0; + int doc = -1; + for (AtomicReader r : reader.getSequentialSubReaders()) { + DocsEnum docs = r.termDocsEnum(null, Consts.FULL, + new BytesRef(categoryPath.toString(delimiter)), false); + if (docs != null) { + doc = docs.nextDoc() + base; + break; + } + base += r.maxDoc(); // we don't have deletions, so it's ok to call maxDoc } // Note: we do NOT add to the cache the fact that the category // does not exist. The reason is that our only use for this // method is just before we actually add this category. If // in the future this usage changes, we should consider caching // the fact that the category is not in the taxonomy. - addToCache(categoryPath, docs.docID()); - return docs.docID(); + if (doc > 0) { + addToCache(categoryPath, doc); + } + return doc; } /** @@ -436,7 +429,7 @@ * case the category does not yet exist in the taxonomy. */ private int findCategory(CategoryPath categoryPath, int prefixLen) - throws IOException { + throws IOException { int res = cache.get(categoryPath, prefixLen); if (res >= 0) { return res; @@ -451,36 +444,46 @@ if (reader == null) { reader = openReader(); } - Bits liveDocs = MultiFields.getLiveDocs(reader); - DocsEnum docs = MultiFields.getTermDocsEnum(reader, liveDocs, Consts.FULL, - new BytesRef(categoryPath.toString(delimiter, prefixLen)), - false); - if (docs == null || docs.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) { - return -1; // category does not exist in taxonomy + + int base = 0; + int doc = -1; + for (AtomicReader r : reader.getSequentialSubReaders()) { + DocsEnum docs = r.termDocsEnum(null, Consts.FULL, + new BytesRef(categoryPath.toString(delimiter, prefixLen)), false); + if (docs != null) { + doc = docs.nextDoc() + base; + break; + } + base += r.maxDoc(); // we don't have deletions, so it's ok to call maxDoc } - addToCache(categoryPath, prefixLen, docs.docID()); - return docs.docID(); + + if (doc > 0) { + addToCache(categoryPath, prefixLen, doc); + } + return doc; } - // TODO (Facet): addCategory() is synchronized. This means that if indexing is - // multi-threaded, a new category that needs to be written to disk (and - // potentially even trigger a lengthy merge) locks out other addCategory() - // calls - even those which could immediately return a cached value. - // We definitely need to fix this situation! @Override - public synchronized int addCategory(CategoryPath categoryPath) throws IOException { + public int addCategory(CategoryPath categoryPath) throws IOException { ensureOpen(); // If the category is already in the cache and/or the taxonomy, we - // should return its existing ordinal: + // should return its existing ordinal int res = findCategory(categoryPath); if (res < 0) { - // This is a new category, and we need to insert it into the index - // (and the cache). Actually, we might also need to add some of - // the category's ancestors before we can add the category itself - // (while keeping the invariant that a parent is always added to - // the taxonomy before its child). internalAddCategory() does all - // this recursively: - res = internalAddCategory(categoryPath, categoryPath.length()); + // the category is neither in the cache nor in the index - following code + // cannot be executed in parallel. + synchronized (this) { + res = findCategory(categoryPath); + if (res < 0) { + // This is a new category, and we need to insert it into the index + // (and the cache). Actually, we might also need to add some of + // the category's ancestors before we can add the category itself + // (while keeping the invariant that a parent is always added to + // the taxonomy before its child). internalAddCategory() does all + // this recursively + res = internalAddCategory(categoryPath, categoryPath.length()); + } + } } return res; @@ -496,7 +499,7 @@ * recursion. */ private int internalAddCategory(CategoryPath categoryPath, int length) - throws CorruptIndexException, IOException { + throws IOException { // Find our parent's ordinal (recursively adding the parent category // to the taxonomy if it's not already there). Then add the parent @@ -528,13 +531,12 @@ } } - // Note that the methods calling addCategoryDocument() are synchornized, - // so this method is effectively synchronized as well, but we'll add - // synchronized to be on the safe side, and we can reuse class-local objects - // instead of allocating them every time - protected synchronized int addCategoryDocument(CategoryPath categoryPath, - int length, int parent) - throws CorruptIndexException, IOException { + /** + * Note that the methods calling addCategoryDocument() are synchornized, so + * this method is effectively synchronized as well. + */ + private int addCategoryDocument(CategoryPath categoryPath, int length, + int parent) throws IOException { // Before Lucene 2.9, position increments >=0 were supported, so we // added 1 to parent to allow the parent -1 (the parent of the root). // Unfortunately, starting with Lucene 2.9, after LUCENE-1542, this is @@ -544,7 +546,7 @@ // we write here (e.g., to write parent+2), and need to do a workaround // in the reader (which knows that anyway only category 0 has a parent // -1). - parentStream.set(parent+1); + parentStream.set(parent + 1); Document d = new Document(); d.add(parentStreamField); @@ -601,8 +603,7 @@ } } - private void addToCache(CategoryPath categoryPath, int id) - throws CorruptIndexException, IOException { + private void addToCache(CategoryPath categoryPath, int id) throws IOException { if (cache.put(categoryPath, id)) { // If cache.put() returned true, it means the cache was limited in // size, became full, so parts of it had to be cleared. @@ -620,7 +621,7 @@ } private void addToCache(CategoryPath categoryPath, int prefixLen, int id) - throws CorruptIndexException, IOException { + throws IOException { if (cache.put(categoryPath, prefixLen, id)) { refreshReader(); cacheIsComplete = false; @@ -766,7 +767,29 @@ } CategoryPath cp = new CategoryPath(); - Terms terms = MultiFields.getTerms(reader, Consts.FULL); + TermsEnum termsEnum = null; + DocsEnum docsEnum = null; + int base = 0; + for (AtomicReader r : reader.getSequentialSubReaders()) { + Terms terms = r.terms(Consts.FULL); + if (terms != null) { // cannot really happen, but be on the safe side + termsEnum = terms.iterator(termsEnum); + while (termsEnum.next() != null) { + BytesRef t = termsEnum.term(); + // Since we guarantee uniqueness of categories, each term has exactly + // one document. Also, since we do not allow removing categories (and + // hence documents), there are no deletions in the index. Therefore, it + // is sufficient to call next(), and then doc(), exactly once with no + // 'validation' checks. + cp.clear(); + cp.add(t.utf8ToString(), delimiter); + docsEnum = termsEnum.docs(null, docsEnum, false); + cache.put(cp, docsEnum.nextDoc() + base); + } + } + base += r.maxDoc(); // we don't have any deletions, so we're ok + } + /*Terms terms = MultiFields.getTerms(reader, Consts.FULL); // The check is done here to avoid checking it on every iteration of the // below loop. A null term wlil be returned if there are no terms in the // lexicon, or after the Consts.FULL term. However while the loop is @@ -786,11 +809,10 @@ docsEnum = termsEnum.docs(liveDocs, docsEnum, false); docsEnum.nextDoc(); cp.clear(); - // TODO (Facet): avoid String creation/use bytes? cp.add(t.utf8ToString(), delimiter); cache.put(cp, docsEnum.docID()); } - } + }*/ cacheIsComplete = true; // No sense to keep the reader open - we will not need to read from it @@ -832,35 +854,34 @@ */ public void addTaxonomy(Directory taxoDir, OrdinalMap map) throws IOException { ensureOpen(); - IndexReader r = DirectoryReader.open(taxoDir); + DirectoryReader r = DirectoryReader.open(taxoDir); try { final int size = r.numDocs(); final OrdinalMap ordinalMap = map; ordinalMap.setSize(size); CategoryPath cp = new CategoryPath(); - Terms terms = MultiFields.getTerms(r, Consts.FULL); - TermsEnum te = terms.iterator(null); - Bits liveDocs = MultiFields.getLiveDocs(r); + int base = 0; + TermsEnum te = null; DocsEnum docs = null; - // we call next() first, to skip the root category which always exists. - while (te.next() != null) { - String value = te.term().utf8ToString(); - cp.clear(); - cp.add(value, Consts.DEFAULT_DELIMITER); - int ordinal = findCategory(cp); - if (ordinal < 0) { - // NOTE: call addCategory so that it works well in a multi-threaded - // environment, in case e.g. a thread just added the category, after - // the findCategory() call above failed to find it. - ordinal = addCategory(cp); + for (AtomicReader ar : r.getSequentialSubReaders()) { + Terms terms = ar.terms(Consts.FULL); + te = terms.iterator(te); + while (te.next() != null) { + String value = te.term().utf8ToString(); + cp.clear(); + cp.add(value, Consts.DEFAULT_DELIMITER); + int ordinal = findCategory(cp); + if (ordinal < 0) { + // NOTE: call addCategory so that it works well in a multi-threaded + // environment, in case e.g. a thread just added the category, after + // the findCategory() call above failed to find it. + ordinal = addCategory(cp); + } + docs = te.docs(null, docs, false); + ordinalMap.addMapping(docs.nextDoc() + base, ordinal); } - docs = te.docs(liveDocs, docs, false); - ordinalMap.addMapping(docs.nextDoc(), ordinal); + base += ar.maxDoc(); // no deletions, so we're ok } - // we must add the root ordinal map, so that the map will be complete - // (otherwise e.g. DiskOrdinalMap may fail because it expects more - // categories to exist in the file). - ordinalMap.addMapping(0, 0); ordinalMap.addDone(); } finally { r.close(); Index: lucene/CHANGES.txt =================================================================== --- lucene/CHANGES.txt (revision 1339138) +++ lucene/CHANGES.txt (working copy) @@ -914,7 +914,11 @@ FST under the hood, which requires less RAM. NormalizeCharMap no longer accepts empty string match (it did previously, but ignored it). (Dawid Weiss, Mike McCandless) - + +* LUCENE-4061: improve synchronization in DirectoryTaxonomyWriter.addCategory + and few general improvements to DirectoryTaxonomyWriter. + (Shai Erera, Gilad Barkai) + Bug fixes * LUCENE-2803: The FieldCache can miss values if an entry for a reader