Index: build.xml =================================================================== --- build.xml (revision 681741) +++ build.xml (working copy) @@ -177,6 +177,8 @@ + + Index: src/java/org/apache/nutch/crawl/Crawl.java =================================================================== --- src/java/org/apache/nutch/crawl/Crawl.java (revision 681741) +++ src/java/org/apache/nutch/crawl/Crawl.java (working copy) @@ -49,7 +49,7 @@ public static void main(String args[]) throws Exception { if (args.length < 1) { System.out.println - ("Usage: Crawl [-dir d] [-threads n] [-depth i] [-topN N]"); + ("Usage: Crawl [-dir d] [-threads n] [-depth i] [-topN N] [-solr url]"); return; } @@ -62,7 +62,7 @@ int threads = job.getInt("fetcher.threads.fetch", 10); int depth = 5; long topN = Long.MAX_VALUE; - + String solrUrl = null; for (int i = 0; i < args.length; i++) { if ("-dir".equals(args[i])) { dir = new Path(args[i+1]); @@ -74,8 +74,11 @@ depth = Integer.parseInt(args[i+1]); i++; } else if ("-topN".equals(args[i])) { - topN = Integer.parseInt(args[i+1]); - i++; + topN = Integer.parseInt(args[i+1]); + i++; + } else if ("-solr".equals(args[i])) { + solrUrl = args[i+1]; + i++; } else if (args[i] != null) { rootUrlDir = new Path(args[i]); } @@ -95,7 +98,7 @@ Path crawlDb = new Path(dir + "/crawldb"); Path linkDb = new Path(dir + "/linkdb"); Path segments = new Path(dir + "/segments"); - Path indexes = new Path(dir + "/indexes"); + Path indexes = (solrUrl == null ) ? 
new Path(dir + "/indexes") : null; Path index = new Path(dir + "/index"); Path tmpDir = job.getLocalPath("crawl"+Path.SEPARATOR+getDate()); @@ -128,16 +131,18 @@ if (i > 0) { linkDbTool.invert(linkDb, segments, true, true, false); // invert links - // Delete old indexes - if (fs.exists(indexes)) { - LOG.info("Deleting old indexes: " + indexes); - fs.delete(indexes, true); - } + if(indexes != null) { + // Delete old indexes + if (fs.exists(indexes)) { + LOG.info("Deleting old indexes: " + indexes); + fs.delete(indexes, true); + } - // Delete old index - if (fs.exists(index)) { - LOG.info("Deleting old merged index: " + index); - fs.delete(index, true); + // Delete old index + if (fs.exists(index)) { + LOG.info("Deleting old merged index: " + index); + fs.delete(index, true); + } } // index, dedup & merge @@ -142,10 +147,12 @@ // index, dedup & merge FileStatus[] fstats = fs.listStatus(segments, HadoopFSUtil.getPassDirectoriesFilter(fs)); - indexer.index(indexes, crawlDb, linkDb, HadoopFSUtil.getPaths(fstats)); - dedup.dedup(new Path[] { indexes }); - fstats = fs.listStatus(indexes, HadoopFSUtil.getPassDirectoriesFilter(fs)); - merger.merge(HadoopFSUtil.getPaths(fstats), index, tmpDir); + indexer.index(indexes, solrUrl, crawlDb, linkDb, Arrays.asList(HadoopFSUtil.getPaths(fstats))); + if(indexes != null) { + dedup.dedup(new Path[] { indexes }); + fstats = fs.listStatus(indexes, HadoopFSUtil.getPassDirectoriesFilter(fs)); + merger.merge(HadoopFSUtil.getPaths(fstats), index, tmpDir); + } } else { LOG.warn("No URLs to fetch - check your seed list and URL filters."); } Index: src/java/org/apache/nutch/crawl/Inlinks.java =================================================================== --- src/java/org/apache/nutch/crawl/Inlinks.java (revision 681741) +++ src/java/org/apache/nutch/crawl/Inlinks.java (working copy) @@ -69,7 +69,7 @@ /** Return the set of anchor texts. Only a single anchor with a given text * is permitted from a given domain. 
*/ - public String[] getAnchors() throws IOException { + public String[] getAnchors() { HashMap> domainToAnchors = new HashMap>(); ArrayList results = new ArrayList(); @@ -97,5 +97,4 @@ return results.toArray(new String[results.size()]); } - } Index: src/java/org/apache/nutch/indexer/Indexer.java =================================================================== --- src/java/org/apache/nutch/indexer/Indexer.java (revision 681741) +++ src/java/org/apache/nutch/indexer/Indexer.java (working copy) @@ -17,26 +17,35 @@ package org.apache.nutch.indexer; -import java.io.*; -import java.util.*; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.io.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.util.*; -import org.apache.nutch.parse.*; -import org.apache.nutch.analysis.*; - -import org.apache.nutch.scoring.ScoringFilterException; -import org.apache.nutch.scoring.ScoringFilters; -import org.apache.nutch.util.LogUtil; -import org.apache.nutch.util.NutchConfiguration; -import org.apache.nutch.util.NutchJob; - +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import 
org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.CrawlDb; import org.apache.nutch.crawl.Inlinks; @@ -42,11 +51,20 @@ import org.apache.nutch.crawl.Inlinks; import org.apache.nutch.crawl.LinkDb; import org.apache.nutch.crawl.NutchWritable; - -import org.apache.lucene.index.*; -import org.apache.lucene.document.*; +import org.apache.nutch.indexer.lucene.LuceneConstants; +import org.apache.nutch.indexer.lucene.LuceneWriter; +import org.apache.nutch.indexer.solr.SolrConstants; +import org.apache.nutch.indexer.solr.SolrWriter; import org.apache.nutch.metadata.Metadata; import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.parse.Parse; +import org.apache.nutch.parse.ParseData; +import org.apache.nutch.parse.ParseImpl; +import org.apache.nutch.parse.ParseText; +import org.apache.nutch.scoring.ScoringFilterException; +import org.apache.nutch.scoring.ScoringFilters; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; /** Create indexes for segments. */ public class Indexer extends Configured implements Tool, Reducer, Mapper { @@ -55,101 +73,77 @@ public static final Log LOG = LogFactory.getLog(Indexer.class); - /** A utility class used to pass a lucene document from Indexer.reduce - * to Indexer.OutputFormat. - * Note: Despite its name, it can't properly wrap a lucene document - it - * doesn't know how to serialize/deserialize a lucene document. 
- */ - private static class LuceneDocumentWrapper implements Writable { - private Document doc; - - public LuceneDocumentWrapper(Document doc) { - this.doc = doc; - } - - public Document get() { - return doc; - } - - public void readFields(DataInput in) throws IOException { - // intentionally left blank - } - - public void write(DataOutput out) throws IOException { - // intentionally left blank - } - - } - + private static final String LUCENE_ENABLED_KEY = + "indexer.lucene.backend.enabled"; + + private static final String SOLR_ENABLED_KEY = + "indexer.solr.backend.enabled"; + /** Unwrap Lucene Documents created by reduce and add them to an index. */ public static class OutputFormat - extends org.apache.hadoop.mapred.FileOutputFormat { - public RecordWriter getRecordWriter(final FileSystem fs, JobConf job, + extends org.apache.hadoop.mapred.FileOutputFormat { + public RecordWriter getRecordWriter(final FileSystem fs, JobConf job, String name, final Progressable progress) throws IOException { - final Path perm = new Path(FileOutputFormat.getOutputPath(job), name); - final Path temp = - job.getLocalPath("index/_"+Integer.toString(new Random().nextInt())); - - fs.delete(perm, true); // delete old, if any - - final AnalyzerFactory factory = new AnalyzerFactory(job); - final IndexWriter writer = // build locally first - new IndexWriter(fs.startLocalOutput(perm, temp).toString(), - new NutchDocumentAnalyzer(job), true); - - writer.setMergeFactor(job.getInt("indexer.mergeFactor", 10)); - writer.setMaxBufferedDocs(job.getInt("indexer.minMergeDocs", 100)); - writer.setMaxMergeDocs(job.getInt("indexer.maxMergeDocs", Integer.MAX_VALUE)); - writer.setTermIndexInterval - (job.getInt("indexer.termIndexInterval", 128)); - writer.setMaxFieldLength(job.getInt("indexer.max.tokens", 10000)); - writer.setInfoStream(LogUtil.getInfoStream(LOG)); - writer.setUseCompoundFile(false); - writer.setSimilarity(new NutchSimilarity()); - - return new RecordWriter() { - boolean closed; - - public 
void write(WritableComparable key, LuceneDocumentWrapper value) - throws IOException { // unwrap & index doc - Document doc = value.get(); - NutchAnalyzer analyzer = factory.get(doc.get("lang")); - if (LOG.isInfoEnabled()) { - LOG.info(" Indexing [" + doc.getField("url").stringValue() + "]" + - " with analyzer " + analyzer + - " (" + doc.get("lang") + ")"); + + final List writers = new ArrayList(); + + if (job.getBoolean(LUCENE_ENABLED_KEY, false)) { + LuceneWriter.addFieldOptions("segment", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job); + LuceneWriter.addFieldOptions("digest", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job); + LuceneWriter.addFieldOptions("boost", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job); + + job.set(LuceneConstants.OUTPUT_DIR, + new Path(FileOutputFormat.getOutputPath(job), name).toString()); + Path temp = job.getLocalPath("index/_" + + Integer.toString(new Random().nextInt())); + job.set(LuceneConstants.TEMP_OUTPUT_DIR, temp.toString()); + + writers.add(new LuceneWriter()); + } + + if (job.getBoolean(SOLR_ENABLED_KEY, false)) { + writers.add(new SolrWriter()); + } + + for (NutchIndexWriter writer : writers) { + writer.open(job); + } + + return new RecordWriter() { + boolean closed; + + public void write(Text key, NutchDocument doc) + throws IOException { + for (NutchIndexWriter writer : writers) { + writer.write(doc); + } + progress.progress(); + } + + public void close(final Reporter reporter) throws IOException { + // spawn a thread to give progress heartbeats + Thread prog = new Thread() { + public void run() { + while (!closed) { + try { + reporter.setStatus("closing"); + Thread.sleep(1000); + } catch (InterruptedException e) { continue; } + catch (Throwable e) { return; } + } } - writer.addDocument(doc, analyzer); - progress.progress(); - } - - public void close(final Reporter reporter) throws IOException { - // spawn a thread to give progress heartbeats - Thread prog = new Thread() { - public void run() { - while 
(!closed) { - try { - reporter.setStatus("closing"); - Thread.sleep(1000); - } catch (InterruptedException e) { continue; } - catch (Throwable e) { return; } - } - } - }; + }; - try { - prog.start(); - if (LOG.isInfoEnabled()) { LOG.info("Optimizing index."); } - // optimize & close index - writer.optimize(); - writer.close(); - fs.completeLocalOutput(perm, temp); // copy to dfs - fs.createNewFile(new Path(perm, DONE_NAME)); - } finally { - closed = true; - } - } - }; + try { + prog.start(); + for (NutchIndexWriter writer : writers) { + writer.close(); + } + } finally { + closed = true; + } + } + }; } } @@ -217,25 +211,14 @@ return; } - Document doc = new Document(); + NutchDocument doc = new NutchDocument(); Metadata metadata = parseData.getContentMeta(); // add segment, used to map from merged index back to segment files - doc.add(new Field("segment", metadata.get(Nutch.SEGMENT_NAME_KEY), - Field.Store.YES, Field.Index.NO)); + doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY)); // add digest, used by dedup - doc.add(new Field("digest", metadata.get(Nutch.SIGNATURE_KEY), - Field.Store.YES, Field.Index.NO)); - -// if (LOG.isInfoEnabled()) { -// LOG.info("Url: "+key.toString()); -// LOG.info("Title: "+parseData.getTitle()); -// LOG.info(crawlDatum.toString()); -// if (inlinks != null) { -// LOG.info(inlinks.toString()); -// } -// } + doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY)); Parse parse = new ParseImpl(parseText, parseData); try { @@ -267,15 +250,14 @@ return; } // apply boost to all indexed fields. 
- doc.setBoost(boost); + doc.setScore(boost); // store boost for use by explain and dedup - doc.add(new Field("boost", Float.toString(boost), - Field.Store.YES, Field.Index.NO)); + doc.add("boost", Float.toString(boost)); - output.collect(key, new LuceneDocumentWrapper(doc)); + output.collect(key, doc); } - public void index(Path indexDir, Path crawlDb, Path linkDb, Path[] segments) + public void index(Path luceneDir, String solrUrl, Path crawlDb, Path linkDb, Collection segments) throws IOException { if (LOG.isInfoEnabled()) { @@ -280,6 +262,7 @@ if (LOG.isInfoEnabled()) { LOG.info("Indexer: starting"); + LOG.info("Indexer: crawldb: " + crawlDb); LOG.info("Indexer: linkdb: " + linkDb); } @@ -284,16 +267,40 @@ } JobConf job = new NutchJob(getConf()); - job.setJobName("index " + indexDir); - - for (int i = 0; i < segments.length; i++) { + String jobName = "index"; + if (luceneDir != null) { + // add lucene output dir to job name + jobName += " lucene=" + luceneDir; + job.setBoolean(LUCENE_ENABLED_KEY, true); + if (LOG.isInfoEnabled()) { + LOG.info("Indexer: luceneDir: " + luceneDir); + } + } else { + job.setBoolean(LUCENE_ENABLED_KEY, false); + } + + if (solrUrl != null) { + // add solr server url to job name + jobName += " solr=" + solrUrl; + job.setBoolean(SOLR_ENABLED_KEY, true); + job.set(SolrConstants.SERVER_URL, solrUrl); + if (LOG.isInfoEnabled()) { + LOG.info("Indexer: solrUrl: " + solrUrl); + } + } else { + job.setBoolean(SOLR_ENABLED_KEY, false); + } + + job.setJobName(jobName); + + for (Path segment : segments) { if (LOG.isInfoEnabled()) { - LOG.info("Indexer: adding segment: " + segments[i]); + LOG.info("Indexer: adding segment: " + segment); } - FileInputFormat.addInputPath(job, new Path(segments[i], CrawlDatum.FETCH_DIR_NAME)); - FileInputFormat.addInputPath(job, new Path(segments[i], CrawlDatum.PARSE_DIR_NAME)); - FileInputFormat.addInputPath(job, new Path(segments[i], ParseData.DIR_NAME)); - FileInputFormat.addInputPath(job, new Path(segments[i], 
ParseText.DIR_NAME)); + FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.FETCH_DIR_NAME)); + FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.PARSE_DIR_NAME)); + FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME)); + FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME)); } FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME)); @@ -303,13 +310,28 @@ job.setMapperClass(Indexer.class); job.setReducerClass(Indexer.class); - FileOutputFormat.setOutputPath(job, indexDir); + Path tempDir = new Path("indexer-" + + new Random().nextInt(Integer.MAX_VALUE)); + if (luceneDir == null) { + FileOutputFormat.setOutputPath(job, tempDir); + } else { + FileOutputFormat.setOutputPath(job, luceneDir); + } job.setOutputFormat(OutputFormat.class); job.setOutputKeyClass(Text.class); + job.setMapOutputValueClass(NutchWritable.class); job.setOutputValueClass(NutchWritable.class); - JobClient.runJob(job); - if (LOG.isInfoEnabled()) { LOG.info("Indexer: done"); } + try { + JobClient.runJob(job); + if (LOG.isInfoEnabled()) { LOG.info("Indexer: done"); } + } catch(IOException e) { + throw e; + } finally { + // clean up + FileSystem fs = FileSystem.get(getConf()); + fs.delete(tempDir, true); + } } public static void main(String[] args) throws Exception { @@ -319,19 +341,45 @@ public int run(String[] args) throws Exception { - if (args.length < 4) { - System.err.println("Usage: ..."); + if (args.length < 5) { + System.err.println("Usage: (-lucene ) (-solr )" + + " ..."); + return -1; } - Path[] segments = new Path[args.length-3]; - for (int i = 3; i < args.length; i++) { - segments[i-3] = new Path(args[i]); + Path luceneDir = null; + String solrUrl = null; + Path crawlDb = null; + Path linkDb = null; + ArrayList segments = new ArrayList(); + int i; + for (i = 0; i < args.length; i++) { + if (args[i].equals("-lucene")) { + luceneDir = new Path(args[++i]); + } else if (args[i].equals("-solr")) { + solrUrl = 
args[++i]; + } else { + break; + } + } + + if (luceneDir == null && solrUrl == null) { + System.err.println("Usage: (-lucene ) (-solr )" + + " ..."); + + return -1; + } + + crawlDb = new Path(args[i++]); + linkDb = new Path(args[i++]); + + for (; i < args.length; i++) { + segments.add(new Path(args[i])); } try { - index(new Path(args[0]), new Path(args[1]), new Path(args[2]), - segments); + index(luceneDir, solrUrl, crawlDb, linkDb, segments); return 0; } catch (Exception e) { LOG.fatal("Indexer: " + StringUtils.stringifyException(e)); Index: src/java/org/apache/nutch/indexer/IndexingFilter.java =================================================================== --- src/java/org/apache/nutch/indexer/IndexingFilter.java (revision 681741) +++ src/java/org/apache/nutch/indexer/IndexingFilter.java (working copy) @@ -17,11 +17,9 @@ package org.apache.nutch.indexer; -// Lucene imports -import org.apache.lucene.document.Document; - // Hadoop imports import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; // Nutch imports @@ -52,6 +50,15 @@ * should be discarded) * @throws IndexingException */ - Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) + NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) throws IndexingException; + + /** Adds index-level configuration options. + * Implementations can update the given configuration to pass document-independent + * information to indexing backends. As a rule of thumb, prefix meta keys + * with the name of the backend intended. For example, when + * passing information to the lucene backend, prefix keys with "lucene.". + * @param conf Configuration instance.
+ * */ + public void addIndexBackendOptions(Configuration conf); } Index: src/java/org/apache/nutch/indexer/IndexingFilters.java =================================================================== --- src/java/org/apache/nutch/indexer/IndexingFilters.java (revision 681741) +++ src/java/org/apache/nutch/indexer/IndexingFilters.java (working copy) @@ -24,8 +24,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.lucene.document.Document; - import org.apache.nutch.plugin.*; import org.apache.nutch.parse.Parse; import org.apache.nutch.util.ObjectCache; @@ -72,6 +70,7 @@ .getExtensionInstance(); LOG.info("Adding " + filter.getClass().getName()); if (!filterMap.containsKey(filter.getClass().getName())) { + filter.addIndexBackendOptions(conf); filterMap.put(filter.getClass().getName(), filter); } } @@ -90,6 +89,7 @@ IndexingFilter filter = filterMap .get(orderedFilters[i]); if (filter != null) { + filter.addIndexBackendOptions(conf); filters.add(filter); } } @@ -105,7 +105,7 @@ } /** Run all defined filters. 
*/ - public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, + public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) throws IndexingException { for (int i = 0; i < this.indexingFilters.length; i++) { doc = this.indexingFilters[i].filter(doc, parse, url, datum, inlinks); @@ -115,4 +115,5 @@ return doc; } + } Index: src/java/org/apache/nutch/indexer/lucene/LuceneConstants.java =================================================================== --- src/java/org/apache/nutch/indexer/lucene/LuceneConstants.java (revision 0) +++ src/java/org/apache/nutch/indexer/lucene/LuceneConstants.java (revision 0) @@ -0,0 +1,42 @@ +package org.apache.nutch.indexer.lucene; + +public interface LuceneConstants { + public static final String LUCENE_PREFIX = "lucene."; + + public static final String OUTPUT_DIR = LUCENE_PREFIX + "output.dir"; + + public static final String TEMP_OUTPUT_DIR = LUCENE_PREFIX + "tmp.dir"; + + static final String FIELD_PREFIX = LUCENE_PREFIX + "field."; + + static final String FIELD_STORE_PREFIX = FIELD_PREFIX + "store."; + + static final String FIELD_INDEX_PREFIX = FIELD_PREFIX + "index."; + + static final String FIELD_VECTOR_PREFIX = FIELD_PREFIX + "vector."; + + static final String STORE_YES = "store.yes"; + + static final String STORE_NO = "store.no"; + + static final String STORE_COMPRESS = "store.compress"; + + static final String INDEX_NO = "index.no"; + + static final String INDEX_NO_NORMS = "index.no_norms"; + + static final String INDEX_TOKENIZED = "index.tokenized"; + + static final String INDEX_UNTOKENIZED = "index.untokenized"; + + static final String VECTOR_NO = "vector.no"; + + static final String VECTOR_POS = "vector.pos"; + + static final String VECTOR_OFFSET = "vector.offset"; + + static final String VECTOR_POS_OFFSET = "vector.pos_offset"; + + static final String VECTOR_YES = "vector.yes"; + +} Index: src/java/org/apache/nutch/indexer/lucene/LuceneWriter.java 
=================================================================== --- src/java/org/apache/nutch/indexer/lucene/LuceneWriter.java (revision 0) +++ src/java/org/apache/nutch/indexer/lucene/LuceneWriter.java (revision 0) @@ -0,0 +1,292 @@ +package org.apache.nutch.indexer.lucene; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.index.IndexWriter; +import org.apache.nutch.analysis.AnalyzerFactory; +import org.apache.nutch.analysis.NutchAnalyzer; +import org.apache.nutch.analysis.NutchDocumentAnalyzer; +import org.apache.nutch.indexer.Indexer; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.indexer.NutchIndexWriter; +import org.apache.nutch.indexer.NutchSimilarity; +import org.apache.nutch.metadata.Metadata; +import org.apache.nutch.util.LogUtil; + +public class LuceneWriter implements NutchIndexWriter { + + public static enum STORE { YES, NO, COMPRESS } + + public static enum INDEX { NO, NO_NORMS, TOKENIZED, UNTOKENIZED } + + public static enum VECTOR { NO, OFFSET, POS, POS_OFFSET, YES } + + private IndexWriter writer; + + private AnalyzerFactory analyzerFactory; + + private Path perm; + + private Path temp; + + private FileSystem fs; + + private Map fieldStore; + + private Map fieldIndex; + + private Map fieldVector; + + public LuceneWriter() { + fieldStore = new HashMap(); + fieldIndex = new HashMap(); + fieldVector = new HashMap(); + } + + private Document createLuceneDoc(NutchDocument doc) { + Document out = new Document(); + + out.setBoost(doc.getScore()); + + Iterator>> iterator = doc.fieldIterator(); + Metadata documentMeta = doc.getDocumentMeta(); + while (iterator.hasNext()) { + Map.Entry> entry = iterator.next(); + 
String fieldName = entry.getKey(); + + Field.Store store = fieldStore.get(fieldName); + Field.Index index = fieldIndex.get(fieldName); + Field.TermVector vector = fieldVector.get(fieldName); + + // default values + if (store == null) { + store = Field.Store.NO; + } + + if (index == null) { + index = Field.Index.NO; + } + + if (vector == null) { + vector = Field.TermVector.NO; + } + + // read document-level field information + String[] fieldMetas = + documentMeta.getValues(LuceneConstants.FIELD_PREFIX + fieldName); + if (fieldMetas.length != 0) { + for (String val : fieldMetas) { + if (LuceneConstants.STORE_YES.equals(val)) { + store = Field.Store.YES; + } else if (LuceneConstants.STORE_NO.equals(val)) { + store = Field.Store.NO; + } else if (LuceneConstants.INDEX_TOKENIZED.equals(val)) { + index = Field.Index.TOKENIZED; + } else if (LuceneConstants.INDEX_NO.equals(val)) { + index = Field.Index.NO; + } else if (LuceneConstants.INDEX_UNTOKENIZED.equals(val)) { + index = Field.Index.UN_TOKENIZED; + } else if (LuceneConstants.INDEX_NO_NORMS.equals(val)) { + index = Field.Index.NO_NORMS; + } else if (LuceneConstants.VECTOR_NO.equals(val)) { + vector = Field.TermVector.NO; + } else if (LuceneConstants.VECTOR_YES.equals(val)) { + vector = Field.TermVector.YES; + } else if (LuceneConstants.VECTOR_POS.equals(val)) { + vector = Field.TermVector.WITH_POSITIONS; + } else if (LuceneConstants.VECTOR_POS_OFFSET.equals(val)) { + vector = Field.TermVector.WITH_POSITIONS_OFFSETS; + } else if (LuceneConstants.VECTOR_OFFSET.equals(val)) { + vector = Field.TermVector.WITH_OFFSETS; + } + } + } + + for (String fieldValue : entry.getValue()) { + out.add(new Field(fieldName, fieldValue, store, index, vector)); + } + } + + return out; + } + + @SuppressWarnings("unchecked") + private void processOptions(Configuration conf) { + Iterator iterator = conf.iterator(); + while (iterator.hasNext()) { + String key = (String) ((Map.Entry)iterator.next()).getKey(); + if 
(!key.startsWith(LuceneConstants.LUCENE_PREFIX)) { + continue; + } + if (key.startsWith(LuceneConstants.FIELD_STORE_PREFIX)) { + String field = + key.substring(LuceneConstants.FIELD_STORE_PREFIX.length()); + LuceneWriter.STORE store = LuceneWriter.STORE.valueOf(conf.get(key)); + switch (store) { + case YES: + fieldStore.put(field, Field.Store.YES); + break; + case NO: + fieldStore.put(field, Field.Store.NO); + break; + case COMPRESS: + fieldStore.put(field, Field.Store.COMPRESS); + break; + } + } else if (key.startsWith(LuceneConstants.FIELD_INDEX_PREFIX)) { + String field = + key.substring(LuceneConstants.FIELD_INDEX_PREFIX.length()); + LuceneWriter.INDEX index = LuceneWriter.INDEX.valueOf(conf.get(key)); + switch (index) { + case NO: + fieldIndex.put(field, Field.Index.NO); + break; + case NO_NORMS: + fieldIndex.put(field, Field.Index.NO_NORMS); + break; + case TOKENIZED: + fieldIndex.put(field, Field.Index.TOKENIZED); + break; + case UNTOKENIZED: + fieldIndex.put(field, Field.Index.UN_TOKENIZED); + break; + } + } else if (key.startsWith(LuceneConstants.FIELD_VECTOR_PREFIX)) { + String field = + key.substring(LuceneConstants.FIELD_VECTOR_PREFIX.length()); + LuceneWriter.VECTOR vector = LuceneWriter.VECTOR.valueOf(conf.get(key)); + switch (vector) { + case NO: + fieldVector.put(field, Field.TermVector.NO); + break; + case OFFSET: + fieldVector.put(field, Field.TermVector.WITH_OFFSETS); + break; + case POS: + fieldVector.put(field, Field.TermVector.WITH_POSITIONS); + break; + case POS_OFFSET: + fieldVector.put(field, Field.TermVector.WITH_POSITIONS_OFFSETS); + break; + case YES: + fieldVector.put(field, Field.TermVector.YES); + break; + } + } + } + } + + public void close() throws IOException { + writer.optimize(); + writer.close(); + fs.completeLocalOutput(perm, temp); // copy to dfs + fs.createNewFile(new Path(perm, Indexer.DONE_NAME)); + } + + public void open(Configuration conf) + throws IOException { + this.fs = FileSystem.get(conf); + perm = new 
Path(conf.get(LuceneConstants.OUTPUT_DIR)); + temp = new Path(conf.get(LuceneConstants.TEMP_OUTPUT_DIR)); + + fs.delete(perm); // delete old, if any + analyzerFactory = new AnalyzerFactory(conf); + writer = new IndexWriter(fs.startLocalOutput(perm, temp).toString(), + new NutchDocumentAnalyzer(conf), true); + + writer.setMergeFactor(conf.getInt("indexer.mergeFactor", 10)); + writer.setMaxBufferedDocs(conf.getInt("indexer.minMergeDocs", 100)); + writer.setMaxMergeDocs(conf + .getInt("indexer.maxMergeDocs", Integer.MAX_VALUE)); + writer.setTermIndexInterval(conf.getInt("indexer.termIndexInterval", 128)); + writer.setMaxFieldLength(conf.getInt("indexer.max.tokens", 10000)); + writer.setInfoStream(LogUtil.getDebugStream(Indexer.LOG)); + writer.setUseCompoundFile(false); + writer.setSimilarity(new NutchSimilarity()); + + processOptions(conf); + } + + public void write(NutchDocument doc) throws IOException { + Document luceneDoc = createLuceneDoc(doc); + NutchAnalyzer analyzer = analyzerFactory.get(luceneDoc.get("lang")); + if (Indexer.LOG.isDebugEnabled()) { + Indexer.LOG.debug("Indexing [" + luceneDoc.get("url") + + "] with analyzer " + analyzer + " (" + luceneDoc.get("lang") + + ")"); + } + writer.addDocument(luceneDoc, analyzer); + + } + + /** Adds a lucene field. + *

+ * This method is provided for backward-compatibility with + * older indexing filters. This should not be used by newer + * implementations since this is slower than + * {@link NutchDocument#add(String, String)} and will be removed + * in a future release. + *

+ * @param f Lucene field to be added. + * @deprecated Use {@link NutchDocument#add(String, String)} instead and + * set index-level metadata for field information. + * */ + public static void add(NutchDocument doc, Field f) { + String fieldName = f.name(); + String key = LuceneConstants.FIELD_PREFIX + fieldName; + Metadata documentMeta = doc.getDocumentMeta(); + if (f.isStored()) { + documentMeta.add(key, LuceneConstants.STORE_YES); + } else if (f.isCompressed()) { + documentMeta.add(key, LuceneConstants.STORE_COMPRESS); + } else { + documentMeta.add(key, LuceneConstants.STORE_NO); + } + + if (f.isIndexed()) { + if (f.isTokenized()) { + documentMeta.add(key, LuceneConstants.INDEX_TOKENIZED); + } else if (f.getOmitNorms()) { + documentMeta.add(key, LuceneConstants.INDEX_NO_NORMS); + } else { + documentMeta.add(key, LuceneConstants.INDEX_UNTOKENIZED); + } + } else { + documentMeta.add(key, LuceneConstants.INDEX_NO); + } + + if (f.isStoreOffsetWithTermVector() && f.isStorePositionWithTermVector()) { + documentMeta.add(key, LuceneConstants.VECTOR_POS_OFFSET); + } else if (f.isStoreOffsetWithTermVector()) { + documentMeta.add(key, LuceneConstants.VECTOR_OFFSET); + } else if (f.isStorePositionWithTermVector()) { + documentMeta.add(key, LuceneConstants.VECTOR_POS); + } else if (f.isTermVectorStored()) { + documentMeta.add(key, LuceneConstants.VECTOR_YES); + } else { + documentMeta.add(key, LuceneConstants.VECTOR_NO); + } + } + + public static void addFieldOptions(String field, LuceneWriter.STORE store, + LuceneWriter.INDEX index, LuceneWriter.VECTOR vector, Configuration conf) { + + conf.set(LuceneConstants.FIELD_STORE_PREFIX + field, store.toString()); + conf.set(LuceneConstants.FIELD_INDEX_PREFIX + field, index.toString()); + conf.set(LuceneConstants.FIELD_VECTOR_PREFIX + field, vector.toString()); + } + + public static void addFieldOptions(String field, LuceneWriter.STORE store, + LuceneWriter.INDEX index, Configuration conf) { + LuceneWriter.addFieldOptions(field, 
store, index, LuceneWriter.VECTOR.NO, conf); + } +} Index: src/java/org/apache/nutch/indexer/NutchDocument.java =================================================================== --- src/java/org/apache/nutch/indexer/NutchDocument.java (revision 0) +++ src/java/org/apache/nutch/indexer/NutchDocument.java (revision 0) @@ -0,0 +1,123 @@ +package org.apache.nutch.indexer; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.VersionMismatchException; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.nutch.metadata.Metadata; + +/** A {@link NutchDocument} is the unit of indexing.*/ +public class NutchDocument implements Writable { + + public static final byte VERSION = 1; + + private Map> fields; + + private Metadata documentMeta; + + private float score; + + public NutchDocument() { + fields = new HashMap>(); + documentMeta = new Metadata(); + score = 0.0f; + } + + public void add(String name, String value) { + List fieldValues = fields.get(name); + if (fieldValues == null) { + fieldValues = new ArrayList(); + } + fieldValues.add(value); + fields.put(name, fieldValues); + } + + private void addFieldUnprotected(String name, String value) { + fields.get(name).add(value); + } + + public String getFieldValue(String name) { + List fieldValues = fields.get(name); + if (fieldValues == null) { + return null; + } + if (fieldValues.size() == 0) { + return null; + } + return fieldValues.get(0); + } + + public List getFieldValues(String name) { + return fields.get(name); + } + + public List removeField(String name) { + return fields.remove(name); + } + + public Collection getFieldNames() { + return fields.keySet(); + } + + /** Iterate over all fields. 
*/ + public Iterator>> fieldIterator() { + return fields.entrySet().iterator(); + } + + public float getScore() { + return score; + } + + public void setScore(float score) { + this.score = score; + } + + public Metadata getDocumentMeta() { + return documentMeta; + } + + public void readFields(DataInput in) throws IOException { + byte version = in.readByte(); + if (version != VERSION) { + throw new VersionMismatchException(VERSION, version); + } + int size = WritableUtils.readVInt(in); + for (int i = 0; i < size; i++) { + String name = Text.readString(in); + int numValues = WritableUtils.readVInt(in); + fields.put(name, new ArrayList()); + for (int j = 0; j < numValues; j++) { + String value = Text.readString(in); + addFieldUnprotected(name, value); + } + } + score = in.readFloat(); + documentMeta.readFields(in); + } + + public void write(DataOutput out) throws IOException { + out.writeByte(VERSION); + WritableUtils.writeVInt(out, fields.size()); + for (Map.Entry> entry : fields.entrySet()) { + Text.writeString(out, entry.getKey()); + List values = entry.getValue(); + WritableUtils.writeVInt(out, values.size()); + for (String value : values) { + Text.writeString(out, value); + } + } + out.writeFloat(score); + documentMeta.write(out); + } + +} Index: src/java/org/apache/nutch/indexer/NutchIndexWriter.java =================================================================== --- src/java/org/apache/nutch/indexer/NutchIndexWriter.java (revision 0) +++ src/java/org/apache/nutch/indexer/NutchIndexWriter.java (revision 0) @@ -0,0 +1,14 @@ +package org.apache.nutch.indexer; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; + +public interface NutchIndexWriter { + public void open(Configuration conf) throws IOException; + + public void write(NutchDocument doc) throws IOException; + + public void close() throws IOException; + +} Index: src/java/org/apache/nutch/indexer/solr/SolrConstants.java 
=================================================================== --- src/java/org/apache/nutch/indexer/solr/SolrConstants.java (revision 0) +++ src/java/org/apache/nutch/indexer/solr/SolrConstants.java (revision 0) @@ -0,0 +1,8 @@ +package org.apache.nutch.indexer.solr; + +public interface SolrConstants { + public static final String SOLR_PREFIX = "solr."; + + public static final String SERVER_URL = SOLR_PREFIX + "server.url"; + +} Index: src/java/org/apache/nutch/indexer/solr/SolrWriter.java =================================================================== --- src/java/org/apache/nutch/indexer/solr/SolrWriter.java (revision 0) +++ src/java/org/apache/nutch/indexer/solr/SolrWriter.java (revision 0) @@ -0,0 +1,27 @@ +package org.apache.nutch.indexer.solr; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.indexer.NutchIndexWriter; +import org.apache.nutch.util.solr.SolrClient; + +public class SolrWriter implements NutchIndexWriter { + + private SolrClient client; + + public void close() throws IOException { + client.commitAndOptimize(true); + } + + public void open(Configuration conf) + throws IOException { + client = new SolrClient(conf); + } + + public void write(NutchDocument doc) throws IOException { + client.addDocument(doc); + } + +} Index: src/java/org/apache/nutch/scoring/ScoringFilter.java =================================================================== --- src/java/org/apache/nutch/scoring/ScoringFilter.java (revision 681741) +++ src/java/org/apache/nutch/scoring/ScoringFilter.java (working copy) @@ -22,9 +22,9 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.io.Text; -import org.apache.lucene.document.Document; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.Inlinks; +import org.apache.nutch.indexer.NutchDocument; import org.apache.nutch.parse.Parse; import org.apache.nutch.parse.ParseData; import 
org.apache.nutch.plugin.Pluggable; @@ -156,6 +156,6 @@ * other scoring strategies by modifying Lucene document directly. * @throws ScoringFilterException */ - public float indexerScore(Text url, Document doc, CrawlDatum dbDatum, + public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum, CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException; } Index: src/java/org/apache/nutch/scoring/ScoringFilters.java =================================================================== --- src/java/org/apache/nutch/scoring/ScoringFilters.java (revision 681741) +++ src/java/org/apache/nutch/scoring/ScoringFilters.java (working copy) @@ -22,9 +22,9 @@ import java.util.List; import java.util.Map.Entry; -import org.apache.lucene.document.Document; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.Inlinks; +import org.apache.nutch.indexer.NutchDocument; import org.apache.nutch.parse.Parse; import org.apache.nutch.parse.ParseData; import org.apache.nutch.plugin.Extension; @@ -138,7 +138,7 @@ return adjust; } - public float indexerScore(Text url, Document doc, CrawlDatum dbDatum, CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException { + public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum, CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException { for (int i = 0; i < this.filters.length; i++) { initScore = this.filters[i].indexerScore(url, doc, dbDatum, fetchDatum, parse, inlinks, initScore); } Index: src/java/org/apache/nutch/searcher/DistributedSearch.java =================================================================== --- src/java/org/apache/nutch/searcher/DistributedSearch.java (revision 681741) +++ src/java/org/apache/nutch/searcher/DistributedSearch.java (working copy) @@ -17,51 +17,22 @@ package org.apache.nutch.searcher; -import java.io.BufferedReader; import java.io.IOException; 
-import java.io.InputStreamReader; -import java.lang.reflect.Method; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.StringTokenizer; -import java.util.TreeSet; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.VersionedProtocol; -import org.apache.nutch.crawl.Inlinks; -import org.apache.nutch.parse.ParseData; -import org.apache.nutch.parse.ParseText; import org.apache.nutch.util.NutchConfiguration; -/** Implements the search API over IPC connnections. */ +/** Search/summary servers. */ public class DistributedSearch { - public static final Log LOG = LogFactory.getLog(DistributedSearch.class); private DistributedSearch() {} // no public ctor - /** The distributed search protocol. */ - public static interface Protocol - extends Searcher, HitDetailer, HitSummarizer, HitContent, HitInlinks, VersionedProtocol { - - /** The name of the segments searched by this node. */ - String[] getSegmentNames(); - } - - /** The search server. */ - public static class Server { - - private Server() {} - - /** Runs a search server. */ + /** Runs a search/summary server. 
*/ + public static class Server { public static void main(String[] args) throws Exception { - String usage = "DistributedSearch$Server "; + String usage = "DistributedSearch$Server "; if (args.length == 0 || args.length > 2) { System.err.println(usage); @@ -73,7 +44,8 @@ Configuration conf = NutchConfiguration.create(); - org.apache.hadoop.ipc.Server server = getServer(conf, directory, port); + org.apache.hadoop.ipc.Server server = + getServer(conf, directory, port); server.start(); server.join(); } @@ -78,7 +50,8 @@ server.join(); } - static org.apache.hadoop.ipc.Server getServer(Configuration conf, Path directory, int port) throws IOException{ + static org.apache.hadoop.ipc.Server getServer(Configuration conf, + Path directory, int port) throws IOException{ NutchBean bean = new NutchBean(conf, directory); int numHandlers = conf.getInt("searcher.num.handlers", 10); return RPC.getServer(bean, "0.0.0.0", port, numHandlers, true, conf); @@ -86,372 +59,50 @@ } - /** The search client. */ - public static class Client extends Thread - implements Searcher, HitDetailer, HitSummarizer, HitContent, HitInlinks, - Runnable { - - private InetSocketAddress[] defaultAddresses; - private boolean[] liveServer; - private HashMap segmentToAddress = new HashMap(); - - private boolean running = true; - private Configuration conf; - - private Path file; - private long timestamp; - private FileSystem fs; - - /** Construct a client talking to servers listed in the named file. - * Each line in the file lists a server hostname and port, separated by - * whitespace. 
- */ - public Client(Path file, Configuration conf) - throws IOException { - this(readConfig(file, conf), conf); - this.file = file; - this.timestamp = fs.getFileStatus(file).getModificationTime(); - } - - private static InetSocketAddress[] readConfig(Path path, Configuration conf) - throws IOException { - FileSystem fs = FileSystem.get(conf); - BufferedReader reader = - new BufferedReader(new InputStreamReader(fs.open(path))); - try { - ArrayList addrs = new ArrayList(); - String line; - while ((line = reader.readLine()) != null) { - StringTokenizer tokens = new StringTokenizer(line); - if (tokens.hasMoreTokens()) { - String host = tokens.nextToken(); - if (tokens.hasMoreTokens()) { - String port = tokens.nextToken(); - addrs.add(new InetSocketAddress(host, Integer.parseInt(port))); - if (LOG.isInfoEnabled()) { - LOG.info("Client adding server " + host + ":" + port); - } - } - } - } - return (InetSocketAddress[]) - addrs.toArray(new InetSocketAddress[addrs.size()]); - } finally { - reader.close(); - } - } - - /** Construct a client talking to the named servers. 
*/ - public Client(InetSocketAddress[] addresses, Configuration conf) throws IOException { - this.conf = conf; - this.defaultAddresses = addresses; - this.liveServer = new boolean[addresses.length]; - this.fs = FileSystem.get(conf); - updateSegments(); - setDaemon(true); - start(); - } - - private static final Method GET_SEGMENTS; - private static final Method SEARCH; - private static final Method DETAILS; - private static final Method SUMMARY; - static { - try { - GET_SEGMENTS = Protocol.class.getMethod - ("getSegmentNames", new Class[] {}); - SEARCH = Protocol.class.getMethod - ("search", new Class[] { Query.class, Integer.TYPE, String.class, - String.class, Boolean.TYPE}); - DETAILS = Protocol.class.getMethod - ("getDetails", new Class[] { Hit.class}); - SUMMARY = Protocol.class.getMethod - ("getSummary", new Class[] { HitDetails.class, Query.class}); - } catch (NoSuchMethodException e) { - throw new RuntimeException(e); - } - } - - /** - * Check to see if search-servers file has been modified - * - * @throws IOException - */ - public boolean isFileModified() - throws IOException { - - if (file != null) { - long modTime = fs.getFileStatus(file).getModificationTime(); - if (timestamp < modTime) { - this.timestamp = fs.getFileStatus(file).getModificationTime(); - return true; - } - } - - return false; - } - - /** Updates segment names. - * - * @throws IOException - */ - public void updateSegments() throws IOException { - - int liveServers = 0; - int liveSegments = 0; - - if (isFileModified()) { - defaultAddresses = readConfig(file, conf); - } - - // Create new array of flags so they can all be updated at once. 
- boolean[] updatedLiveServer = new boolean[defaultAddresses.length]; - - // build segmentToAddress map - Object[][] params = new Object[defaultAddresses.length][0]; - String[][] results = - (String[][])RPC.call(GET_SEGMENTS, params, defaultAddresses, this.conf); - - for (int i = 0; i < results.length; i++) { // process results of call - InetSocketAddress addr = defaultAddresses[i]; - String[] segments = results[i]; - if (segments == null) { - updatedLiveServer[i] = false; - if (LOG.isWarnEnabled()) { - LOG.warn("Client: no segments from: " + addr); - } - continue; - } - - for (int j = 0; j < segments.length; j++) { - if (LOG.isTraceEnabled()) { - LOG.trace("Client: segment "+segments[j]+" at "+addr); - } - segmentToAddress.put(segments[j], addr); - } - - updatedLiveServer[i] = true; - liveServers++; - liveSegments += segments.length; - } - - // Now update live server flags. - this.liveServer = updatedLiveServer; - - if (LOG.isInfoEnabled()) { - LOG.info("STATS: "+liveServers+" servers, "+liveSegments+" segments."); - } - } - - /** Return the names of segments searched. */ - public String[] getSegmentNames() { - return (String[]) - segmentToAddress.keySet().toArray(new String[segmentToAddress.size()]); - } - - public Hits search(final Query query, final int numHits, - final String dedupField, final String sortField, - final boolean reverse) throws IOException { - // Get the list of live servers. It would be nice to build this - // list in updateSegments(), but that would create concurrency issues. - // We grab a local reference to the live server flags in case it - // is updated while we are building our list of liveAddresses. 
- boolean[] savedLiveServer = this.liveServer; - int numLive = 0; - for (int i = 0; i < savedLiveServer.length; i++) { - if (savedLiveServer[i]) - numLive++; - } - InetSocketAddress[] liveAddresses = new InetSocketAddress[numLive]; - int[] liveIndexNos = new int[numLive]; - int k = 0; - for (int i = 0; i < savedLiveServer.length; i++) { - if (savedLiveServer[i]) { - liveAddresses[k] = defaultAddresses[i]; - liveIndexNos[k] = i; - k++; - } + public static class IndexServer { + /** Runs a lucene search server. */ + public static void main(String[] args) throws Exception { + final String usage = "DistributedSearch$IndexServer "; + if (args.length == 0 || args.length > 2) { + System.err.println(usage); + System.exit(-1); } - Object[][] params = new Object[liveAddresses.length][5]; - for (int i = 0; i < params.length; i++) { - params[i][0] = query; - params[i][1] = new Integer(numHits); - params[i][2] = dedupField; - params[i][3] = sortField; - params[i][4] = Boolean.valueOf(reverse); - } - Hits[] results = (Hits[])RPC.call(SEARCH, params, liveAddresses, this.conf); - - TreeSet queue; // cull top hits from results - - if (sortField == null || reverse) { - queue = new TreeSet(new Comparator() { - public int compare(Object o1, Object o2) { - return ((Comparable)o2).compareTo(o1); // reverse natural order - } - }); - } else { - queue = new TreeSet(); - } - - long totalHits = 0; - Comparable maxValue = null; - for (int i = 0; i < results.length; i++) { - Hits hits = results[i]; - if (hits == null) continue; - totalHits += hits.getTotal(); - for (int j = 0; j < hits.getLength(); j++) { - Hit h = hits.getHit(j); - if (maxValue == null || - ((reverse || sortField == null) - ? 
h.getSortValue().compareTo(maxValue) >= 0 - : h.getSortValue().compareTo(maxValue) <= 0)) { - queue.add(new Hit(liveIndexNos[i], h.getIndexDocNo(), - h.getSortValue(), h.getDedupValue())); - if (queue.size() > numHits) { // if hit queue overfull - queue.remove(queue.last()); // remove lowest in hit queue - maxValue = ((Hit)queue.last()).getSortValue(); // reset maxValue - } - } - } - } - return new Hits(totalHits, (Hit[])queue.toArray(new Hit[queue.size()])); - } - - // version for hadoop-0.5.0.jar - public static final long versionID = 1L; - - private Protocol getRemote(Hit hit) throws IOException { - return (Protocol) - RPC.getProxy(Protocol.class, versionID, defaultAddresses[hit.getIndexNo()], conf); - } + int port = Integer.parseInt(args[0]); + Path dir = new Path(args[1]); - private Protocol getRemote(HitDetails hit) throws IOException { - InetSocketAddress address = - (InetSocketAddress)segmentToAddress.get(hit.getValue("segment")); - return (Protocol)RPC.getProxy(Protocol.class, versionID, address, conf); - } + Configuration conf = NutchConfiguration.create(); - public String getExplanation(Query query, Hit hit) throws IOException { - return getRemote(hit).getExplanation(query, hit); + LuceneSearchBean bean = new LuceneSearchBean(conf, + new Path(dir, "index"), new Path(dir, "indexes")); + org.apache.hadoop.ipc.RPC.Server server = + RPC.getServer(bean, "0.0.0.0", port, 10, false, conf); + server.start(); + server.join(); } - - public HitDetails getDetails(Hit hit) throws IOException { - return getRemote(hit).getDetails(hit); - } - - public HitDetails[] getDetails(Hit[] hits) throws IOException { - InetSocketAddress[] addrs = new InetSocketAddress[hits.length]; - Object[][] params = new Object[hits.length][1]; - for (int i = 0; i < hits.length; i++) { - addrs[i] = defaultAddresses[hits[i].getIndexNo()]; - params[i][0] = hits[i]; - } - return (HitDetails[])RPC.call(DETAILS, params, addrs, conf); - } - - - public Summary getSummary(HitDetails hit, Query query) 
throws IOException { - return getRemote(hit).getSummary(hit, query); - } - - public Summary[] getSummary(HitDetails[] hits, Query query) - throws IOException { - InetSocketAddress[] addrs = new InetSocketAddress[hits.length]; - Object[][] params = new Object[hits.length][2]; - for (int i = 0; i < hits.length; i++) { - HitDetails hit = hits[i]; - addrs[i] = - (InetSocketAddress)segmentToAddress.get(hit.getValue("segment")); - params[i][0] = hit; - params[i][1] = query; + } + + public static class SegmentServer { + /** Runs a summary server. */ + public static void main(String[] args) throws Exception { + final String usage = + "DistributedSearch$SegmentServer "; + if (args.length < 2) { + System.err.println(usage); + System.exit(1); } - return (Summary[])RPC.call(SUMMARY, params, addrs, conf); - } - - public byte[] getContent(HitDetails hit) throws IOException { - return getRemote(hit).getContent(hit); - } - - public ParseData getParseData(HitDetails hit) throws IOException { - return getRemote(hit).getParseData(hit); - } - public ParseText getParseText(HitDetails hit) throws IOException { - return getRemote(hit).getParseText(hit); - } + Configuration conf = NutchConfiguration.create(); + int port = Integer.parseInt(args[0]); + Path segmentsDir = new Path(args[1], "segments"); - public String[] getAnchors(HitDetails hit) throws IOException { - return getRemote(hit).getAnchors(hit); - } - - public Inlinks getInlinks(HitDetails hit) throws IOException { - return getRemote(hit).getInlinks(hit); - } - - public long getFetchDate(HitDetails hit) throws IOException { - return getRemote(hit).getFetchDate(hit); - } + FetchedSegments segments = new FetchedSegments(conf, segmentsDir); - public static void main(String[] args) throws Exception { - String usage = "DistributedSearch$Client query ..."; - - if (args.length == 0) { - System.err.println(usage); - System.exit(-1); - } - - Query query = Query.parse(args[0], NutchConfiguration.create()); + 
org.apache.hadoop.ipc.RPC.Server server = + RPC.getServer(segments, "0.0.0.0", port, conf); - InetSocketAddress[] addresses = new InetSocketAddress[(args.length-1)/2]; - for (int i = 0; i < (args.length-1)/2; i++) { - addresses[i] = - new InetSocketAddress(args[i*2+1], Integer.parseInt(args[i*2+2])); - } - - Client client = new Client(addresses, NutchConfiguration.create()); - //client.setTimeout(Integer.MAX_VALUE); - - Hits hits = client.search(query, 10, null, null, false); - System.out.println("Total hits: " + hits.getTotal()); - for (int i = 0; i < hits.getLength(); i++) { - System.out.println(" "+i+" "+ client.getDetails(hits.getHit(i))); - } - - } - - public void run() { - while (running){ - try{ - Thread.sleep(10000); - } catch (InterruptedException ie){ - if (LOG.isInfoEnabled()) { - LOG.info("Thread sleep interrupted."); - } - } - try{ - if (LOG.isInfoEnabled()) { - LOG.info("Querying segments from search servers..."); - } - updateSegments(); - } catch (IOException ioe) { - if (LOG.isWarnEnabled()) { LOG.warn("No search servers available!"); } - liveServer = new boolean[defaultAddresses.length]; - } - } - } - - /** - * Stops the watchdog thread. 
- */ - public void close() { - running = false; - interrupt(); - } - - public boolean[] getLiveServer() { - return liveServer; + server.start(); + server.join(); } } } \ No newline at end of file Index: src/java/org/apache/nutch/searcher/DistributedSearchBean.java =================================================================== --- src/java/org/apache/nutch/searcher/DistributedSearchBean.java (revision 0) +++ src/java/org/apache/nutch/searcher/DistributedSearchBean.java (revision 0) @@ -0,0 +1,326 @@ +package org.apache.nutch.searcher; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.util.StringUtils; + +public class DistributedSearchBean implements SearchBean { + + private static final ExecutorService executor = + Executors.newCachedThreadPool(); + + private final ScheduledExecutorService pingService; + + private class SearchTask implements Callable { + private int id; + + private Query query; + private int numHits; + private String dedupField; + private String sortField; + private boolean reverse; + + public SearchTask(int id) { + this.id = id; + } + + public Hits call() throws Exception { + if (!liveServers[id]) { + return null; + } + return beans[id].search(query, numHits, dedupField, sortField, reverse); + } + + public void setSearchArgs(Query query, int numHits, String dedupField, + 
String sortField, boolean reverse) { + this.query = query; + this.numHits = numHits; + this.dedupField = dedupField; + this.sortField = sortField; + this.reverse = reverse; + } + + } + + private class DetailTask implements Callable { + private int id; + + private Hit[] hits; + + public DetailTask(int id) { + this.id = id; + } + + public HitDetails[] call() throws Exception { + if (hits == null) { + return null; + } + return beans[id].getDetails(hits); + } + + public void setHits(Hit[] hits) { + this.hits = hits; + } + + } + + private class PingWorker implements Runnable { + private int id; + + public PingWorker(int id) { + this.id = id; + } + + public void run() { + try { + if (beans[id].ping()) { + liveServers[id] = true; + } else { + liveServers[id] = false; + } + } catch (IOException e) { + liveServers[id] = false; + } + } + } + + private volatile boolean liveServers[]; + + private SearchBean[] beans; + + private List> searchTasks; + + private List> detailTasks; + + private List pingWorkers; + + private long timeout; + + public DistributedSearchBean(Configuration conf, + Path luceneConfig, Path solrConfig) + throws IOException { + FileSystem fs = FileSystem.get(conf); + + this.timeout = conf.getLong("ipc.client.timeout", 60000); + + List beanList = new ArrayList(); + + if (fs.exists(luceneConfig)) { + addLuceneBeans(beanList, luceneConfig, conf); + } + + if (fs.exists(solrConfig)) { + addSolrBeans(beanList, solrConfig, conf); + } + + beans = beanList.toArray(new SearchBean[beanList.size()]); + + liveServers = new boolean[beans.length]; + for (int i = 0; i < liveServers.length; i++) { + liveServers[i] = true; + } + + searchTasks = new ArrayList>(); + detailTasks = new ArrayList>(); + pingWorkers = new ArrayList(); + + for (int i = 0; i < beans.length; i++) { + searchTasks.add(new SearchTask(i)); + detailTasks.add(new DetailTask(i)); + pingWorkers.add(new PingWorker(i)); + } + + pingService = Executors.newScheduledThreadPool(beans.length); + for (PingWorker worker 
: pingWorkers) { + pingService.scheduleAtFixedRate(worker, 0, 10, TimeUnit.SECONDS); + } + + } + + private static void addLuceneBeans(List beanList, + Path luceneConfig, Configuration conf) + throws IOException { + Configuration newConf = new Configuration(conf); + + // do not retry connections + newConf.setInt("ipc.client.connect.max.retries", 0); + + List luceneServers = + NutchBean.readAddresses(luceneConfig, conf); + for (InetSocketAddress addr : luceneServers) { + beanList.add((RPCSearchBean) RPC.getProxy(RPCSearchBean.class, + LuceneSearchBean.VERSION, addr, newConf)); + } + } + + private static void addSolrBeans(List beanList, + Path solrConfig, Configuration conf) + throws IOException { + for (String solrServer : NutchBean.readConfig(solrConfig, conf)) { + beanList.add(new SolrSearchBean(conf, solrServer)); + } + } + + public String getExplanation(Query query, Hit hit) throws IOException { + return beans[hit.getIndexNo()].getExplanation(query, hit); + } + + public Hits search(Query query, int numHits, String dedupField, + String sortField, boolean reverse) throws IOException { + + for (Callable task : searchTasks) { + ((SearchTask)task).setSearchArgs(query, numHits, dedupField, sortField, + reverse); + } + + List> allHits; + try { + allHits = + executor.invokeAll(searchTasks, timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + PriorityQueue queue; // cull top hits from results + if (sortField == null || reverse) { + queue = new PriorityQueue(numHits); + } else { + queue = new PriorityQueue(numHits, new Comparator() { + public int compare(Hit h1, Hit h2) { + return h2.compareTo(h1); // reverse natural order + } + }); + } + + long totalHits = 0; + int allHitsSize = allHits.size(); + for (int i = 0; i < allHitsSize; i++) { + Hits hits = null; + try { + hits = allHits.get(i).get(); + } catch (InterruptedException e) { + // ignore + } catch (ExecutionException e) { + LOG.warn("Retrieving hits failed with 
exception: " + + StringUtils.stringifyException(e.getCause())); + } + + if (hits == null) { + continue; + } + + totalHits += hits.getTotal(); + + int hitsLength = hits.getLength(); + for (int j = 0; j < hitsLength; j++) { + Hit hit = hits.getHit(j); + Hit newHit = new Hit(i, hit.getUniqueKey(), + hit.getSortValue(), hit.getDedupValue()); + queue.add(newHit); + if (queue.size() > numHits) { // if hit queue overfull + queue.remove(); + } + } + } + + // we have to sort results since PriorityQueue.toArray + // may not return results in sorted order + Hit[] culledResults = queue.toArray(new Hit[queue.size()]); + Arrays.sort(culledResults, Collections.reverseOrder(queue.comparator())); + + return new Hits(totalHits, culledResults); + } + + public void close() throws IOException { + executor.shutdown(); + pingService.shutdown(); + for (SearchBean bean : beans) { + bean.close(); + } + } + + public HitDetails getDetails(Hit hit) throws IOException { + return beans[hit.getIndexNo()].getDetails(hit); + } + + @SuppressWarnings("unchecked") + public HitDetails[] getDetails(Hit[] hits) throws IOException { + List[] hitList = new ArrayList[detailTasks.size()]; + + for (int i = 0; i < hitList.length; i++) { + hitList[i] = new ArrayList(); + } + + for (int i = 0; i < hits.length; i++) { + Hit hit = hits[i]; + hitList[hit.getIndexNo()].add(hit); + } + + for (int i = 0; i < detailTasks.size(); i++) { + DetailTask task = (DetailTask)detailTasks.get(i); + if (hitList[i].size() > 0) { + task.setHits(hitList[i].toArray(new Hit[hitList[i].size()])); + } else { + task.setHits(null); + } + } + + List> allDetails; + try { + allDetails = + executor.invokeAll(detailTasks, timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + /* getDetails(Hit[]) method assumes that HitDetails[i] returned corresponds + * to Hit[i] given as parameter. To keep this order, we have to 'merge' + * HitDetails[] returned from individual detailTasks. 
+ */ + HitDetails[][] detailsMatrix = new HitDetails[detailTasks.size()][]; + for (int i = 0; i < detailsMatrix.length; i++) { + try { + detailsMatrix[i] = allDetails.get(i).get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new RuntimeException(e); + } + } + + int[] hitPos = new int[detailTasks.size()]; // keep track of where we are + HitDetails[] detailsArr = new HitDetails[hits.length]; + for (int i = 0; i < detailsArr.length; i++) { + int indexNo = hits[i].getIndexNo(); + detailsArr[i] = detailsMatrix[indexNo][(hitPos[indexNo]++)]; + } + + return detailsArr; + } + + public boolean ping() { + return true; // not used + } + +} Index: src/java/org/apache/nutch/searcher/DistributedSegmentBean.java =================================================================== --- src/java/org/apache/nutch/searcher/DistributedSegmentBean.java (revision 0) +++ src/java/org/apache/nutch/searcher/DistributedSegmentBean.java (revision 0) @@ -0,0 +1,214 @@ +package org.apache.nutch.searcher; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.RPC; +import org.apache.nutch.parse.ParseData; +import org.apache.nutch.parse.ParseText; + +public class DistributedSegmentBean implements SegmentBean { + + private static final ExecutorService executor = + 
Executors.newCachedThreadPool(); + + private final ScheduledExecutorService pingService; + + private class DistSummmaryTask implements Callable { + private int id; + + private HitDetails[] details; + private Query query; + + public DistSummmaryTask(int id) { + this.id = id; + } + + public Summary[] call() throws Exception { + if (details == null) { + return null; + } + return beans[id].getSummary(details, query); + } + + public void setSummaryArgs(HitDetails[] details, Query query) { + this.details = details; + this.query = query; + } + + } + + private class SegmentWorker implements Runnable { + private int id; + + public SegmentWorker(int id) { + this.id = id; + } + + public void run() { + try { + String[] segments = beans[id].getSegmentNames(); + for (String segment : segments) { + segmentMap.put(segment, id); + } + } catch (IOException e) { + // remove all segments this bean was serving + Iterator> i = + segmentMap.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry entry = i.next(); + int curId = entry.getValue(); + if (curId == this.id) { + i.remove(); + } + } + } + } + } + + private long timeout; + + private SegmentBean[] beans; + + private ConcurrentMap segmentMap; + + private List> summaryTasks; + + private List segmentWorkers; + + public DistributedSegmentBean(Configuration conf, Path serversConfig) + throws IOException { + this.timeout = conf.getLong("ipc.client.timeout", 60000); + + List beanList = new ArrayList(); + + List segmentServers = + NutchBean.readAddresses(serversConfig, conf); + + for (InetSocketAddress addr : segmentServers) { + SegmentBean bean = (RPCSegmentBean) RPC.getProxy(RPCSegmentBean.class, + FetchedSegments.VERSION, addr, conf); + beanList.add(bean); + } + + beans = beanList.toArray(new SegmentBean[beanList.size()]); + + summaryTasks = new ArrayList>(beans.length); + segmentWorkers = new ArrayList(beans.length); + + for (int i = 0; i < beans.length; i++) { + summaryTasks.add(new DistSummmaryTask(i)); + segmentWorkers.add(new 
SegmentWorker(i)); + } + + segmentMap = new ConcurrentHashMap(); + + pingService = Executors.newScheduledThreadPool(beans.length); + for (SegmentWorker worker : segmentWorkers) { + pingService.scheduleAtFixedRate(worker, 0, 30, TimeUnit.SECONDS); + } + } + + private SegmentBean getBean(HitDetails details) { + return beans[segmentMap.get(details.getValue("segment"))]; + } + + public String[] getSegmentNames() { + return segmentMap.keySet().toArray(new String[segmentMap.size()]); + } + + public byte[] getContent(HitDetails details) throws IOException { + return getBean(details).getContent(details); + } + + public long getFetchDate(HitDetails details) throws IOException { + return getBean(details).getFetchDate(details); + } + + public ParseData getParseData(HitDetails details) throws IOException { + return getBean(details).getParseData(details); + } + + public ParseText getParseText(HitDetails details) throws IOException { + return getBean(details).getParseText(details); + } + + public void close() throws IOException { + executor.shutdown(); + pingService.shutdown(); + for (SegmentBean bean : beans) { + bean.close(); + } + } + + public Summary getSummary(HitDetails details, Query query) + throws IOException { + return getBean(details).getSummary(details, query); + } + + @SuppressWarnings("unchecked") + public Summary[] getSummary(HitDetails[] detailsArr, Query query) + throws IOException { + List[] detailsList = new ArrayList[summaryTasks.size()]; + for (int i = 0; i < detailsList.length; i++) { + detailsList[i] = new ArrayList(); + } + for (HitDetails details : detailsArr) { + detailsList[segmentMap.get(details.getValue("segment"))].add(details); + } + for (int i = 0; i < summaryTasks.size(); i++) { + DistSummmaryTask task = (DistSummmaryTask)summaryTasks.get(i); + if (detailsList[i].size() > 0) { + HitDetails[] taskDetails = + detailsList[i].toArray(new HitDetails[detailsList[i].size()]); + task.setSummaryArgs(taskDetails, query); + } else { + 
task.setSummaryArgs(null, null); + } + } + + List> summaries; + try { + summaries = + executor.invokeAll(summaryTasks, timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + List summaryList = new ArrayList(); + for (Future f : summaries) { + Summary[] summaryArray; + try { + summaryArray = f.get(); + if (summaryArray == null) { + continue; + } + for (Summary summary : summaryArray) { + summaryList.add(summary); + } + } catch (Exception e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new RuntimeException(e); + } + } + + return summaryList.toArray(new Summary[summaryList.size()]); + } + +} Index: src/java/org/apache/nutch/searcher/FetchedSegments.java =================================================================== --- src/java/org/apache/nutch/searcher/FetchedSegments.java (revision 681741) +++ src/java/org/apache/nutch/searcher/FetchedSegments.java (working copy) @@ -19,8 +19,16 @@ import java.io.IOException; -import java.util.HashMap; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.*; @@ -35,7 +43,75 @@ /** Implements {@link HitSummarizer} and {@link HitContent} for a set of * fetched segments. 
*/ -public class FetchedSegments implements HitSummarizer, HitContent { +public class FetchedSegments implements RPCSegmentBean { + + public static final long VERSION = 1L; + + private static final ExecutorService executor = + Executors.newCachedThreadPool(); + + private class SummaryTask implements Callable { + private HitDetails details; + private Query query; + + public SummaryTask(HitDetails details, Query query) { + this.details = details; + this.query = query; + } + + public Summary call() throws Exception { + return getSummary(details, query); + } + } + + private class SegmentUpdater extends Thread { + + @Override + public void run() { + while (true) { + try { + FileStatus[] fstats = fs.listStatus(segmentsDir, + HadoopFSUtil.getPassDirectoriesFilter(fs)); + Path[] segmentDirs = HadoopFSUtil.getPaths(fstats); + Iterator> i = + segments.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry entry = i.next(); + Segment seg = entry.getValue(); + if (!fs.exists(seg.segmentDir)) { + try { + seg.close(); + } catch (Exception e) { + /* A segment may fail to close + * since it may already be deleted from + * file system. So we just ignore the + * exception and remove the mapping from + * 'segments'. 
+ */ + } finally { + i.remove(); + } + } + } + + if (segmentDirs != null) { + for (Path segmentDir : segmentDirs) { + segments.putIfAbsent(segmentDir.getName(), + new Segment(fs, segmentDir, conf)); + } + } + + Thread.sleep(60000); + } catch (InterruptedException e) { + // ignore + } catch (IOException e) { + // ignore + } + } + } + + } + private static class Segment implements Closeable { @@ -87,7 +163,7 @@ } return (ParseText)getEntry(parseText, url, new ParseText()); } - + private MapFile.Reader[] getReaders(String subDir) throws IOException { return MapFileOutputFormat.getReaders(fs, new Path(segmentDir, subDir), this.conf); } @@ -112,27 +188,33 @@ } - private HashMap segments = new HashMap(); + private ConcurrentMap segments = + new ConcurrentHashMap(); + private FileSystem fs; + private Configuration conf; + private Path segmentsDir; + private SegmentUpdater segUpdater; private Summarizer summarizer; - + /** Construct given a directory containing fetcher output. */ - public FetchedSegments(FileSystem fs, String segmentsDir, Configuration conf) throws IOException { - FileStatus[] fstats = fs.listStatus(new Path(segmentsDir), + public FetchedSegments(Configuration conf, Path segmentsDir) + throws IOException { + this.conf = conf; + this.fs = FileSystem.get(this.conf); + FileStatus[] fstats = fs.listStatus(segmentsDir, HadoopFSUtil.getPassDirectoriesFilter(fs)); Path[] segmentDirs = HadoopFSUtil.getPaths(fstats); - this.summarizer = new SummarizerFactory(conf).getSummarizer(); + this.summarizer = new SummarizerFactory(this.conf).getSummarizer(); + this.segmentsDir = segmentsDir; + this.segUpdater = new SegmentUpdater(); if (segmentDirs != null) { - for (int i = 0; i < segmentDirs.length; i++) { - Path segmentDir = segmentDirs[i]; -// Path indexdone = new Path(segmentDir, IndexSegment.DONE_NAME); -// if (fs.exists(indexdone) && fs.isFile(indexdone)) { -// segments.put(segmentDir.getName(), new Segment(fs, segmentDir)); -// } - segments.put(segmentDir.getName(), 
new Segment(fs, segmentDir, conf)); - - } + for (Path segmentDir : segmentDirs) { + segments.put(segmentDir.getName(), + new Segment(this.fs, segmentDir, this.conf)); + } } + this.segUpdater.start(); } public String[] getSegmentNames() { @@ -167,51 +249,41 @@ return this.summarizer.getSummary(text, query); } - - private class SummaryThread extends Thread { - private HitDetails details; - private Query query; - - private Summary summary; - private Throwable throwable; - - public SummaryThread(HitDetails details, Query query) { - this.details = details; - this.query = query; - } - - public void run() { - try { - this.summary = getSummary(details, query); - } catch (Throwable throwable) { - this.throwable = throwable; - } - } - + + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return VERSION; } - public Summary[] getSummary(HitDetails[] details, Query query) throws IOException { - SummaryThread[] threads = new SummaryThread[details.length]; - for (int i = 0; i < threads.length; i++) { - threads[i] = new SummaryThread(details[i], query); - threads[i].start(); + List> tasks = + new ArrayList>(details.length); + for (int i = 0; i < details.length; i++) { + tasks.add(new SummaryTask(details[i], query)); + } + + List> summaries; + try { + summaries = executor.invokeAll(tasks); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + Summary[] results = new Summary[details.length]; - for (int i = 0; i < threads.length; i++) { + for (int i = 0; i < details.length; i++) { + Future f = summaries.get(i); + Summary summary; try { - threads[i].join(); - } catch (InterruptedException e) { + summary = f.get(); + } catch (Exception e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } throw new RuntimeException(e); } - if (threads[i].throwable instanceof IOException) { - throw (IOException)threads[i].throwable; - } else if (threads[i].throwable != null) { - throw new 
RuntimeException(threads[i].throwable); - } - results[i] = threads[i].summary; + results[i] = summary; } return results; } @@ -218,7 +290,7 @@ private Segment getSegment(HitDetails details) { - return (Segment)segments.get(details.getValue("segment")); + return segments.get(details.getValue("segment")); } private Text getUrl(HitDetails details) { @@ -230,9 +302,9 @@ } public void close() throws IOException { - Iterator iterator = segments.values().iterator(); + Iterator iterator = segments.values().iterator(); while (iterator.hasNext()) { - ((Segment) iterator.next()).close(); + iterator.next().close(); } } Index: src/java/org/apache/nutch/searcher/Hit.java =================================================================== --- src/java/org/apache/nutch/searcher/Hit.java (revision 681741) +++ src/java/org/apache/nutch/searcher/Hit.java (working copy) @@ -21,6 +21,7 @@ import java.io.DataOutput; import java.io.IOException; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -25,10 +26,11 @@ import org.apache.hadoop.io.WritableComparable; /** A document which matched a query in an index. 
*/ -public class Hit implements Writable, Comparable { +@SuppressWarnings("unchecked") +public class Hit implements Writable, Comparable { private int indexNo; // index id - private int indexDocNo; // index-relative id + private String uniqueKey; private WritableComparable sortValue; // value sorted on private String dedupValue; // value to dedup on private boolean moreFromDupExcluded; @@ -35,17 +37,17 @@ public Hit() {} - public Hit(int indexNo, int indexDocNo) { - this(indexNo, indexDocNo, null, null); + public Hit(int indexNo, String uniqueKey) { + this(indexNo, uniqueKey, null, null); } - public Hit(int indexNo, int indexDocNo, + public Hit(int indexNo, String uniqueKey, WritableComparable sortValue, String dedupValue) { - this(indexDocNo, sortValue, dedupValue); + this(uniqueKey, sortValue, dedupValue); this.indexNo = indexNo; } - public Hit(int indexDocNo, WritableComparable sortValue, String dedupValue) { - this.indexDocNo = indexDocNo; + public Hit(String uniqueKey, WritableComparable sortValue, String dedupValue) { + this.uniqueKey = uniqueKey; this.sortValue = sortValue; this.dedupValue = dedupValue == null ? "" : dedupValue; } @@ -55,7 +57,7 @@ public void setIndexNo(int indexNo) { this.indexNo = indexNo; } /** Return the document number of this hit within an index. */ - public int getIndexDocNo() { return indexDocNo; } + public String getUniqueKey() { return uniqueKey; } /** Return the value of the field that hits are sorted on. */ public WritableComparable getSortValue() { return sortValue; } @@ -73,23 +75,10 @@ /** Display as a string. 
*/ public String toString() { - return "#" + indexDocNo; - } - - public boolean equals(Object o) { - if (!(o instanceof Hit)) - return false; - Hit other = (Hit)o; - return this.indexNo == other.indexNo - && this.indexDocNo == other.indexDocNo; - } - - public int hashCode() { - return indexNo ^ indexDocNo; + return "#" + uniqueKey; } - public int compareTo(Object o) { - Hit other = (Hit)o; + public int compareTo(Hit other) { int compare = sortValue.compareTo(other.sortValue); if (compare != 0) { return compare; // use sortValue @@ -96,7 +85,7 @@ } else if (other.indexNo != this.indexNo) { return other.indexNo - this.indexNo; // prefer later indexes } else { - return other.indexDocNo - this.indexDocNo; // prefer later docs + return other.uniqueKey.compareTo(this.uniqueKey); // prefer later doc } } @@ -101,11 +90,11 @@ } public void write(DataOutput out) throws IOException { - out.writeInt(indexDocNo); + Text.writeString(out, uniqueKey); } public void readFields(DataInput in) throws IOException { - indexDocNo = in.readInt(); + uniqueKey = Text.readString(in); } } Index: src/java/org/apache/nutch/searcher/HitDetails.java =================================================================== --- src/java/org/apache/nutch/searcher/HitDetails.java (revision 681741) +++ src/java/org/apache/nutch/searcher/HitDetails.java (working copy) @@ -73,7 +73,7 @@ /** Returns all the values with the specified name. */ public String[] getValues(String field) { - ArrayList vals = new ArrayList(); + ArrayList vals = new ArrayList(); for (int i=0; i 0) - ? (String[]) vals.toArray(new String[vals.size()]) + ? 
vals.toArray(new String[vals.size()]) : null; } Index: src/java/org/apache/nutch/searcher/Hits.java =================================================================== --- src/java/org/apache/nutch/searcher/Hits.java (revision 681741) +++ src/java/org/apache/nutch/searcher/Hits.java (working copy) @@ -65,7 +65,6 @@ return results; } - public void write(DataOutput out) throws IOException { out.writeLong(total); // write total hits out.writeInt(top.length); // write hits returned @@ -74,7 +73,7 @@ for (int i = 0; i < top.length; i++) { Hit h = top[i]; - out.writeInt(h.getIndexDocNo()); // write indexDocNo + Text.writeString(out, h.getUniqueKey()); // write uniqueKey h.getSortValue().write(out); // write sortValue Text.writeString(out, h.getDedupValue()); // write dedupValue } @@ -80,6 +79,7 @@ } } + @SuppressWarnings("unchecked") public void readFields(DataInput in) throws IOException { total = in.readLong(); // read total hits top = new Hit[in.readInt()]; // read hits returned @@ -93,7 +93,7 @@ } for (int i = 0; i < top.length; i++) { - int indexDocNo = in.readInt(); // read indexDocNo + String uniqueKey = Text.readString(in); // read uniqueKey WritableComparable sortValue = null; try { @@ -105,7 +105,7 @@ String dedupValue = Text.readString(in); // read dedupValue - top[i] = new Hit(indexDocNo, sortValue, dedupValue); + top[i] = new Hit(uniqueKey, sortValue, dedupValue); } } Index: src/java/org/apache/nutch/searcher/IndexSearcher.java =================================================================== --- src/java/org/apache/nutch/searcher/IndexSearcher.java (revision 681741) +++ src/java/org/apache/nutch/searcher/IndexSearcher.java (working copy) @@ -103,7 +103,7 @@ public String getExplanation(Query query, Hit hit) throws IOException { return luceneSearcher.explain(this.queryFilters.filter(query), - hit.getIndexDocNo()).toHtml(); + Integer.valueOf(hit.getUniqueKey())).toHtml(); } public HitDetails getDetails(Hit hit) throws IOException { @@ -108,7 +108,7 @@ 
public HitDetails getDetails(Hit hit) throws IOException { - Document doc = luceneSearcher.doc(hit.getIndexDocNo()); + Document doc = luceneSearcher.doc(Integer.valueOf(hit.getUniqueKey())); List docFields = doc.getFields(); String[] fields = new String[docFields.size()]; @@ -162,7 +162,7 @@ String dedupValue = dedupValues == null ? null : dedupValues[doc]; - hits[i] = new Hit(doc, sortValue, dedupValue); + hits[i] = new Hit(Integer.toString(doc), sortValue, dedupValue); } return new Hits(topDocs.totalHits, hits); } Index: src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java =================================================================== --- src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java (revision 681741) +++ src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java (working copy) @@ -18,7 +18,6 @@ package org.apache.nutch.searcher; import org.apache.lucene.search.Searcher; -import org.apache.lucene.search.QueryFilter; import org.apache.lucene.search.*; import org.apache.lucene.index.Term; import org.apache.lucene.misc.ChainedFilter; @@ -83,6 +82,7 @@ } + @SuppressWarnings("serial") private static class TimeExceeded extends RuntimeException { public long maxTime; private int maxDoc; @@ -127,7 +127,8 @@ } } - private static class LimitExceeded extends RuntimeException { + @SuppressWarnings("serial") +private static class LimitExceeded extends RuntimeException { private int maxDoc; public LimitExceeded(int maxDoc) { this.maxDoc = maxDoc; } } @@ -151,7 +152,8 @@ * @param threshold * the fraction of documents which must contain a term */ - public LuceneQueryOptimizer(Configuration conf) { + @SuppressWarnings("serial") +public LuceneQueryOptimizer(Configuration conf) { final int cacheSize = conf.getInt("searcher.filter.cache.size", 16); this.threshold = conf.getFloat("searcher.filter.cache.threshold", 0.05f); @@ -157,7 +159,7 @@ 0.05f); this.searcherMaxHits = conf.getInt("searcher.max.hits", -1); this.cache = new LinkedHashMap(cacheSize, 
0.75f, true) { - protected boolean removeEldestEntry(Map.Entry eldest) { + protected boolean removeEldestEntry(Map.Entry eldest) { return size() > cacheSize; // limit size of cache } }; Index: src/java/org/apache/nutch/searcher/LuceneSearchBean.java =================================================================== --- src/java/org/apache/nutch/searcher/LuceneSearchBean.java (revision 0) +++ src/java/org/apache/nutch/searcher/LuceneSearchBean.java (revision 0) @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.searcher; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.nutch.indexer.Indexer; +import org.apache.nutch.util.HadoopFSUtil; + +public class LuceneSearchBean implements RPCSearchBean { + + public static final long VERSION = 1L; + + private IndexSearcher searcher; + + private FileSystem fs; + + private Configuration conf; + + /** + * Construct in a named directory. 
+ * @param conf + * @param dir + * @throws IOException + */ + public LuceneSearchBean(Configuration conf, Path indexDir, Path indexesDir) + throws IOException { + this.conf = conf; + this.fs = FileSystem.get(this.conf); + init(indexDir, indexesDir); + } + + private void init(Path indexDir, Path indexesDir) + throws IOException { + if (this.fs.exists(indexDir)) { + LOG.info("opening merged index in " + indexDir); + this.searcher = new IndexSearcher(indexDir, this.conf); + } else { + LOG.info("opening indexes in " + indexesDir); + + List vDirs = new ArrayList(); + FileStatus[] fstats = fs.listStatus(indexesDir, HadoopFSUtil.getPassDirectoriesFilter(fs)); + Path[] directories = HadoopFSUtil.getPaths(fstats); + for(int i = 0; i < directories.length; i++) { + Path indexdone = new Path(directories[i], Indexer.DONE_NAME); + if(fs.isFile(indexdone)) { + vDirs.add(directories[i]); + } + } + + directories = new Path[ vDirs.size() ]; + for(int i = 0; vDirs.size()>0; i++) { + directories[i] = vDirs.remove(0); + } + + this.searcher = new IndexSearcher(directories, this.conf); + } + } + + public Hits search(Query query, int numHits, String dedupField, + String sortField, boolean reverse) + throws IOException { + return searcher.search(query, numHits, dedupField, sortField, reverse); + } + + public String getExplanation(Query query, Hit hit) throws IOException { + return searcher.getExplanation(query, hit); + } + + public HitDetails getDetails(Hit hit) throws IOException { + return searcher.getDetails(hit); + } + + public HitDetails[] getDetails(Hit[] hits) throws IOException { + return searcher.getDetails(hits); + } + + public boolean ping() throws IOException { + return true; + } + + public void close() throws IOException { + if (searcher != null) { searcher.close(); } + if (fs != null) { fs.close(); } + } + + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return VERSION; + } + +} Index: 
src/java/org/apache/nutch/searcher/NutchBean.java =================================================================== --- src/java/org/apache/nutch/searcher/NutchBean.java (revision 681741) +++ src/java/org/apache/nutch/searcher/NutchBean.java (working copy) @@ -18,7 +18,9 @@ package org.apache.nutch.searcher; import java.io.*; +import java.net.InetSocketAddress; import java.util.*; + import javax.servlet.*; import org.apache.commons.logging.Log; @@ -25,6 +27,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Closeable; import org.apache.hadoop.conf.*; import org.apache.hadoop.util.StringUtils; @@ -29,9 +32,7 @@ import org.apache.hadoop.conf.*; import org.apache.hadoop.util.StringUtils; import org.apache.nutch.parse.*; -import org.apache.nutch.indexer.*; import org.apache.nutch.crawl.Inlinks; -import org.apache.nutch.util.HadoopFSUtil; import org.apache.nutch.util.NutchConfiguration; /** @@ -39,8 +40,7 @@ * @version $Id: NutchBean.java,v 1.19 2005/02/07 19:10:08 cutting Exp $ */ public class NutchBean - implements Searcher, HitDetailer, HitSummarizer, HitContent, HitInlinks, - DistributedSearch.Protocol, Closeable { +implements SearchBean, SegmentBean, HitInlinks, Closeable { public static final Log LOG = LogFactory.getLog(NutchBean.class); public static final String KEY = "nutchBean"; @@ -51,10 +51,8 @@ private String[] segmentNames; - private Searcher searcher; - private HitDetailer detailer; - private HitSummarizer summarizer; - private HitContent content; + private SearchBean searchBean; + private SegmentBean segmentBean; private HitInlinks linkDb; @@ -84,7 +82,8 @@ } /** - * Construct in a named directory. + * Construct in a named directory. 
+ * * @param conf * @param dir * @throws IOException @@ -90,83 +89,67 @@ * @throws IOException */ public NutchBean(Configuration conf, Path dir) throws IOException { - this.conf = conf; - this.fs = FileSystem.get(this.conf); - if (dir == null) { - dir = new Path(this.conf.get("searcher.dir", "crawl")); - } - Path servers = new Path(dir, "search-servers.txt"); - if (fs.exists(servers)) { - if (LOG.isInfoEnabled()) { - LOG.info("searching servers in " + servers); - } - init(new DistributedSearch.Client(servers, conf)); - } else { - init(new Path(dir, "index"), new Path(dir, "indexes"), new Path( - dir, "segments"), new Path(dir, "linkdb")); - } + this.conf = conf; + this.fs = FileSystem.get(this.conf); + if (dir == null) { + dir = new Path(this.conf.get("searcher.dir", "crawl")); + } + Path luceneConfig = new Path(dir, "search-servers.txt"); + Path solrConfig = new Path(dir, "solr-servers.txt"); + Path segmentConfig = new Path(dir, "segment-servers.txt"); + + if (fs.exists(luceneConfig) || fs.exists(solrConfig)) { + searchBean = new DistributedSearchBean(conf, luceneConfig, solrConfig); + } else { + Path indexDir = new Path(dir, "index"); + Path indexesDir = new Path(dir, "indexes"); + searchBean = new LuceneSearchBean(conf, indexDir, indexesDir); } - private void init(Path indexDir, Path indexesDir, Path segmentsDir, - Path linkDb) - throws IOException { - IndexSearcher indexSearcher; - if (this.fs.exists(indexDir)) { - if (LOG.isInfoEnabled()) { - LOG.info("opening merged index in " + indexDir); - } - indexSearcher = new IndexSearcher(indexDir, this.conf); + if (fs.exists(segmentConfig)) { + segmentBean = new DistributedSegmentBean(conf, segmentConfig); + } else if (fs.exists(luceneConfig)) { + segmentBean = new DistributedSegmentBean(conf, luceneConfig); } else { - if (LOG.isInfoEnabled()) { - LOG.info("opening indexes in " + indexesDir); - } - - Vector vDirs=new Vector(); - FileStatus[] fstats = fs.listStatus(indexesDir, - 
HadoopFSUtil.getPassDirectoriesFilter(fs)); - Path [] directories = HadoopFSUtil.getPaths(fstats); - for(int i = 0; i < directories.length; i++) { - Path indexdone = new Path(directories[i], Indexer.DONE_NAME); - if(fs.isFile(indexdone)) { - vDirs.add(directories[i]); + segmentBean = new FetchedSegments(conf, new Path(dir, "segments")); + } + + linkDb = new LinkDbInlinks(fs, new Path(dir, "linkdb"), conf); + } + + public static List readAddresses(Path path, + Configuration conf) throws IOException { + List addrs = new ArrayList(); + for (String line : readConfig(path, conf)) { + StringTokenizer tokens = new StringTokenizer(line); + if (tokens.hasMoreTokens()) { + String host = tokens.nextToken(); + if (tokens.hasMoreTokens()) { + String port = tokens.nextToken(); + addrs.add(new InetSocketAddress(host, Integer.parseInt(port))); } } - - - directories = new Path[ vDirs.size() ]; - for(int i = 0; vDirs.size()>0; i++) { - directories[i]=(Path)vDirs.remove(0); - } - - indexSearcher = new IndexSearcher(directories, this.conf); } + return addrs; + } - if (LOG.isInfoEnabled()) { - LOG.info("opening segments in " + segmentsDir); + public static List readConfig(Path path, Configuration conf) + throws IOException { + FileSystem fs = FileSystem.get(conf); + BufferedReader reader = + new BufferedReader(new InputStreamReader(fs.open(path))); + try { + ArrayList addrs = new ArrayList(); + String line; + while ((line = reader.readLine()) != null) { + addrs.add(line); + } + return addrs; + } finally { + reader.close(); } - FetchedSegments segments = new FetchedSegments(this.fs, segmentsDir.toString(),this.conf); - - this.segmentNames = segments.getSegmentNames(); - - this.searcher = indexSearcher; - this.detailer = indexSearcher; - this.summarizer = segments; - this.content = segments; - - if (LOG.isInfoEnabled()) { LOG.info("opening linkdb in " + linkDb); } - this.linkDb = new LinkDbInlinks(fs, linkDb, this.conf); - } - - private void init(DistributedSearch.Client client) { - 
this.segmentNames = client.getSegmentNames(); - this.searcher = client; - this.detailer = client; - this.summarizer = client; - this.content = client; - this.linkDb = client; } - public String[] getSegmentNames() { return segmentNames; } @@ -179,10 +162,11 @@ String dedupField, String sortField, boolean reverse) throws IOException { - return searcher.search(query, numHits, dedupField, sortField, reverse); + return searchBean.search(query, numHits, dedupField, sortField, reverse); } - private class DupHits extends ArrayList { + @SuppressWarnings("serial") + private class DupHits extends ArrayList { private boolean maxSizeExceeded; } @@ -248,13 +232,13 @@ if (LOG.isInfoEnabled()) { LOG.info("searching for "+numHitsRaw+" raw hits"); } - Hits hits = searcher.search(query, numHitsRaw, + Hits hits = searchBean.search(query, numHitsRaw, dedupField, sortField, reverse); long total = hits.getTotal(); - Map dupToHits = new HashMap(); - List resultList = new ArrayList(); - Set seen = new HashSet(); - List excludedValues = new ArrayList(); + Map dupToHits = new HashMap(); + List resultList = new ArrayList(); + Set seen = new HashSet(); + List excludedValues = new ArrayList(); boolean totalIsExact = true; for (int rawHitNum = 0; rawHitNum < hits.getTotal(); rawHitNum++) { // get the next raw hit @@ -264,7 +248,7 @@ for (int i = 0; i < excludedValues.size(); i++) { if (i == MAX_PROHIBITED_TERMS) break; - optQuery.addProhibitedTerm(((String)excludedValues.get(i)), + optQuery.addProhibitedTerm(excludedValues.get(i), dedupField); } numHitsRaw = (int)(numHitsRaw * rawHitsFactor); @@ -271,7 +255,7 @@ if (LOG.isInfoEnabled()) { LOG.info("re-searching for "+numHitsRaw+" raw hits, query: "+optQuery); } - hits = searcher.search(optQuery, numHitsRaw, + hits = searchBean.search(optQuery, numHitsRaw, dedupField, sortField, reverse); if (LOG.isInfoEnabled()) { LOG.info("found "+hits.getTotal()+" raw hits"); @@ -287,7 +271,7 @@ // get dup hits for its value String value = hit.getDedupValue(); 
- DupHits dupHits = (DupHits)dupToHits.get(value); + DupHits dupHits = dupToHits.get(value); if (dupHits == null) dupToHits.put(value, dupHits = new DupHits()); @@ -297,7 +281,7 @@ // mark prior hits with moreFromDupExcluded for (int i = 0; i < dupHits.size(); i++) { - ((Hit)dupHits.get(i)).setMoreFromDupExcluded(true); + dupHits.get(i).setMoreFromDupExcluded(true); } dupHits.maxSizeExceeded = true; @@ -318,7 +302,7 @@ Hits results = new Hits(total, - (Hit[])resultList.toArray(new Hit[resultList.size()])); + resultList.toArray(new Hit[resultList.size()])); results.setTotalIsExact(totalIsExact); return results; } @@ -325,19 +309,19 @@ public String getExplanation(Query query, Hit hit) throws IOException { - return searcher.getExplanation(query, hit); + return searchBean.getExplanation(query, hit); } public HitDetails getDetails(Hit hit) throws IOException { - return detailer.getDetails(hit); + return searchBean.getDetails(hit); } public HitDetails[] getDetails(Hit[] hits) throws IOException { - return detailer.getDetails(hits); + return searchBean.getDetails(hits); } public Summary getSummary(HitDetails hit, Query query) throws IOException { - return summarizer.getSummary(hit, query); + return segmentBean.getSummary(hit, query); } public Summary[] getSummary(HitDetails[] hits, Query query) @@ -342,19 +326,19 @@ public Summary[] getSummary(HitDetails[] hits, Query query) throws IOException { - return summarizer.getSummary(hits, query); + return segmentBean.getSummary(hits, query); } public byte[] getContent(HitDetails hit) throws IOException { - return content.getContent(hit); + return segmentBean.getContent(hit); } public ParseData getParseData(HitDetails hit) throws IOException { - return content.getParseData(hit); + return segmentBean.getParseData(hit); } public ParseText getParseText(HitDetails hit) throws IOException { - return content.getParseText(hit); + return segmentBean.getParseText(hit); } public String[] getAnchors(HitDetails hit) throws IOException { @@ 
-366,12 +350,12 @@ } public long getFetchDate(HitDetails hit) throws IOException { - return content.getFetchDate(hit); + return segmentBean.getFetchDate(hit); } public void close() throws IOException { - if (content != null) { content.close(); } - if (searcher != null) { searcher.close(); } + if (searchBean != null) { searchBean.close(); } + if (segmentBean != null) { segmentBean.close(); } if (linkDb != null) { linkDb.close(); } if (fs != null) { fs.close(); } } @@ -376,6 +360,10 @@ if (fs != null) { fs.close(); } } + public boolean ping() { + return true; + } + /** For debugging. */ public static void main(String[] args) throws Exception { String usage = "NutchBean query"; @@ -396,13 +384,22 @@ Summary[] summaries = bean.getSummary(details, query); for (int i = 0; i < hits.getLength(); i++) { - System.out.println(" "+i+" "+ details[i] + "\n" + summaries[i]); + System.out.println(" " + i + " " + details[i] + "\n" + summaries[i]); } } - public long getProtocolVersion(String className, long arg1) throws IOException { - if(DistributedSearch.Protocol.class.getName().equals(className)){ - return 1; + public long getProtocolVersion(String className, long clientVersion) + throws IOException { + if(RPCSearchBean.class.getName().equals(className) && + searchBean instanceof RPCSearchBean) { + + RPCSearchBean rpcBean = (RPCSearchBean)searchBean; + return rpcBean.getProtocolVersion(className, clientVersion); + } else if (SegmentBean.class.getName().equals(className) && + segmentBean instanceof RPCSegmentBean) { + + RPCSegmentBean rpcBean = (RPCSegmentBean)segmentBean; + return rpcBean.getProtocolVersion(className, clientVersion); } else { throw new IOException("Unknown Protocol classname:" + className); } Index: src/java/org/apache/nutch/searcher/OpenSearchServlet.java =================================================================== --- src/java/org/apache/nutch/searcher/OpenSearchServlet.java (revision 681741) +++ src/java/org/apache/nutch/searcher/OpenSearchServlet.java 
(working copy) @@ -43,8 +43,10 @@ /** Present search results using A9's OpenSearch extensions to RSS, plus a few * Nutch-specific extensions. */ +@SuppressWarnings("serial") public class OpenSearchServlet extends HttpServlet { - private static final Map NS_MAP = new HashMap(); + private static final Map NS_MAP = + new HashMap(); private int MAX_HITS_PER_PAGE; static { @@ -52,7 +54,7 @@ NS_MAP.put("nutch", "http://www.nutch.org/opensearchrss/1.0/"); } - private static final Set SKIP_DETAILS = new HashSet(); + private static final Set SKIP_DETAILS = new HashSet(); static { SKIP_DETAILS.add("url"); // redundant with RSS link SKIP_DETAILS.add("title"); // redundant with RSS title @@ -171,8 +173,8 @@ Element rss = addNode(doc, doc, "rss"); addAttribute(doc, rss, "version", "2.0"); addAttribute(doc, rss, "xmlns:opensearch", - (String)NS_MAP.get("opensearch")); - addAttribute(doc, rss, "xmlns:nutch", (String)NS_MAP.get("nutch")); + NS_MAP.get("opensearch")); + addAttribute(doc, rss, "xmlns:nutch", NS_MAP.get("nutch")); Element channel = addNode(doc, rss, "channel"); @@ -214,7 +216,7 @@ HitDetails detail = details[i]; String title = detail.getValue("title"); String url = detail.getValue("url"); - String id = "idx=" + hit.getIndexNo() + "&id=" + hit.getIndexDocNo(); + String id = "idx=" + hit.getIndexNo() + "&id=" + hit.getUniqueKey(); if (title == null || title.equals("")) { // use url for docs w/o title title = url; @@ -283,7 +285,7 @@ private static void addNode(Document doc, Node parent, String ns, String name, String text) { - Element child = doc.createElementNS((String)NS_MAP.get(ns), ns+":"+name); + Element child = doc.createElementNS(NS_MAP.get(ns), ns+":"+name); child.appendChild(doc.createTextNode(getLegalXml(text))); parent.appendChild(child); } Index: src/java/org/apache/nutch/searcher/Query.java =================================================================== --- src/java/org/apache/nutch/searcher/Query.java (revision 681741) +++ 
src/java/org/apache/nutch/searcher/Query.java (working copy) @@ -282,7 +282,7 @@ } - private ArrayList clauses = new ArrayList(); + private ArrayList clauses = new ArrayList(); private Configuration conf; @@ -305,7 +305,7 @@ /** Return all clauses. */ public Clause[] getClauses() { - return (Clause[])clauses.toArray(CLAUSES_PROTO); + return clauses.toArray(CLAUSES_PROTO); } /** Add a required term in the default field. */ @@ -361,7 +361,7 @@ public void write(DataOutput out) throws IOException { out.writeByte(clauses.size()); for (int i = 0; i < clauses.size(); i++) - ((Clause)clauses.get(i)).write(out); + clauses.get(i).write(out); } public static Query read(DataInput in, Configuration conf) throws IOException { @@ -404,7 +404,7 @@ } catch (CloneNotSupportedException e) { throw new RuntimeException(e); } - clone.clauses = (ArrayList)clauses.clone(); + clone.clauses = (ArrayList)clauses.clone(); return clone; } @@ -412,9 +412,9 @@ /** Flattens a query into the set of text terms that it contains. These are * terms which should be higlighted in matching documents. 
*/ public String[] getTerms() { - ArrayList result = new ArrayList(); + ArrayList result = new ArrayList(); for (int i = 0; i < clauses.size(); i++) { - Clause clause = (Clause)clauses.get(i); + Clause clause = clauses.get(i); if (!clause.isProhibited()) { if (clause.isPhrase()) { Term[] terms = clause.getPhrase().getTerms(); @@ -426,7 +426,7 @@ } } } - return (String[])result.toArray(new String[result.size()]); + return result.toArray(new String[result.size()]); } /** @@ -457,7 +457,7 @@ for (int i = 0; i < clauses.length; i++) { Clause c = clauses[i]; if (!new QueryFilters(conf).isField(c.getField())) { // unknown field - ArrayList terms = new ArrayList(); // add name to query + ArrayList terms = new ArrayList(); // add name to query if (c.isPhrase()) { terms.addAll(Arrays.asList(c.getPhrase().getTerms())); } else { @@ -467,7 +467,7 @@ c = (Clause)c.clone(); c.field = Clause.DEFAULT_FIELD; // use default field instead c.termOrPhrase - = new Phrase((Term[])terms.toArray(new Term[terms.size()])); + = new Phrase(terms.toArray(new Term[terms.size()])); } output.clauses.add(c); // copy clause to output } Index: src/java/org/apache/nutch/searcher/QueryException.java =================================================================== --- src/java/org/apache/nutch/searcher/QueryException.java (revision 681741) +++ src/java/org/apache/nutch/searcher/QueryException.java (working copy) @@ -17,6 +17,7 @@ package org.apache.nutch.searcher; +@SuppressWarnings("serial") public class QueryException extends java.io.IOException { public QueryException(String message) { super(message); Index: src/java/org/apache/nutch/searcher/QueryFilters.java =================================================================== --- src/java/org/apache/nutch/searcher/QueryFilters.java (revision 681741) +++ src/java/org/apache/nutch/searcher/QueryFilters.java (working copy) @@ -40,14 +40,14 @@ private static final Log LOG = LogFactory.getLog(QueryFilters.class); private QueryFilter[] queryFilters; - 
private HashSet FIELD_NAMES ; - private HashSet RAW_FIELD_NAMES; + private HashSet FIELD_NAMES ; + private HashSet RAW_FIELD_NAMES; - private static ArrayList parseFieldNames(Extension extension, + private static List parseFieldNames(Extension extension, String attribute) { String fields = extension.getAttribute(attribute); if (fields == null) fields = ""; - return Collections.list(new StringTokenizer(fields, " ,\t\n\r")); + return Arrays.asList(fields.split("[,\\s]")); } public QueryFilters(Configuration conf) { @@ -61,13 +61,14 @@ if (point == null) throw new RuntimeException(QueryFilter.X_POINT_ID + " not found."); Extension[] extensions = point.getExtensions(); - FIELD_NAMES = new HashSet(); - RAW_FIELD_NAMES = new HashSet(); + FIELD_NAMES = new HashSet(); + RAW_FIELD_NAMES = new HashSet(); QueryFilter[] filters = new QueryFilter[extensions.length]; for (int i = 0; i < extensions.length; i++) { Extension extension = extensions[i]; - ArrayList fieldNames = parseFieldNames(extension, "fields"); - ArrayList rawFieldNames = parseFieldNames(extension, "raw-fields"); + List fieldNames = parseFieldNames(extension, "fields"); + List rawFieldNames = + parseFieldNames(extension, "raw-fields"); if (fieldNames.size() == 0 && rawFieldNames.size() == 0) { if (LOG.isWarnEnabled()) { LOG.warn("QueryFilter: " + extension.getId() @@ -90,8 +91,8 @@ .getName()); } else { // cache already filled - FIELD_NAMES = (HashSet) objectCache.getObject("FIELD_NAMES"); - RAW_FIELD_NAMES = (HashSet) objectCache.getObject("RAW_FIELD_NAMES"); + FIELD_NAMES = (HashSet) objectCache.getObject("FIELD_NAMES"); + RAW_FIELD_NAMES = (HashSet) objectCache.getObject("RAW_FIELD_NAMES"); } } Index: src/java/org/apache/nutch/searcher/RPCSearchBean.java =================================================================== --- src/java/org/apache/nutch/searcher/RPCSearchBean.java (revision 0) +++ src/java/org/apache/nutch/searcher/RPCSearchBean.java (revision 0) @@ -0,0 +1,7 @@ +package 
org.apache.nutch.searcher; + +import org.apache.hadoop.ipc.VersionedProtocol; + +public interface RPCSearchBean extends SearchBean, VersionedProtocol { + +} Index: src/java/org/apache/nutch/searcher/RPCSegmentBean.java =================================================================== --- src/java/org/apache/nutch/searcher/RPCSegmentBean.java (revision 0) +++ src/java/org/apache/nutch/searcher/RPCSegmentBean.java (revision 0) @@ -0,0 +1,7 @@ +package org.apache.nutch.searcher; + +import org.apache.hadoop.ipc.VersionedProtocol; + +public interface RPCSegmentBean extends SegmentBean, VersionedProtocol { + +} Index: src/java/org/apache/nutch/searcher/SearchBean.java =================================================================== --- src/java/org/apache/nutch/searcher/SearchBean.java (revision 0) +++ src/java/org/apache/nutch/searcher/SearchBean.java (revision 0) @@ -0,0 +1,12 @@ +package org.apache.nutch.searcher; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public interface SearchBean extends Searcher, HitDetailer { + public static final Log LOG = LogFactory.getLog(SearchBean.class); + + public boolean ping() throws IOException ; +} Index: src/java/org/apache/nutch/searcher/SegmentBean.java =================================================================== --- src/java/org/apache/nutch/searcher/SegmentBean.java (revision 0) +++ src/java/org/apache/nutch/searcher/SegmentBean.java (revision 0) @@ -0,0 +1,8 @@ +package org.apache.nutch.searcher; + +import java.io.IOException; + +public interface SegmentBean extends HitContent, HitSummarizer { + + public String[] getSegmentNames() throws IOException; +} Index: src/java/org/apache/nutch/searcher/SolrSearchBean.java =================================================================== --- src/java/org/apache/nutch/searcher/SolrSearchBean.java (revision 0) +++ src/java/org/apache/nutch/searcher/SolrSearchBean.java (revision 0) @@ -0,0 +1,334 
@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.searcher; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.xml.parsers.SAXParserFactory; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.util.ToStringUtils; +import org.apache.nutch.util.solr.SolrClient; +import org.apache.nutch.util.solr.SolrResponseHandler; +import org.xml.sax.InputSource; +import org.xml.sax.SAXException; +import org.xml.sax.XMLReader; + +public class SolrSearchBean implements SearchBean { + + public static final Log LOG = LogFactory.getLog(SolrSearchBean.class); + + private 
static final Text MIN_TEXT_WRITABLE = + new Text(""); + + private static final IntWritable MIN_INT_WRITABLE = + new IntWritable(Integer.MIN_VALUE); + + private static final FloatWritable MIN_FLOAT_WRITABLE = + new FloatWritable(Float.MIN_VALUE); + + /* We assume that an array is always an array of strings. */ + public static enum FieldType { STR, INT, LONG, FLOAT, + DOUBLE, BOOL, DATE, ARR }; + + private SolrClient client; + + private XMLReader reader; + + private SolrResponseHandler handler; + + private QueryFilters filters; + + public SolrSearchBean(Configuration conf, String solrServer) + throws IOException { + client = new SolrClient(solrServer); + handler = new SolrResponseHandler(); + filters = new QueryFilters(conf); + try { + reader = SAXParserFactory.newInstance().newSAXParser().getXMLReader(); + reader.setContentHandler(handler); + } catch (Exception e) { + IOException ioe = new IOException(); + ioe.initCause(e); + throw ioe; + } + } + + public String getExplanation(Query query, Hit hit) throws IOException { + return "SOLR backend does not support explanations yet."; + } + + public Hits search(Query query, int numHits, String dedupField, + String sortField, boolean reverse) + throws IOException { + + // filter query string + BooleanQuery bQuery = filters.filter(query); + + byte[] response; + if (sortField == null) { + response = client.search(stringify(bQuery), numHits, + new String[] { dedupField, "score", "id" }, sortField, reverse); + sortField = "score"; + } else { + response = client.search(stringify(bQuery), numHits, + new String[] { dedupField, sortField, "id" }, sortField, reverse); + } + + try { + reader.parse(new InputSource(new ByteArrayInputStream(response))); + } catch (SAXException e) { + IOException ioe = new IOException(); + ioe.initCause(e); + throw ioe; + } + Hit[] hitArr = new Hit[handler.getNumTopResults()]; + for (int i = 0; i < hitArr.length; i++) { + Map result = handler.getResult(i); + FieldType sortFieldType = 
handler.getType(sortField); + if (sortFieldType == FieldType.ARR) { + throw new RuntimeException("Sort value shouldn't be an array"); + } + + String sortStr = (String) result.get(sortField); + WritableComparable sortValue; + if (sortFieldType == null) { + sortValue = MIN_FLOAT_WRITABLE; + } else { + switch (sortFieldType) { + case STR: + if (sortStr == null) { + sortValue = MIN_TEXT_WRITABLE; + } else { + sortValue = new Text(sortStr); + } + break; + case INT: + if (sortStr == null) { + sortValue = MIN_INT_WRITABLE; + } else { + sortValue = new IntWritable(Integer.valueOf(sortStr)); + } + break; + case FLOAT: + if (sortStr == null) { + sortValue = MIN_FLOAT_WRITABLE; + } else { + sortValue = new FloatWritable(Float.valueOf(sortStr)); + } + break; + case DATE: + // TODO + throw + new NotImplementedException("Sort on date is not implemented"); + default: + throw new RuntimeException("Unknown sort value type!"); + } + } + + FieldType dedupFieldType = handler.getType(dedupField); + if (dedupFieldType == FieldType.ARR) { + throw new RuntimeException("Dedup value shouldn't be an array"); + } + String dedupValue = (String) result.get(dedupField); + + String uniqueKey; + if (handler.getType("id") == FieldType.ARR) { + throw new RuntimeException("Unique key shouldn't be an array"); + } + uniqueKey = (String) result.get("id"); + if (uniqueKey == null) { + throw new RuntimeException("No value for unique key"); + } + + hitArr[i] = new Hit(uniqueKey, sortValue, dedupValue); + } + + return new Hits(handler.getNumResults(), hitArr); + } + + public void close() throws IOException { } + + public HitDetails getDetails(Hit hit) throws IOException { + byte[] response = client.search("id:\"" + hit.getUniqueKey() + "\"", 1, + null, null, false); + try { + reader.parse(new InputSource(new ByteArrayInputStream(response))); + } catch (SAXException e) { + IOException ioe = new IOException(); + ioe.initCause(e); + throw ioe; + } + + if (handler.getNumResults() == 0) { + return null; + } + + 
return buildDetails(handler.getResult(0)); + } + + public HitDetails[] getDetails(Hit[] hits) throws IOException { + StringBuilder buf = new StringBuilder(); + buf.append("("); + for (Hit hit : hits) { + buf.append(" id:\""); + buf.append(hit.getUniqueKey()); + buf.append("\""); + } + buf.append(")"); + + byte[] response = + client.search(buf.toString(), hits.length, null, null, false); + try { + reader.parse(new InputSource(new ByteArrayInputStream(response))); + } catch (SAXException e) { + IOException ioe = new IOException(); + ioe.initCause(e); + throw ioe; + } + + int numResults = handler.getNumTopResults(); + if (numResults < hits.length) { + throw new RuntimeException("Missing hit details! Found: " + numResults + + ", expecting: " + hits.length); + } + + /* Response returned from SOLR server may be out of + * order. So we make sure that nth element of HitDetails[] + * is the detail of nth hit. + */ + Map detailsMap = + new HashMap(hits.length); + for (int i = 0; i < numResults; i++) { + HitDetails details = buildDetails(handler.getResult(i)); + detailsMap.put(details.getValue("url"), details); + } + + HitDetails[] detailsArr = new HitDetails[hits.length]; + for (int i = 0; i < hits.length; i++) { + detailsArr[i] = detailsMap.get(hits[i].getUniqueKey()); + } + + return detailsArr; + } + + public boolean ping() throws IOException { + return client.ping(); + } + + private static HitDetails buildDetails(Map result) { + List fieldList = new ArrayList(); + List valueList = new ArrayList(); + + for (Map.Entry entry : result.entrySet()) { + String field = entry.getKey(); + Object o = entry.getValue(); + if (o.getClass().isArray()) { + String[] vals = (String[]) o; + for (String val : vals) { + fieldList.add(field); + valueList.add(val); + } + } else { + String val = (String) o; + fieldList.add(field); + valueList.add(val); + } + } + + String[] fields = fieldList.toArray(new String[fieldList.size()]); + String[] values = valueList.toArray(new 
String[valueList.size()]); + + return new HitDetails(fields, values); + } + + /* Hackish solution for stringifying queries. Code from BooleanQuery. + * This is necessary because a BooleanQuery.toString produces + * statements like feed:http://www.google.com which doesn't work, we + * need feed:"http://www.google.com". + */ + private static String stringify(BooleanQuery bQuery) { + StringBuilder buffer = new StringBuilder(); + boolean needParens=(bQuery.getBoost() != 1.0) || + (bQuery.getMinimumNumberShouldMatch()>0) ; + if (needParens) { + buffer.append("("); + } + + BooleanClause[] clauses = bQuery.getClauses(); + int i = 0; + for (BooleanClause c : clauses) { + if (c.isProhibited()) + buffer.append("-"); + else if (c.isRequired()) + buffer.append("+"); + + org.apache.lucene.search.Query subQuery = c.getQuery(); + if (subQuery instanceof BooleanQuery) { // wrap sub-bools in parens + buffer.append("("); + buffer.append(c.getQuery().toString("")); + buffer.append(")"); + } else if (subQuery instanceof TermQuery) { + Term term = ((TermQuery) subQuery).getTerm(); + buffer.append(term.field()); + buffer.append(":\""); + buffer.append(term.text()); + buffer.append("\""); + } else { + buffer.append(" "); + buffer.append(c.getQuery().toString()); + } + + if (i++ != clauses.length - 1) { + buffer.append(" "); + } + } + + if (needParens) { + buffer.append(")"); + } + + if (bQuery.getMinimumNumberShouldMatch()>0) { + buffer.append('~'); + buffer.append(bQuery.getMinimumNumberShouldMatch()); + } + + if (bQuery.getBoost() != 1.0f) { + buffer.append(ToStringUtils.boost(bQuery.getBoost())); + } + + return buffer.toString(); + } + +} Index: src/java/org/apache/nutch/searcher/Summary.java =================================================================== --- src/java/org/apache/nutch/searcher/Summary.java (revision 681741) +++ src/java/org/apache/nutch/searcher/Summary.java (working copy) @@ -88,7 +88,7 @@ public boolean isEllipsis() { return true; } } - private ArrayList 
fragments = new ArrayList(); + private ArrayList fragments = new ArrayList(); private static final Fragment[] FRAGMENT_PROTO = new Fragment[0]; @@ -100,7 +100,7 @@ /** Returns an array of all of this summary's fragments.*/ public Fragment[] getFragments() { - return (Fragment[])fragments.toArray(FRAGMENT_PROTO); + return fragments.toArray(FRAGMENT_PROTO); } /** Returns a String representation of this Summary. */ @@ -126,7 +126,7 @@ Fragment fragment = null; StringBuffer buf = new StringBuffer(); for (int i=0; i") .append(encode ? Entities.encode(fragment.getText()) @@ -185,7 +185,7 @@ out.writeInt(fragments.size()); Fragment fragment = null; for (int i=0; i"); + Iterator>> iterator = doc.fieldIterator(); + while (iterator.hasNext()) { + Map.Entry> entry = iterator.next(); + List values = entry.getValue(); + for (String value : values) { + addField(entry.getKey(), value); + } + } + docBuffer.append(""); + + if (++numBufferedDocs >= MAX_BUFFERED_DOCS) { + sendDocuments(); + } + } + + /** Sends the given query to server. + * + * @param query Query in string form. + * @param numHits Maximum number of results to retrieve. + * @param fieldsToRetrieve Stored fields that will be retrieved for results. + * @param sortField Field to sort on. + * @param reverse If true, results are sorted in descending order. + * @return byte array containing the response from server. 
+ * @throws IOException + */ + public byte[] search(String query, int numHits, String[] fieldsToRetrieve, + String sortField, boolean reverse) + throws IOException { + StringBuilder uri = new StringBuilder(solrSelectUrl); + uri.append("?q="); + uri.append(URLEncoder.encode(query, "UTF-8")); + uri.append("&rows="); + uri.append(numHits); + uri.append("&fl="); + if (fieldsToRetrieve != null) { + for (String field : fieldsToRetrieve) { + if (field != null) { + uri.append(field); + uri.append("%2C"); // encoded ',' + } + } + } + if (sortField != null) { + uri.append("&sort="); + uri.append(sortField); + if (reverse) { + uri.append("+desc"); //encoded " desc" + } else { + uri.append("+asc"); //encoded " asc" + } + } + + // we don't need request parameters again + uri.append("&echoParams=none"); + + GetMethod get = new GetMethod(uri.toString()); + try { + int code = httpClient.executeMethod(get); + // httpclient enforces reading response before sending another request + byte[] response = get.getResponseBody(); + if (code != 200) { + throw new IOException("Http response code is " + code + ", not 200."); + } + get.releaseConnection(); + return response; + } catch (IOException e) { + throw e; + } catch (Exception e) { + IOException ioe = new IOException(); + ioe.initCause(e); + throw ioe; + } finally { + get.releaseConnection(); + } + } + + /** + * Commit changes and optimize. Note: Solr will not serve newly added/removed + * documents until commit unless autoCommit=true. + * + * @param block If true, this method will block until optimize is complete. 
+ */ + public void commitAndOptimize(boolean block) throws IOException { + if (numBufferedDocs > 0) { + sendDocuments(); + } + sendToSolrServer(""); + sendToSolrServer(""); + } + + public boolean ping() throws IOException { + GetMethod get = new GetMethod(solrPingUrl); + try { + int code = httpClient.executeMethod(get); + // read response body and ignore + get.getResponseBody(); + + return code == 200; + } finally { + get.releaseConnection(); + } + } + + private void initDocBuffer() { + docBuffer = new StringBuilder(65536); + docBuffer.append(""); + numBufferedDocs = 0; + } + + private static boolean isLegalXml(final char c) { + return c == 0x9 || c == 0xa || c == 0xd || (c >= 0x20 && c <= 0xd7ff) + || (c >= 0xe000 && c <= 0xfffd) || (c >= 0x10000 && c <= 0x10ffff); + } + + private void addField(String name, String value) + throws UnsupportedEncodingException { + docBuffer.append(""); + StringBuilder valBuffer = new StringBuilder(); + int valLen = value.length(); + // escape illegal xml characters + for (int i = 0; i < valLen; i++) { + char ch = value.charAt(i); + switch (ch) { + case '"': + valBuffer.append("""); + case '\'': + valBuffer.append("'"); + break; + case '&': + valBuffer.append("&"); + break; + case '<': + valBuffer.append("<"); + break; + case '>': + valBuffer.append(">"); + break; + default: + // skip illegal characters we can't handle + if (isLegalXml(ch)) { + valBuffer.append(ch); + } + } + } + docBuffer.append(valBuffer.toString()); + docBuffer.append(""); + } + + private void sendDocuments() throws IOException { + if (numBufferedDocs == 0) { + return; + } + docBuffer.append(""); + sendToSolrServer(docBuffer.toString()); + initDocBuffer(); + } + + private void sendToSolrServer(String data) throws IOException { + PostMethod post = new PostMethod(solrUpdateUrl); + post.setRequestEntity(new StringRequestEntity(data)); + try { + int code = httpClient.executeMethod(post); + // httpclient enforces reading response before sending another request + byte[] 
response = post.getResponseBody(); + if (code != 200) { + throw new IOException("Http response code is " + code + ", not 200. " + + "Response:" + new String(response)); + } + post.releaseConnection(); + } catch (IOException e) { + throw e; + } catch (Exception e) { + IOException ioe = new IOException(); + ioe.initCause(e); + throw ioe; + } finally { + post.releaseConnection(); + } + + } +} Index: src/java/org/apache/nutch/util/solr/SolrResponseHandler.java =================================================================== --- src/java/org/apache/nutch/util/solr/SolrResponseHandler.java (revision 0) +++ src/java/org/apache/nutch/util/solr/SolrResponseHandler.java (revision 0) @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.util.solr; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nutch.searcher.SolrSearchBean.FieldType; +import org.xml.sax.Attributes; +import org.xml.sax.SAXException; +import org.xml.sax.helpers.DefaultHandler; + +public class SolrResponseHandler extends DefaultHandler { + + private long numResults; + private List> topResults; + private Map typeMap; + + private boolean inDoc; + private String field; + private String fieldType; + private Map result; + private List arrValues; + private StringBuilder buf; + private boolean shouldBuildString; + + public SolrResponseHandler() { + buf = new StringBuilder(); + topResults = new ArrayList>(); + typeMap = new HashMap(); + + result = new HashMap(); + arrValues = new ArrayList(); + buf = new StringBuilder(); + } + + @Override + public void startDocument() throws SAXException { + numResults = 0L; + topResults.clear(); + typeMap.clear(); + + inDoc = false; + field = null; + fieldType = null; + result.clear(); + arrValues.clear(); + buf.setLength(0); + shouldBuildString = false; + + } + + @Override + public void startElement(String namespaceURI, String localName, + String qName, Attributes atts) + throws SAXException { + if (inDoc) { + // ignore type information coming from elements under "arr" + shouldBuildString = true; + if (!"arr".equals(fieldType)) { + if ("arr".equals(qName)) { + arrValues = new ArrayList(); + } + fieldType = qName.intern(); + field = atts.getValue("name"); + if (!typeMap.containsKey(field)) { + typeMap.put(field, FieldType.valueOf(qName.toUpperCase())); + } + } + } else if ("result".equals(qName)) { + numResults = Long.valueOf(atts.getValue("numFound")); + } else if ("doc".equals(qName)) { + inDoc = true; + result = new HashMap(); + } + } + + @Override + public void endElement(String namespaceURI, String localName, String qName) + throws SAXException { + if ("doc".equals(qName)) { + inDoc = false; 
+ shouldBuildString = false; + topResults.add(result); + } + + if (inDoc) { + if ("arr".equals(qName)) { // end of "arr" + result.put(field, arrValues.toArray(new String[arrValues.size()])); + fieldType = ""; + } else if ("arr".equals(fieldType)) { + arrValues.add(buf.toString()); + buf.setLength(0); + } else { + result.put(field, buf.toString()); + buf.setLength(0); + } + } + } + + @Override + public void characters(char ch[], int start, int length) + throws SAXException { + if (shouldBuildString) { + buf.append(ch, start, length); + } + } + + public long getNumResults() { + return numResults; + } + + public int getNumTopResults() { + return topResults.size(); + } + + public Map getResult(int i) { + return topResults.get(i); + } + + public FieldType getType(String field) { + return typeMap.get(field); + } + +} Index: src/plugin/creativecommons/src/java/org/creativecommons/nutch/CCIndexingFilter.java =================================================================== --- src/plugin/creativecommons/src/java/org/creativecommons/nutch/CCIndexingFilter.java (revision 681741) +++ src/plugin/creativecommons/src/java/org/creativecommons/nutch/CCIndexingFilter.java (working copy) @@ -17,8 +17,6 @@ package org.creativecommons.nutch; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; import org.apache.nutch.metadata.CreativeCommons; import org.apache.nutch.parse.Parse; @@ -25,6 +23,8 @@ import org.apache.nutch.indexer.IndexingFilter; import org.apache.nutch.indexer.IndexingException; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.indexer.lucene.LuceneWriter; import org.apache.hadoop.io.Text; import org.apache.nutch.crawl.CrawlDatum; @@ -30,7 +30,6 @@ import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.Inlinks; import org.apache.nutch.metadata.Metadata; -import org.apache.nutch.metadata.CreativeCommons; import org.apache.hadoop.conf.Configuration; @@ -50,7 +49,7 @@ private Configuration conf; - 
public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) + public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) throws IndexingException { Metadata metadata = parse.getData().getParseMeta(); @@ -86,7 +85,7 @@ /** Add the features represented by a license URL. Urls are of the form * "http://creativecommons.org/licenses/xx-xx/xx/xx", where "xx" names a * license feature. */ - public void addUrlFeatures(Document doc, String urlString) { + public void addUrlFeatures(NutchDocument doc, String urlString) { try { URL url = new URL(urlString); @@ -108,8 +107,12 @@ } } - private void addFeature(Document doc, String feature) { - doc.add(new Field(FIELD, feature, Field.Store.YES, Field.Index.UN_TOKENIZED)); + private void addFeature(NutchDocument doc, String feature) { + doc.add(FIELD, feature); + } + + public void addIndexBackendOptions(Configuration conf) { + LuceneWriter.addFieldOptions(FIELD, LuceneWriter.STORE.YES, LuceneWriter.INDEX.UNTOKENIZED, conf); } public void setConf(Configuration conf) { Index: src/plugin/feed/src/java/org/apache/nutch/indexer/feed/FeedIndexingFilter.java =================================================================== --- src/plugin/feed/src/java/org/apache/nutch/indexer/feed/FeedIndexingFilter.java (revision 681741) +++ src/plugin/feed/src/java/org/apache/nutch/indexer/feed/FeedIndexingFilter.java (working copy) @@ -25,8 +25,6 @@ //APACHE imports import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.Inlinks; import org.apache.nutch.indexer.IndexingException; @@ -31,6 +29,8 @@ import org.apache.nutch.crawl.Inlinks; import org.apache.nutch.indexer.IndexingException; import org.apache.nutch.indexer.IndexingFilter; +import org.apache.nutch.indexer.NutchDocument; +import 
org.apache.nutch.indexer.lucene.LuceneWriter; import org.apache.nutch.metadata.Feed; import org.apache.nutch.metadata.Metadata; import org.apache.nutch.parse.Parse; @@ -71,7 +71,7 @@ * index. * */ - public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, + public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) throws IndexingException { ParseData parseData = parse.getData(); Metadata parseMeta = parseData.getParseMeta(); @@ -84,8 +84,7 @@ if (authors != null) { for (String author : authors) { - doc.add(new Field(Feed.FEED_AUTHOR, author, - Field.Store.YES, Field.Index.TOKENIZED)); + doc.add(Feed.FEED_AUTHOR, author); } } @@ -91,8 +90,7 @@ if (tags != null) { for (String tag : tags) { - doc.add(new Field(Feed.FEED_TAGS, tag, - Field.Store.YES, Field.Index.TOKENIZED)); + doc.add(Feed.FEED_TAGS, tag); } } @@ -97,7 +95,7 @@ } if (feed != null) - doc.add(new Field(Feed.FEED, feed, Field.Store.YES, Field.Index.TOKENIZED)); + doc.add(Feed.FEED, feed); SimpleDateFormat sdf = new SimpleDateFormat(dateFormatStr); sdf.setTimeZone(TimeZone.getTimeZone("GMT")); @@ -104,8 +102,7 @@ if (published != null) { Date date = new Date(Long.parseLong(published)); String dateString = sdf.format(date); - doc.add(new Field(PUBLISHED_DATE, dateString, - Field.Store.YES, Field.Index.NO_NORMS)); + doc.add(PUBLISHED_DATE, dateString); } if (updated != null) { @@ -111,8 +108,7 @@ if (updated != null) { Date date = new Date(Long.parseLong(updated)); String dateString = sdf.format(date); - doc.add(new Field(UPDATED_DATE, dateString, - Field.Store.YES, Field.Index.NO_NORMS)); + doc.add(UPDATED_DATE, dateString); } return doc; @@ -126,6 +122,24 @@ return conf; } + public void addIndexBackendOptions(Configuration conf) { + LuceneWriter.addFieldOptions(Feed.FEED_AUTHOR, + LuceneWriter.STORE.YES, LuceneWriter.INDEX.TOKENIZED, conf); + + LuceneWriter.addFieldOptions(Feed.FEED_TAGS, + LuceneWriter.STORE.YES, 
LuceneWriter.INDEX.TOKENIZED, conf); + + LuceneWriter.addFieldOptions(Feed.FEED, + LuceneWriter.STORE.YES, LuceneWriter.INDEX.TOKENIZED, conf); + + LuceneWriter.addFieldOptions(PUBLISHED_DATE, + LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO_NORMS, conf); + + LuceneWriter.addFieldOptions(UPDATED_DATE, + LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO_NORMS, conf); + + } + /** * Sets the {@link Configuration} object used to configure this * {@link IndexingFilter}. Index: src/plugin/index-anchor/src/java/org/apache/nutch/indexer/anchor/AnchorIndexingFilter.java =================================================================== --- src/plugin/index-anchor/src/java/org/apache/nutch/indexer/anchor/AnchorIndexingFilter.java (revision 681741) +++ src/plugin/index-anchor/src/java/org/apache/nutch/indexer/anchor/AnchorIndexingFilter.java (working copy) @@ -16,8 +16,6 @@ */ package org.apache.nutch.indexer.anchor; -import java.io.IOException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -22,8 +20,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.Inlinks; import org.apache.nutch.indexer.IndexingException; @@ -28,6 +24,8 @@ import org.apache.nutch.crawl.Inlinks; import org.apache.nutch.indexer.IndexingException; import org.apache.nutch.indexer.IndexingFilter; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.indexer.lucene.LuceneWriter; import org.apache.nutch.parse.Parse; /** @@ -47,21 +45,13 @@ return this.conf; } - public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, + public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) throws IndexingException { - try { 
- String[] anchors = (inlinks != null ? inlinks.getAnchors() - : new String[0]); - for (int i = 0; i < anchors.length; i++) { - doc.add(new Field("anchor", anchors[i], Field.Store.NO, - Field.Index.TOKENIZED)); - } - } catch (IOException ioe) { - if (LOG.isWarnEnabled()) { - LOG.warn("AnchorIndexingFilter: can't get anchors for " - + url.toString()); - } + String[] anchors = (inlinks != null ? inlinks.getAnchors() + : new String[0]); + for (int i = 0; i < anchors.length; i++) { + doc.add("anchor", anchors[i]); } return doc; @@ -66,4 +56,10 @@ return doc; } + + public void addIndexBackendOptions(Configuration conf) { + LuceneWriter.addFieldOptions("anchor", LuceneWriter.STORE.NO, + LuceneWriter.INDEX.TOKENIZED, conf); + } + } Index: src/plugin/index-basic/src/java/org/apache/nutch/indexer/basic/BasicIndexingFilter.java =================================================================== --- src/plugin/index-basic/src/java/org/apache/nutch/indexer/basic/BasicIndexingFilter.java (revision 681741) +++ src/plugin/index-basic/src/java/org/apache/nutch/indexer/basic/BasicIndexingFilter.java (working copy) @@ -21,8 +21,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.lucene.document.DateTools; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; import org.apache.nutch.metadata.Nutch; import org.apache.nutch.parse.Parse; @@ -29,6 +27,8 @@ import org.apache.nutch.indexer.IndexingFilter; import org.apache.nutch.indexer.IndexingException; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.indexer.lucene.LuceneWriter; import org.apache.hadoop.io.Text; import org.apache.nutch.crawl.CrawlDatum; @@ -45,7 +45,7 @@ private int MAX_TITLE_LENGTH; private Configuration conf; - public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) + public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) throws IndexingException { Text 
reprUrl = (Text) datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY); @@ -66,25 +66,12 @@ } if (host != null) { - // add host as un-stored, indexed and tokenized - doc.add(new Field("host", host, Field.Store.NO, Field.Index.TOKENIZED)); - // add site as un-stored, indexed and un-tokenized - doc.add(new Field("site", host, Field.Store.NO, Field.Index.UN_TOKENIZED)); + doc.add("host", host); + doc.add("site", host); } - // url is both stored and indexed, so it's both searchable and returned - doc.add(new Field("url", - reprUrlString == null ? urlString : reprUrlString, - Field.Store.YES, Field.Index.TOKENIZED)); - - if (reprUrlString != null) { - // also store original url as both stored and indexes - doc.add(new Field("orig", urlString, - Field.Store.YES, Field.Index.TOKENIZED)); - } - - // content is indexed, so that it's searchable, but not stored in index - doc.add(new Field("content", parse.getText(), Field.Store.NO, Field.Index.TOKENIZED)); + doc.add("url", reprUrlString == null ? urlString : reprUrlString); + doc.add("content", parse.getText()); // title String title = parse.getData().getTitle(); @@ -91,18 +78,18 @@ if (title.length() > MAX_TITLE_LENGTH) { // truncate title if needed title = title.substring(0, MAX_TITLE_LENGTH); } - // add title indexed and stored so that it can be displayed - doc.add(new Field("title", title, Field.Store.YES, Field.Index.TOKENIZED)); + doc.add("title", title); + // add cached content/summary display policy, if available String caching = parse.getData().getMeta(Nutch.CACHING_FORBIDDEN_KEY); if (caching != null && !caching.equals(Nutch.CACHING_FORBIDDEN_NONE)) { - doc.add(new Field("cache", caching, Field.Store.YES, Field.Index.NO)); + doc.add("cache", caching); } // add timestamp when fetched, for deduplication - doc.add(new Field("tstamp", - DateTools.timeToString(datum.getFetchTime(), DateTools.Resolution.MILLISECOND), - Field.Store.YES, Field.Index.NO)); + doc.add("tstamp", + DateTools.timeToString(datum.getFetchTime(), + 
DateTools.Resolution.MILLISECOND)); return doc; } @@ -107,6 +94,40 @@ return doc; } + public void addIndexBackendOptions(Configuration conf) { + + /////////////////////////// + // add lucene options // + /////////////////////////// + + // host is un-stored, indexed and tokenized + LuceneWriter.addFieldOptions("host", LuceneWriter.STORE.NO, + LuceneWriter.INDEX.TOKENIZED, conf); + + // site is un-stored, indexed and un-tokenized + LuceneWriter.addFieldOptions("site", LuceneWriter.STORE.NO, + LuceneWriter.INDEX.UNTOKENIZED, conf); + + // url is both stored and indexed, so it's both searchable and returned + LuceneWriter.addFieldOptions("url", LuceneWriter.STORE.YES, + LuceneWriter.INDEX.TOKENIZED, conf); + + // content is indexed, so that it's searchable, but not stored in index + LuceneWriter.addFieldOptions("content", LuceneWriter.STORE.NO, + LuceneWriter.INDEX.TOKENIZED, conf); + + // anchors are indexed, so they're searchable, but not stored in index + LuceneWriter.addFieldOptions("anchor", LuceneWriter.STORE.NO, + LuceneWriter.INDEX.TOKENIZED, conf); + + // title is indexed and stored so that it can be displayed + LuceneWriter.addFieldOptions("title", LuceneWriter.STORE.YES, + LuceneWriter.INDEX.TOKENIZED, conf); + + LuceneWriter.addFieldOptions("cache", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, conf); + LuceneWriter.addFieldOptions("tstamp", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, conf); + } + public void setConf(Configuration conf) { this.conf = conf; this.MAX_TITLE_LENGTH = conf.getInt("indexer.max.title.length", 100); Index: src/plugin/index-more/src/java/org/apache/nutch/indexer/more/MoreIndexingFilter.java =================================================================== --- src/plugin/index-more/src/java/org/apache/nutch/indexer/more/MoreIndexingFilter.java (revision 681741) +++ src/plugin/index-more/src/java/org/apache/nutch/indexer/more/MoreIndexingFilter.java (working copy) @@ -28,9 +28,6 @@ import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; - import org.apache.nutch.metadata.Metadata; import org.apache.nutch.net.protocols.HttpDateFormat; @@ -40,6 +37,8 @@ import org.apache.nutch.indexer.IndexingFilter; import org.apache.nutch.indexer.IndexingException; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.indexer.lucene.LuceneWriter; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.Inlinks; @@ -80,7 +79,7 @@ /** Get the MimeTypes resolver instance. */ private MimeUtil MIME; - public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) + public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) throws IndexingException { String url_s = url.toString(); @@ -95,7 +94,7 @@ // Add time related meta info. Add last-modified if present. Index date as // last-modified, or, if that's not present, use fetch time. 
- private Document addTime(Document doc, ParseData data, + private NutchDocument addTime(NutchDocument doc, ParseData data, String url, CrawlDatum datum) { long time = -1; @@ -103,7 +102,7 @@ if (lastModified != null) { // try parse last-modified time = getTime(lastModified,url); // use as time // store as string - doc.add(new Field("lastModified", new Long(time).toString(), Field.Store.YES, Field.Index.NO)); + doc.add("lastModified", Long.toString(time)); } if (time == -1) { // if no last-modified @@ -117,7 +116,7 @@ String dateString = sdf.format(new Date(time)); // un-stored, indexed and un-tokenized - doc.add(new Field("date", dateString, Field.Store.NO, Field.Index.UN_TOKENIZED)); + doc.add("date", dateString); return doc; } @@ -167,11 +166,11 @@ } // Add Content-Length - private Document addLength(Document doc, ParseData data, String url) { + private NutchDocument addLength(NutchDocument doc, ParseData data, String url) { String contentLength = data.getMeta(Response.CONTENT_LENGTH); if (contentLength != null) - doc.add(new Field("contentLength", contentLength, Field.Store.YES, Field.Index.NO)); + doc.add("contentLength", contentLength); return doc; } @@ -177,7 +176,7 @@ } // Add Content-Type and its primaryType and subType - private Document addType(Document doc, ParseData data, String url) { + private NutchDocument addType(NutchDocument doc, ParseData data, String url) { MimeType mimeType = null; String contentType = data.getMeta(Response.CONTENT_TYPE); if (contentType == null) { @@ -218,14 +217,13 @@ // type:vnd.ms-powerpoint // all case insensitive. 
// The query filter is implemented in TypeQueryFilter.java - doc.add(new Field("type", contentType, Field.Store.NO, Field.Index.UN_TOKENIZED)); - doc.add(new Field("type", primaryType, Field.Store.NO, Field.Index.UN_TOKENIZED)); - doc.add(new Field("type", subType, Field.Store.NO, Field.Index.UN_TOKENIZED)); + doc.add("type", contentType); + doc.add("type", primaryType); + doc.add("type", subType); // add its primaryType and subType to respective fields - // as stored, indexed and un-tokenized - doc.add(new Field("primaryType", primaryType, Field.Store.YES, Field.Index.UN_TOKENIZED)); - doc.add(new Field("subType", subType, Field.Store.YES, Field.Index.UN_TOKENIZED)); + doc.add("primaryType", primaryType); + doc.add("subType", subType); return doc; } @@ -254,7 +252,7 @@ } } - private Document resetTitle(Document doc, ParseData data, String url) { + private NutchDocument resetTitle(NutchDocument doc, ParseData data, String url) { String contentDisposition = data.getMeta(Metadata.CONTENT_DISPOSITION); if (contentDisposition == null) return doc; @@ -263,7 +261,7 @@ for (int i=0; i entries, String contents[]) { int c = 0; - Iterator i = entries.iterator(); - while (i.hasNext()) { - NGramProfile.NGramEntry nge = (NGramProfile.NGramEntry) i.next(); + for (NGramEntry nge : entries) { assertEquals(contents[c], nge.getSeq().toString()); c++; } @@ -192,12 +190,10 @@ } } - private void testCounts(List entries, int counts[]) { + private void testCounts(List entries, int counts[]) { int c = 0; - Iterator i = entries.iterator(); - while (i.hasNext()) { - NGramProfile.NGramEntry nge = (NGramProfile.NGramEntry) i.next(); + for (NGramEntry nge : entries) { System.out.println(nge); assertEquals(counts[c], nge.getCount()); c++; Index: src/plugin/microformats-reltag/src/java/org/apache/nutch/microformats/reltag/RelTagIndexingFilter.java =================================================================== --- 
src/plugin/microformats-reltag/src/java/org/apache/nutch/microformats/reltag/RelTagIndexingFilter.java (revision 681741) +++ src/plugin/microformats-reltag/src/java/org/apache/nutch/microformats/reltag/RelTagIndexingFilter.java (working copy) @@ -22,6 +22,8 @@ import org.apache.nutch.crawl.Inlinks; import org.apache.nutch.indexer.IndexingFilter; import org.apache.nutch.indexer.IndexingException; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.indexer.lucene.LuceneWriter; import org.apache.hadoop.io.Text; import org.apache.nutch.parse.Parse; @@ -28,10 +30,6 @@ // Hadoop imports import org.apache.hadoop.conf.Configuration; -// Lucene imports -import org.apache.lucene.document.Field; -import org.apache.lucene.document.Document; - /** * An {@link org.apache.nutch.indexer.IndexingFilter} that @@ -48,7 +46,7 @@ // Inherited JavaDoc - public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) + public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) throws IndexingException { // Check if some Rel-Tags found, possibly put there by RelTagParser @@ -55,8 +53,7 @@ String[] tags = parse.getData().getParseMeta().getValues(RelTagParser.REL_TAG); if (tags != null) { for (int i=0; i * Index: src/plugin/scoring-opic/src/java/org/apache/nutch/scoring/opic/OPICScoringFilter.java =================================================================== --- src/plugin/scoring-opic/src/java/org/apache/nutch/scoring/opic/OPICScoringFilter.java (revision 681741) +++ src/plugin/scoring-opic/src/java/org/apache/nutch/scoring/opic/OPICScoringFilter.java (working copy) @@ -29,11 +29,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; -import org.apache.lucene.document.Document; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.Inlinks; -import org.apache.nutch.fetcher.Fetcher; -import org.apache.nutch.metadata.Metadata; +import 
org.apache.nutch.indexer.NutchDocument; import org.apache.nutch.metadata.Nutch; import org.apache.nutch.parse.Parse; import org.apache.nutch.parse.ParseData; @@ -158,7 +156,7 @@ } /** Dampen the boost value by scorePower.*/ - public float indexerScore(Text url, Document doc, CrawlDatum dbDatum, CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException { + public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum, CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException { return (float)Math.pow(dbDatum.getScore(), scorePower) * initScore; } } Index: src/plugin/subcollection/src/java/org/apache/nutch/indexer/subcollection/SubcollectionIndexingFilter.java =================================================================== --- src/plugin/subcollection/src/java/org/apache/nutch/indexer/subcollection/SubcollectionIndexingFilter.java (revision 681741) +++ src/plugin/subcollection/src/java/org/apache/nutch/indexer/subcollection/SubcollectionIndexingFilter.java (working copy) @@ -19,8 +19,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.Text; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +28,8 @@ import org.apache.nutch.indexer.IndexingFilter; import org.apache.nutch.indexer.IndexingException; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.indexer.lucene.LuceneWriter; import org.apache.nutch.collection.CollectionManager; import org.apache.nutch.crawl.CrawlDatum; @@ -62,12 +62,12 @@ * @param doc * @param url */ - private void addSubCollectionField(Document doc, String url) { + private void addSubCollectionField(NutchDocument doc, String url) { String collname = CollectionManager.getCollectionManager(getConf()).getSubCollections(url); - doc.add(new 
Field(FIELD_NAME, collname, Field.Store.YES, Field.Index.TOKENIZED)); + doc.add(FIELD_NAME, collname); } - public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) throws IndexingException { + public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) throws IndexingException { String sUrl = url.toString(); addSubCollectionField(doc, sUrl); return doc; @@ -72,4 +72,9 @@ addSubCollectionField(doc, sUrl); return doc; } + + public void addIndexBackendOptions(Configuration conf) { + LuceneWriter.addFieldOptions(FIELD_NAME, LuceneWriter.STORE.YES, + LuceneWriter.INDEX.UNTOKENIZED, conf); + } } Index: src/plugin/tld/src/java/org/apache/nutch/indexer/tld/TLDIndexingFilter.java =================================================================== --- src/plugin/tld/src/java/org/apache/nutch/indexer/tld/TLDIndexingFilter.java (revision 681741) +++ src/plugin/tld/src/java/org/apache/nutch/indexer/tld/TLDIndexingFilter.java (working copy) @@ -23,8 +23,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.Inlinks; import org.apache.nutch.indexer.IndexingException; @@ -29,6 +27,8 @@ import org.apache.nutch.crawl.Inlinks; import org.apache.nutch.indexer.IndexingException; import org.apache.nutch.indexer.IndexingFilter; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.indexer.lucene.LuceneWriter; import org.apache.nutch.parse.Parse; import org.apache.nutch.util.URLUtil; import org.apache.nutch.util.domain.DomainSuffix; @@ -42,7 +42,7 @@ private Configuration conf; - public Document filter(Document doc, Parse parse, Text urlText, CrawlDatum datum, Inlinks inlinks) + public NutchDocument filter(NutchDocument doc, Parse parse, Text urlText, 
CrawlDatum datum, Inlinks inlinks) throws IndexingException { try { @@ -49,8 +49,7 @@ URL url = new URL(urlText.toString()); DomainSuffix d = URLUtil.getDomainSuffix(url); - // store, no index - doc.add(new Field("tld", d.getDomain(), Field.Store.YES, Field.Index.NO)); + doc.add("tld", d.getDomain()); }catch (Exception ex) { LOG.warn(ex); @@ -66,4 +65,10 @@ public Configuration getConf() { return this.conf; } + + public void addIndexBackendOptions(Configuration conf) { + // store, no index + LuceneWriter.addFieldOptions("tld", LuceneWriter.STORE.YES, + LuceneWriter.INDEX.NO, conf); + } } Index: src/plugin/tld/src/java/org/apache/nutch/scoring/tld/TLDScoringFilter.java =================================================================== --- src/plugin/tld/src/java/org/apache/nutch/scoring/tld/TLDScoringFilter.java (revision 681741) +++ src/plugin/tld/src/java/org/apache/nutch/scoring/tld/TLDScoringFilter.java (working copy) @@ -17,7 +17,6 @@ package org.apache.nutch.scoring.tld; -import java.util.ArrayList; import java.util.List; import java.util.Collection; import java.util.Map.Entry; @@ -24,9 +23,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; -import org.apache.lucene.document.Document; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.Inlinks; +import org.apache.nutch.indexer.NutchDocument; import org.apache.nutch.parse.Parse; import org.apache.nutch.parse.ParseData; import org.apache.nutch.protocol.Content; @@ -49,11 +48,11 @@ tldEntries = DomainSuffixes.getInstance(); } - public float indexerScore(Text url, Document doc, CrawlDatum dbDatum, + public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum, CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException { - String[] tlds = doc.getValues("tld"); + List tlds = doc.getFieldValues("tld"); float boost = 1.0f; if(tlds != null) { @@ -93,8 +92,9 @@ throws ScoringFilterException { } - public void 
updateDbScore(Text url, CrawlDatum old, CrawlDatum datum, - List inlinked) throws ScoringFilterException { + public void updateDbScore(Text url, CrawlDatum old, + CrawlDatum datum, List inlinked) + throws ScoringFilterException { } public Configuration getConf() { Index: src/test/org/apache/nutch/indexer/TestIndexingFilters.java =================================================================== --- src/test/org/apache/nutch/indexer/TestIndexingFilters.java (revision 681741) +++ src/test/org/apache/nutch/indexer/TestIndexingFilters.java (working copy) @@ -43,7 +43,7 @@ conf.set(IndexingFilters.INDEXINGFILTER_ORDER, class1 + " " + class2); IndexingFilters filters = new IndexingFilters(conf); - filters.filter(new Document(), new ParseImpl("text", new ParseData( + filters.filter(new NutchDocument(), new ParseImpl("text", new ParseData( new ParseStatus(), "title", new Outlink[0], new Metadata())), new Text( "http://www.example.com/"), new CrawlDatum(), new Inlinks()); } Index: src/web/jsp/anchors.jsp =================================================================== --- src/web/jsp/anchors.jsp (revision 681741) +++ src/web/jsp/anchors.jsp (working copy) @@ -33,7 +33,7 @@ request.setCharacterEncoding("UTF-8"); bean.LOG.info("anchors request from " + request.getRemoteAddr()); Hit hit = new Hit(Integer.parseInt(request.getParameter("idx")), - Integer.parseInt(request.getParameter("id"))); + request.getParameter("id")); HitDetails details = bean.getDetails(hit); String language = ResourceBundle.getBundle("org.nutch.jsp.anchors", request.getLocale()) Index: src/web/jsp/cached.jsp =================================================================== --- src/web/jsp/cached.jsp (revision 681741) +++ src/web/jsp/cached.jsp (working copy) @@ -31,9 +31,9 @@ NutchBean bean = NutchBean.get(application, nutchConf); bean.LOG.info("cache request from " + request.getRemoteAddr()); Hit hit = new Hit(Integer.parseInt(request.getParameter("idx")), - 
Integer.parseInt(request.getParameter("id"))); + request.getParameter("id")); HitDetails details = bean.getDetails(hit); - String id = "idx=" + hit.getIndexNo() + "&id=" + hit.getIndexDocNo(); + String id = "idx=" + hit.getIndexNo() + "&id=" + hit.getUniqueKey(); String language = ResourceBundle.getBundle("org.nutch.jsp.cached", request.getLocale()) Index: src/web/jsp/explain.jsp =================================================================== --- src/web/jsp/explain.jsp (revision 681741) +++ src/web/jsp/explain.jsp (working copy) @@ -31,10 +31,9 @@ request.setCharacterEncoding("UTF-8"); bean.LOG.info("explain request from " + request.getRemoteAddr()); Hit hit = new Hit(Integer.parseInt(request.getParameter("idx")), - Integer.parseInt(request.getParameter("id"))); + request.getParameter("id")); HitDetails details = bean.getDetails(hit); // get the lang from request - // get the lang from request String queryLang = request.getParameter("lang"); if (queryLang == null) { queryLang = ""; } Query query = Query.parse(request.getParameter("query"), queryLang, nutchConf); Index: src/web/jsp/search.jsp =================================================================== --- src/web/jsp/search.jsp (revision 681741) +++ src/web/jsp/search.jsp (working copy) @@ -247,7 +247,7 @@ HitDetails detail = details[i]; String title = detail.getValue("title"); String url = detail.getValue("url"); - String id = "idx=" + hit.getIndexNo() + "&id=" + hit.getIndexDocNo(); + String id = "idx=" + hit.getIndexNo() + "&id=" + hit.getUniqueKey(); String summary = summaries[i].toHtml(true); String caching = detail.getValue("cache"); boolean showSummary = true;